Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6use std::collections::{HashMap, HashSet};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use bollard::Docker;
14use bollard::service::{HostConfig, PortBinding};
15use camel_api::{Body, BoxProcessor, CamelError, Exchange, Message};
16use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
17use camel_endpoint::parse_uri;
18use tower::Service;
19
/// Global tracker for containers created by this component.
///
/// Holds the IDs registered through `track_container` so that
/// `cleanup_tracked_containers` can force-remove them on shutdown (especially
/// important for hot-reload scenarios). The set is created lazily on first
/// access and every access goes through the inner `Mutex`.
static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
24
25/// Registers a container ID for tracking (will be cleaned up on shutdown).
26fn track_container(id: String) {
27    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
28        tracker.insert(id);
29    }
30}
31
32/// Removes a container ID from tracking (when it's been removed naturally).
33fn untrack_container(id: &str) {
34    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
35        tracker.remove(id);
36    }
37}
38
39/// Cleans up all tracked containers. Call this on application shutdown.
40pub async fn cleanup_tracked_containers() {
41    let ids: Vec<String> = {
42        match CONTAINER_TRACKER.lock() {
43            Ok(tracker) => tracker.iter().cloned().collect(),
44            Err(_) => return,
45        }
46    };
47
48    if ids.is_empty() {
49        return;
50    }
51
52    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
53
54    let docker = match Docker::connect_with_unix_defaults() {
55        Ok(d) => d,
56        Err(e) => {
57            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
58            return;
59        }
60    };
61
62    for id in ids {
63        match docker
64            .remove_container(
65                &id,
66                Some(bollard::container::RemoveContainerOptions {
67                    force: true,
68                    ..Default::default()
69                }),
70            )
71            .await
72        {
73            Ok(_) => {
74                tracing::debug!("Cleaned up container {}", id);
75                untrack_container(&id);
76            }
77            Err(e) => {
78                tracing::warn!("Failed to cleanup container {}: {}", id, e);
79            }
80        }
81    }
82}
83
/// Timeout (seconds) handed to `Docker::connect_with_unix` when the client is
/// created in `ContainerConfig::connect_docker_client`.
// NOTE(review): presumably bollard applies this to requests made through the
// client, not only to the initial connect — confirm against the bollard docs.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

// Header constants for container operations

/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
///
/// When present on the incoming message it takes precedence over the operation
/// given in the endpoint URI.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header key for specifying the container image to use for "run" operations.
///
/// Takes precedence over the `image` URI parameter.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header key for specifying or receiving the container ID.
///
/// Set by the producer after a successful "run"; required as input for the
/// "start", "stop" and "remove" operations.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header key for the log stream type (stdout or stderr).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header key for the log timestamp.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header key for specifying the container name for "run" operations.
///
/// Takes precedence over the `name` URI parameter.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header key for the result status of a container operation (e.g., "success").
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
109
/// Configuration for the container component endpoint.
///
/// This struct holds the parsed URI configuration: the operation type, the
/// optional container settings used by "run", the log-consumer settings and
/// the Docker host connection details. Build it with `ContainerConfig::from_uri`.
///
/// Boolean options are enabled only by the literal `true` (case-insensitive);
/// any other value supplied in the URI is read as `false`.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
    /// Taken from the URI path.
    pub operation: String,
    /// The container image to use for "run" operations (can be overridden via header).
    pub image: Option<String>,
    /// The container name to use for "run" operations (can be overridden via header).
    pub name: Option<String>,
    /// The Docker host URL. `from_uri` fills in "unix:///var/run/docker.sock"
    /// when the URI has no `host` parameter. Only `unix://` URLs and bare
    /// socket paths are accepted when connecting.
    pub host: Option<String>,
    /// Command to run in the container (e.g., "sleep 30").
    /// The producer splits it on whitespace; quoting is not interpreted.
    pub cmd: Option<String>,
    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
    /// A "/protocol" suffix on the container port is honoured; without one "tcp" is used.
    pub ports: Option<String>,
    /// Environment variables in format "KEY=value,KEY2=value2".
    /// Entries are split on commas, so a value cannot itself contain a comma.
    pub env: Option<String>,
    /// Network mode (e.g., "bridge", "host", "none").
    /// When unset no network mode is sent and Docker's own default applies
    /// (presumably "bridge" — confirm against the Docker documentation).
    pub network: Option<String>,
    /// Container ID or name for logs consumer.
    pub container_id: Option<String>,
    /// Follow log output (default: true for consumer).
    pub follow: bool,
    /// Include timestamps in logs (default: false).
    pub timestamps: bool,
    /// Number of lines to show from the end of logs (default: all).
    pub tail: Option<String>,
    /// Automatically pull the image if not present (default: true).
    pub auto_pull: bool,
    /// Automatically remove the container when it exits (default: true).
    pub auto_remove: bool,
}
145
146impl ContainerConfig {
147    /// Parses a container URI into a `ContainerConfig`.
148    ///
149    /// # Arguments
150    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
151    ///
152    /// # Errors
153    /// Returns an error if the URI scheme is not "container".
154    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
155        let parts = parse_uri(uri)?;
156        if parts.scheme != "container" {
157            return Err(CamelError::InvalidUri(format!(
158                "expected scheme 'container', got '{}'",
159                parts.scheme
160            )));
161        }
162
163        let image = parts.params.get("image").cloned();
164        let name = parts.params.get("name").cloned();
165        let cmd = parts.params.get("cmd").cloned();
166        let ports = parts.params.get("ports").cloned();
167        let env = parts.params.get("env").cloned();
168        let network = parts.params.get("network").cloned();
169        let container_id = parts.params.get("containerId").cloned();
170        let follow = parts
171            .params
172            .get("follow")
173            .map(|v| v.eq_ignore_ascii_case("true"))
174            .unwrap_or(true);
175        let timestamps = parts
176            .params
177            .get("timestamps")
178            .map(|v| v.eq_ignore_ascii_case("true"))
179            .unwrap_or(false);
180        let tail = parts.params.get("tail").cloned();
181        let auto_pull = parts
182            .params
183            .get("autoPull")
184            .map(|v| v.eq_ignore_ascii_case("true"))
185            .unwrap_or(true);
186        let auto_remove = parts
187            .params
188            .get("autoRemove")
189            .map(|v| v.eq_ignore_ascii_case("true"))
190            .unwrap_or(true);
191        let host = parts
192            .params
193            .get("host")
194            .cloned()
195            .or_else(|| Some("unix:///var/run/docker.sock".to_string()));
196
197        Ok(Self {
198            operation: parts.path,
199            image,
200            name,
201            host,
202            cmd,
203            ports,
204            env,
205            network,
206            container_id,
207            follow,
208            timestamps,
209            tail,
210            auto_pull,
211            auto_remove,
212        })
213    }
214
215    fn docker_socket_path(&self) -> Result<&str, CamelError> {
216        let host = self
217            .host
218            .as_deref()
219            .unwrap_or("unix:///var/run/docker.sock");
220
221        if host.starts_with("unix://") {
222            return Ok(host.trim_start_matches("unix://"));
223        }
224
225        if host.contains("://") {
226            return Err(CamelError::ProcessorError(format!(
227                "Unsupported Docker host scheme: {} (only unix:// is supported)",
228                host
229            )));
230        }
231
232        Ok(host)
233    }
234
235    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
236        let socket_path = self.docker_socket_path()?;
237        Docker::connect_with_unix(
238            socket_path,
239            DOCKER_CONNECT_TIMEOUT_SECS,
240            bollard::API_DEFAULT_VERSION,
241        )
242        .map_err(|e| {
243            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
244        })
245    }
246
247    /// Connects to the Docker daemon using the configured host.
248    ///
249    /// This method establishes a Unix socket connection to Docker and verifies
250    /// the connection by sending a ping request.
251    ///
252    /// # Errors
253    /// Returns an error if the connection fails or the ping request fails.
254    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
255        let docker = self.connect_docker_client()?;
256        docker
257            .ping()
258            .await
259            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
260        Ok(docker)
261    }
262
263    #[allow(clippy::type_complexity)]
264    fn parse_ports(
265        &self,
266    ) -> Option<(
267        HashMap<String, HashMap<(), ()>>,
268        HashMap<String, Option<Vec<PortBinding>>>,
269    )> {
270        let ports_str = self.ports.as_ref()?;
271
272        let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
273        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
274
275        for mapping in ports_str.split(',') {
276            let mapping = mapping.trim();
277            if mapping.is_empty() {
278                continue;
279            }
280
281            let (host_port, container_spec) = mapping.split_once(':')?;
282
283            let (container_port, protocol) = if container_spec.contains('/') {
284                let parts: Vec<&str> = container_spec.split('/').collect();
285                (parts[0], parts[1])
286            } else {
287                (container_spec, "tcp")
288            };
289
290            let container_key = format!("{}/{}", container_port, protocol);
291
292            exposed_ports.insert(container_key.clone(), HashMap::new());
293
294            port_bindings.insert(
295                container_key,
296                Some(vec![PortBinding {
297                    host_ip: None,
298                    host_port: Some(host_port.to_string()),
299                }]),
300            );
301        }
302
303        if exposed_ports.is_empty() {
304            None
305        } else {
306            Some((exposed_ports, port_bindings))
307        }
308    }
309
310    fn parse_env(&self) -> Option<Vec<String>> {
311        let env_str = self.env.as_ref()?;
312
313        let env_vars: Vec<String> = env_str
314            .split(',')
315            .map(|s| s.trim().to_string())
316            .filter(|s| !s.is_empty())
317            .collect();
318
319        if env_vars.is_empty() {
320            None
321        } else {
322            Some(env_vars)
323        }
324    }
325}
326
/// Operations the producer can execute, as parsed from the operation name by
/// `parse_producer_operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// "list": list containers and return them as a JSON body.
    List,
    /// "run": create and start a container from an image.
    Run,
    /// "start": start the container named by the container-ID header.
    Start,
    /// "stop": stop the container named by the container-ID header.
    Stop,
    /// "remove": remove the container named by the container-ID header.
    Remove,
}
335
336fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
337    match operation {
338        "list" => Ok(ProducerOperation::List),
339        "run" => Ok(ProducerOperation::Run),
340        "start" => Ok(ProducerOperation::Start),
341        "stop" => Ok(ProducerOperation::Stop),
342        "remove" => Ok(ProducerOperation::Remove),
343        _ => Err(CamelError::ProcessorError(format!(
344            "Unknown container operation: {}",
345            operation
346        ))),
347    }
348}
349
350fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
351    exchange
352        .input
353        .header(HEADER_CONTAINER_NAME)
354        .and_then(|v| v.as_str().map(|s| s.to_string()))
355        .or_else(|| config.name.clone())
356}
357
358async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
359    let images = docker
360        .list_images::<&str>(None)
361        .await
362        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
363
364    Ok(images.iter().any(|img| {
365        img.repo_tags
366            .iter()
367            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
368    }))
369}
370
371async fn pull_image_with_progress(
372    docker: &Docker,
373    image: &str,
374    timeout_secs: u64,
375) -> Result<(), CamelError> {
376    use futures::StreamExt;
377
378    tracing::info!("Pulling image: {}", image);
379
380    let mut stream = docker.create_image(
381        Some(bollard::image::CreateImageOptions {
382            from_image: image,
383            ..Default::default()
384        }),
385        None,
386        None,
387    );
388
389    let start = std::time::Instant::now();
390    let mut last_progress = std::time::Instant::now();
391
392    while let Some(item) = stream.next().await {
393        if start.elapsed().as_secs() > timeout_secs {
394            return Err(CamelError::ProcessorError(format!(
395                "Image pull timeout after {}s. Try manually: docker pull {}",
396                timeout_secs, image
397            )));
398        }
399
400        match item {
401            Ok(update) => {
402                // Log progress every 2 seconds
403                if last_progress.elapsed().as_secs() >= 2 {
404                    if let Some(status) = update.status {
405                        tracing::debug!("Pull progress: {}", status);
406                    }
407                    last_progress = std::time::Instant::now();
408                }
409            }
410            Err(e) => {
411                let err_str = e.to_string().to_lowercase();
412                if err_str.contains("unauthorized") || err_str.contains("401") {
413                    return Err(CamelError::ProcessorError(format!(
414                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
415                        image
416                    )));
417                }
418                if err_str.contains("not found") || err_str.contains("404") {
419                    return Err(CamelError::ProcessorError(format!(
420                        "Image '{}' not found in registry. Check the image name and tag",
421                        image
422                    )));
423                }
424                return Err(CamelError::ProcessorError(format!(
425                    "Failed to pull image '{}': {}",
426                    image, e
427                )));
428            }
429        }
430    }
431
432    tracing::info!("Successfully pulled image: {}", image);
433    Ok(())
434}
435
436async fn ensure_image_available(
437    docker: &Docker,
438    image: &str,
439    auto_pull: bool,
440    timeout_secs: u64,
441) -> Result<(), CamelError> {
442    if image_exists_locally(docker, image).await? {
443        tracing::debug!("Image '{}' already available locally", image);
444        return Ok(());
445    }
446
447    if !auto_pull {
448        return Err(CamelError::ProcessorError(format!(
449            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
450            image, image
451        )));
452    }
453
454    pull_image_with_progress(docker, image, timeout_secs).await
455}
456
457fn format_docker_event(event: &bollard::models::EventMessage) -> String {
458    let action = event.action.as_deref().unwrap_or("unknown");
459    let actor = event.actor.as_ref();
460
461    let container_name = actor
462        .and_then(|a| a.attributes.as_ref())
463        .and_then(|attrs| attrs.get("name"))
464        .map(|s| s.as_str())
465        .unwrap_or("unknown");
466
467    let image = actor
468        .and_then(|a| a.attributes.as_ref())
469        .and_then(|attrs| attrs.get("image"))
470        .map(|s| s.as_str())
471        .unwrap_or("");
472
473    let exit_code = actor
474        .and_then(|a| a.attributes.as_ref())
475        .and_then(|attrs| attrs.get("exitCode"))
476        .map(|s| s.as_str());
477
478    match action {
479        "create" => {
480            if image.is_empty() {
481                format!("[CREATE] Container {}", container_name)
482            } else {
483                format!("[CREATE] Container {} ({})", container_name, image)
484            }
485        }
486        "start" => format!("[START]  Container {}", container_name),
487        "die" => {
488            if let Some(code) = exit_code {
489                format!("[DIE]    Container {} (exit: {})", container_name, code)
490            } else {
491                format!("[DIE]    Container {}", container_name)
492            }
493        }
494        "destroy" => format!("[DESTROY] Container {}", container_name),
495        "stop" => format!("[STOP]   Container {}", container_name),
496        "pause" => format!("[PAUSE]  Container {}", container_name),
497        "unpause" => format!("[UNPAUSE] Container {}", container_name),
498        "restart" => format!("[RESTART] Container {}", container_name),
499        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
500    }
501}
502
503async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
504    create: CreateFn,
505    start: StartFn,
506    remove: RemoveFn,
507) -> Result<String, CamelError>
508where
509    CreateFn: FnOnce() -> CreateFut,
510    CreateFut: Future<Output = Result<String, CamelError>>,
511    StartFn: FnOnce(String) -> StartFut,
512    StartFut: Future<Output = Result<(), CamelError>>,
513    RemoveFn: FnOnce(String) -> RemoveFut,
514    RemoveFut: Future<Output = Result<(), CamelError>>,
515{
516    let container_id = create().await?;
517    if let Err(start_err) = start(container_id.clone()).await {
518        if let Err(remove_err) = remove(container_id.clone()).await {
519            return Err(CamelError::ProcessorError(format!(
520                "Failed to start container: {}. Cleanup failed: {}",
521                start_err, remove_err
522            )));
523        }
524        return Err(start_err);
525    }
526
527    Ok(container_id)
528}
529
/// Producer for executing container operations.
///
/// This producer handles request/response container operations: listing,
/// running (create + start), starting, stopping, and removing containers.
/// Each call resolves its operation from the exchange header or, failing
/// that, from the endpoint configuration.
#[derive(Clone)]
pub struct ContainerProducer {
    /// Endpoint configuration parsed from the URI.
    config: ContainerConfig,
    /// Docker client; cloned into the future of every call.
    docker: Docker,
}
539
/// `tower::Service` implementation that executes one container operation per
/// exchange and returns the (possibly modified) exchange.
impl Service<Exchange> for ContainerProducer {
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    /// Always reports readiness; the producer applies no back-pressure.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Executes the container operation selected for `exchange`.
    ///
    /// The operation name comes from the `HEADER_ACTION` header when it holds
    /// a string, otherwise from the endpoint configuration. On success the
    /// `HEADER_ACTION_RESULT` header is set to "success"; "list" additionally
    /// replaces the body with the container list as JSON, and "run" sets
    /// `HEADER_CONTAINER_ID` to the new container's ID.
    ///
    /// # Errors
    /// Resolves to `CamelError::ProcessorError` for an unknown operation, a
    /// missing image ("run") or container ID ("start"/"stop"/"remove"), and
    /// for any failure reported by Docker.
    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
        // Clone what the future needs so it owns its data and does not borrow `self`.
        let config = self.config.clone();
        let docker = self.docker.clone();
        Box::pin(async move {
            // Extract operation from header or use config
            let operation_name = exchange
                .input
                .header(HEADER_ACTION)
                .and_then(|v| v.as_str().map(|s| s.to_string()))
                .unwrap_or_else(|| config.operation.clone());

            let operation = parse_producer_operation(&operation_name)?;

            // Execute operation
            match operation {
                ProducerOperation::List => {
                    // No list options are passed, so Docker's default listing applies.
                    let containers = docker.list_containers::<String>(None).await.map_err(|e| {
                        CamelError::ProcessorError(format!("Failed to list containers: {}", e))
                    })?;

                    let json_value = serde_json::to_value(&containers).map_err(|e| {
                        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
                    })?;

                    exchange.input.body = Body::Json(json_value);
                    exchange.input.set_header(
                        HEADER_ACTION_RESULT,
                        serde_json::Value::String("success".to_string()),
                    );
                }
                ProducerOperation::Run => {
                    // Run operation: create and start a container from an image.
                    // The image header takes precedence over the URI parameter.
                    let image = exchange
                        .input
                        .header(HEADER_IMAGE)
                        .and_then(|v| v.as_str().map(|s| s.to_string()))
                        .or(config.image.clone())
                        .ok_or_else(|| {
                            CamelError::ProcessorError(
                                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
                            )
                        })?;

                    // Ensure image is available (auto-pull if needed)
                    let pull_timeout = 300; // 5 minutes default
                    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
                        .await
                        .map_err(|e| {
                            CamelError::ProcessorError(format!(
                                "Image '{}' not available: {}",
                                image, e
                            ))
                        })?;

                    let container_name = resolve_container_name(&exchange, &config);
                    // An empty name is passed when none was configured.
                    // NOTE(review): presumably Docker then generates a name — confirm.
                    let container_name_ref = container_name.as_deref().unwrap_or("");
                    // The command is split on whitespace only; quoting is not interpreted.
                    let cmd_parts: Option<Vec<String>> = config
                        .cmd
                        .as_ref()
                        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
                    let auto_remove = config.auto_remove;
                    // No parsed port mappings means empty maps, which become `None` below.
                    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
                    let env_vars = config.parse_env();
                    let network_mode = config.network.clone();

                    // One client handle per closure, since each closure takes ownership.
                    let docker_create = docker.clone();
                    let docker_start = docker.clone();
                    let docker_remove = docker.clone();

                    let container_id = run_container_with_cleanup(
                        // Step 1: create the container and yield its ID.
                        move || async move {
                            let create_options = bollard::container::CreateContainerOptions {
                                name: container_name_ref,
                                ..Default::default()
                            };
                            let container_config = bollard::container::Config::<String> {
                                image: Some(image.clone()),
                                cmd: cmd_parts,
                                env: env_vars,
                                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
                                host_config: Some(HostConfig {
                                    auto_remove: Some(auto_remove),
                                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
                                    network_mode,
                                    ..Default::default()
                                }),
                                ..Default::default()
                            };

                            let create_response = docker_create
                                .create_container(Some(create_options), container_config)
                                .await
                                .map_err(|e| {
                                    // A name clash is recognised from the error text
                                    // so the user gets an actionable message.
                                    let err_str = e.to_string().to_lowercase();
                                    if err_str.contains("409") || err_str.contains("conflict") {
                                        CamelError::ProcessorError(format!(
                                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
                                            container_name_ref
                                        ))
                                    } else {
                                        CamelError::ProcessorError(format!(
                                            "Failed to create container: {}",
                                            e
                                        ))
                                    }
                                })?;

                            Ok(create_response.id)
                        },
                        // Step 2: start the container that was just created.
                        move |container_id| async move {
                            docker_start
                                .start_container::<String>(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to start container: {}",
                                        e
                                    ))
                                })
                        },
                        // Cleanup: runs only when the start failed.
                        move |container_id| async move {
                            docker_remove
                                .remove_container(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to remove container after start failure: {}",
                                        e
                                    ))
                                })
                        },
                    )
                    .await?;

                    // Only containers that started successfully are tracked for shutdown cleanup.
                    track_container(container_id.clone());

                    exchange
                        .input
                        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
                    exchange.input.set_header(
                        HEADER_ACTION_RESULT,
                        serde_json::Value::String("success".to_string()),
                    );
                }
                ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
                    // Lifecycle operations require a container ID
                    let container_id = exchange
                        .input
                        .header(HEADER_CONTAINER_ID)
                        .and_then(|v| v.as_str().map(|s| s.to_string()))
                        .ok_or_else(|| {
                            CamelError::ProcessorError(format!(
                                "{} header is required for {} operation",
                                HEADER_CONTAINER_ID, operation_name
                            ))
                        })?;

                    match operation {
                        ProducerOperation::Start => {
                            docker
                                .start_container::<String>(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to start container: {}",
                                        e
                                    ))
                                })?;
                        }
                        ProducerOperation::Stop => {
                            docker
                                .stop_container(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to stop container: {}",
                                        e
                                    ))
                                })?;
                        }
                        ProducerOperation::Remove => {
                            docker
                                .remove_container(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to remove container: {}",
                                        e
                                    ))
                                })?;
                            // The container is gone, so shutdown cleanup must not try again.
                            untrack_container(&container_id);
                        }
                        // NOTE(review): not reachable here — the outer arm only
                        // admits Start, Stop and Remove. The wildcard would also
                        // hide a newly added variant from the compiler.
                        _ => {}
                    }

                    // Set success result
                    exchange.input.set_header(
                        HEADER_ACTION_RESULT,
                        serde_json::Value::String("success".to_string()),
                    );
                }
            }

            Ok(exchange)
        })
    }
}
756
/// Consumer for receiving Docker container events or logs.
///
/// This consumer subscribes to Docker events or container logs and forwards them
/// to the route as exchanges. It implements automatic reconnection on connection failures.
/// Which of the two sources is consumed is decided by the configured operation
/// ("events" or "logs").
pub struct ContainerConsumer {
    /// Endpoint configuration parsed from the URI.
    config: ContainerConfig,
}
764
765#[async_trait]
766impl Consumer for ContainerConsumer {
767    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
768        match self.config.operation.as_str() {
769            "events" => self.start_events_consumer(context).await,
770            "logs" => self.start_logs_consumer(context).await,
771            _ => Err(CamelError::EndpointCreationFailed(format!(
772                "Consumer only supports 'events' or 'logs' operations, got '{}'",
773                self.config.operation
774            ))),
775        }
776    }
777
778    async fn stop(&mut self) -> Result<(), CamelError> {
779        Ok(())
780    }
781
782    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
783        camel_component::ConcurrencyModel::Concurrent { max: None }
784    }
785}
786
787impl ContainerConsumer {
788    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
789        use futures::StreamExt;
790
791        loop {
792            if context.is_cancelled() {
793                tracing::info!("Container events consumer shutting down");
794                return Ok(());
795            }
796
797            let docker = match self.config.connect_docker().await {
798                Ok(d) => d,
799                Err(e) => {
800                    tracing::error!(
801                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
802                        e
803                    );
804                    tokio::select! {
805                        _ = context.cancelled() => {
806                            tracing::info!("Container events consumer shutting down");
807                            return Ok(());
808                        }
809                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
810                    }
811                    continue;
812                }
813            };
814
815            let mut event_stream = docker.events::<String>(None);
816
817            loop {
818                tokio::select! {
819                    _ = context.cancelled() => {
820                        tracing::info!("Container events consumer shutting down");
821                        return Ok(());
822                    }
823
824                    msg = event_stream.next() => {
825                        match msg {
826                            Some(Ok(event)) => {
827                                let formatted = format_docker_event(&event);
828                                let message = Message::new(Body::Text(formatted));
829                                let exchange = Exchange::new(message);
830
831                                if let Err(e) = context.send(exchange).await {
832                                    tracing::error!("Failed to send exchange: {:?}", e);
833                                    break;
834                                }
835                            }
836                            Some(Err(e)) => {
837                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
838                                break;
839                            }
840                            None => {
841                                tracing::info!("Docker event stream ended. Reconnecting...");
842                                break;
843                            }
844                        }
845                    }
846                }
847            }
848
849            tokio::select! {
850                _ = context.cancelled() => {
851                    tracing::info!("Container events consumer shutting down");
852                    return Ok(());
853                }
854                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
855            }
856        }
857    }
858
859    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
860        use futures::StreamExt;
861
862        let container_id = self.config.container_id.clone().ok_or_else(|| {
863            CamelError::EndpointCreationFailed(
864                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
865                    .to_string(),
866            )
867        })?;
868
869        loop {
870            if context.is_cancelled() {
871                tracing::info!("Container logs consumer shutting down");
872                return Ok(());
873            }
874
875            let docker = match self.config.connect_docker().await {
876                Ok(d) => d,
877                Err(e) => {
878                    tracing::error!(
879                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
880                        e
881                    );
882                    tokio::select! {
883                        _ = context.cancelled() => {
884                            tracing::info!("Container logs consumer shutting down");
885                            return Ok(());
886                        }
887                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
888                    }
889                    continue;
890                }
891            };
892
893            let tail = self
894                .config
895                .tail
896                .clone()
897                .unwrap_or_else(|| "all".to_string());
898
899            let options = bollard::container::LogsOptions::<String> {
900                follow: self.config.follow,
901                stdout: true,
902                stderr: true,
903                timestamps: self.config.timestamps,
904                tail,
905                ..Default::default()
906            };
907
908            let mut log_stream = docker.logs(&container_id, Some(options));
909            let container_id_header = container_id.clone();
910
911            loop {
912                tokio::select! {
913                    _ = context.cancelled() => {
914                        tracing::info!("Container logs consumer shutting down");
915                        return Ok(());
916                    }
917
918                    msg = log_stream.next() => {
919                        match msg {
920                            Some(Ok(log_output)) => {
921                                let (stream_type, content) = match log_output {
922                                    bollard::container::LogOutput::StdOut { message } => {
923                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
924                                    }
925                                    bollard::container::LogOutput::StdErr { message } => {
926                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
927                                    }
928                                    bollard::container::LogOutput::Console { message } => {
929                                        ("console", String::from_utf8_lossy(&message).into_owned())
930                                    }
931                                    bollard::container::LogOutput::StdIn { message } => {
932                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
933                                    }
934                                };
935
936                                let content = content.trim_end();
937                                if content.is_empty() {
938                                    continue;
939                                }
940
941                                let mut message = Message::new(Body::Text(content.to_string()));
942                                message.set_header(
943                                    HEADER_CONTAINER_ID,
944                                    serde_json::Value::String(container_id_header.clone()),
945                                );
946                                message.set_header(
947                                    HEADER_LOG_STREAM,
948                                    serde_json::Value::String(stream_type.to_string()),
949                                );
950
951                                if self.config.timestamps
952                                    && let Some(ts) = extract_timestamp(content) {
953                                        message.set_header(
954                                            HEADER_LOG_TIMESTAMP,
955                                            serde_json::Value::String(ts),
956                                        );
957                                    }
958
959                                let exchange = Exchange::new(message);
960
961                                if let Err(e) = context.send(exchange).await {
962                                    tracing::error!("Failed to send log exchange: {:?}", e);
963                                    break;
964                                }
965                            }
966                            Some(Err(e)) => {
967                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
968                                break;
969                            }
970                            None => {
971                                if self.config.follow {
972                                    tracing::info!("Docker log stream ended. Reconnecting...");
973                                    break;
974                                } else {
975                                    tracing::info!("Container logs consumer finished (follow=false)");
976                                    return Ok(());
977                                }
978                            }
979                        }
980                    }
981                }
982            }
983
984            tokio::select! {
985                _ = context.cancelled() => {
986                    tracing::info!("Container logs consumer shutting down");
987                    return Ok(());
988                }
989                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
990            }
991        }
992    }
993}
994
/// Extracts the leading timestamp of a log line, if there is one.
///
/// The line must consist of a timestamp token, a single space, and the rest
/// of the message. The token is accepted only when it starts with an
/// RFC 3339 date-time (`YYYY-MM-DDTHH:MM:SS`, optionally followed by a
/// fraction and offset), so ordinary words that merely contain a `T`
/// (for example `This` or `GET`) are not mistaken for timestamps.
///
/// Returns `None` when the line contains no space or does not start with a
/// timestamp.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _message) = log_line.split_once(' ')?;
    has_rfc3339_prefix(candidate).then(|| candidate.to_string())
}

/// Reports whether `token` begins with an RFC 3339 `date "T" time` prefix.
fn has_rfc3339_prefix(token: &str) -> bool {
    // `d` stands for "any ASCII digit"; every other byte must match literally.
    const SHAPE: &[u8] = b"dddd-dd-ddTdd:dd:dd";

    let bytes = token.as_bytes();
    bytes.len() >= SHAPE.len()
        && SHAPE.iter().zip(bytes).all(|(&expected, &actual)| {
            if expected == b'd' {
                actual.is_ascii_digit()
            } else {
                actual == expected
            }
        })
}
1003
/// Camel component registered under the `container` URI scheme.
///
/// It turns `container:` URIs into endpoints whose producers run Docker
/// operations and whose consumers stream Docker output into a route.
///
/// Containers started through the `run` operation are recorded in a global
/// tracker; call `cleanup_tracked_containers()` on shutdown to remove them.
pub struct ContainerComponent;

impl Default for ContainerComponent {
    /// Same value as [`ContainerComponent::new`].
    fn default() -> Self {
        ContainerComponent
    }
}

impl ContainerComponent {
    /// Builds the (stateless) container component.
    pub fn new() -> Self {
        ContainerComponent
    }
}
1025
1026impl Component for ContainerComponent {
1027    fn scheme(&self) -> &str {
1028        "container"
1029    }
1030
1031    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1032        let config = ContainerConfig::from_uri(uri)?;
1033        Ok(Box::new(ContainerEndpoint {
1034            uri: uri.to_string(),
1035            config,
1036        }))
1037    }
1038}
1039
1040/// Endpoint for container operations.
1041///
1042/// This endpoint creates producers for executing container operations
1043/// and consumers for receiving container events.
1044pub struct ContainerEndpoint {
1045    uri: String,
1046    config: ContainerConfig,
1047}
1048
1049impl Endpoint for ContainerEndpoint {
1050    fn uri(&self) -> &str {
1051        &self.uri
1052    }
1053
1054    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1055        Ok(Box::new(ContainerConsumer {
1056            config: self.config.clone(),
1057        }))
1058    }
1059
1060    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1061        let docker = self.config.connect_docker_client()?;
1062        Ok(BoxProcessor::new(ContainerProducer {
1063            config: self.config.clone(),
1064            docker,
1065        }))
1066    }
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::*;
1072
1073    #[test]
1074    fn test_container_config() {
1075        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1076        assert_eq!(config.operation, "run");
1077        assert_eq!(config.image.as_deref(), Some("alpine"));
1078        assert_eq!(config.host.as_deref(), Some("unix:///var/run/docker.sock"));
1079    }
1080
1081    #[test]
1082    fn test_container_config_parses_name() {
1083        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1084        assert_eq!(config.name.as_deref(), Some("my-container"));
1085    }
1086
1087    #[test]
1088    fn test_parse_producer_operation_known() {
1089        assert_eq!(
1090            parse_producer_operation("list").unwrap(),
1091            ProducerOperation::List
1092        );
1093        assert_eq!(
1094            parse_producer_operation("run").unwrap(),
1095            ProducerOperation::Run
1096        );
1097        assert_eq!(
1098            parse_producer_operation("start").unwrap(),
1099            ProducerOperation::Start
1100        );
1101        assert_eq!(
1102            parse_producer_operation("stop").unwrap(),
1103            ProducerOperation::Stop
1104        );
1105        assert_eq!(
1106            parse_producer_operation("remove").unwrap(),
1107            ProducerOperation::Remove
1108        );
1109    }
1110
1111    #[test]
1112    fn test_parse_producer_operation_unknown() {
1113        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1114        match err {
1115            CamelError::ProcessorError(msg) => {
1116                assert!(
1117                    msg.contains("Unknown container operation"),
1118                    "Unexpected error message: {}",
1119                    msg
1120                );
1121            }
1122            _ => panic!("Expected ProcessorError for unknown operation"),
1123        }
1124    }
1125
1126    #[test]
1127    fn test_resolve_container_name_header_overrides_config() {
1128        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1129        let mut exchange = Exchange::new(Message::new(""));
1130        exchange.input.set_header(
1131            HEADER_CONTAINER_NAME,
1132            serde_json::Value::String("header-name".to_string()),
1133        );
1134
1135        let resolved = resolve_container_name(&exchange, &config);
1136        assert_eq!(resolved.as_deref(), Some("header-name"));
1137    }
1138
1139    #[test]
1140    fn test_container_config_rejects_tcp_host() {
1141        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1142        let err = config.connect_docker_client().unwrap_err();
1143        match err {
1144            CamelError::ProcessorError(msg) => {
1145                assert!(
1146                    msg.to_lowercase().contains("tcp"),
1147                    "Expected TCP scheme error, got: {}",
1148                    msg
1149                );
1150            }
1151            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1152        }
1153    }
1154
1155    #[tokio::test]
1156    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1157        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1158        let remove_called_clone = remove_called.clone();
1159
1160        let result = run_container_with_cleanup(
1161            || async { Ok("container-123".to_string()) },
1162            |_id| async move {
1163                Err(CamelError::ProcessorError(
1164                    "Failed to start container".to_string(),
1165                ))
1166            },
1167            move |_id| {
1168                let remove_called_inner = remove_called_clone.clone();
1169                async move {
1170                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1171                    Ok(())
1172                }
1173            },
1174        )
1175        .await;
1176
1177        assert!(result.is_err(), "Expected start failure to bubble up");
1178        assert!(
1179            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1180            "Expected cleanup to remove container"
1181        );
1182    }
1183
1184    #[test]
1185    fn test_container_component_creates_endpoint() {
1186        let component = ContainerComponent::new();
1187        assert_eq!(component.scheme(), "container");
1188        let endpoint = component
1189            .create_endpoint("container:run?image=alpine")
1190            .unwrap();
1191        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1192    }
1193
1194    #[test]
1195    fn test_container_config_parses_ports() {
1196        let config =
1197            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1198        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1199    }
1200
1201    #[test]
1202    fn test_container_config_parses_env() {
1203        let config =
1204            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1205        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1206    }
1207
1208    #[test]
1209    fn test_container_config_parses_logs_options() {
1210        let config = ContainerConfig::from_uri(
1211            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1212        )
1213        .unwrap();
1214        assert_eq!(config.operation, "logs");
1215        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1216        assert!(config.follow);
1217        assert!(config.timestamps);
1218        assert_eq!(config.tail.as_deref(), Some("100"));
1219    }
1220
1221    #[test]
1222    fn test_container_config_logs_defaults() {
1223        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1224        assert!(config.follow); // default: true
1225        assert!(!config.timestamps); // default: false
1226        assert!(config.tail.is_none()); // default: None (all)
1227    }
1228
1229    #[test]
1230    fn test_parse_ports_single() {
1231        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1232        let (exposed, bindings) = config.parse_ports().unwrap();
1233
1234        assert!(exposed.contains_key("80/tcp"));
1235        assert!(bindings.contains_key("80/tcp"));
1236
1237        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1238        assert_eq!(binding.len(), 1);
1239        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1240    }
1241
1242    #[test]
1243    fn test_parse_ports_multiple() {
1244        let config =
1245            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1246        let (exposed, bindings) = config.parse_ports().unwrap();
1247
1248        assert!(exposed.contains_key("80/tcp"));
1249        assert!(exposed.contains_key("443/tcp"));
1250        assert_eq!(bindings.len(), 2);
1251    }
1252
1253    #[test]
1254    fn test_parse_ports_with_protocol() {
1255        let config =
1256            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1257                .unwrap();
1258        let (exposed, bindings) = config.parse_ports().unwrap();
1259
1260        assert!(exposed.contains_key("80/tcp"));
1261        assert!(exposed.contains_key("53/udp"));
1262    }
1263
1264    #[test]
1265    fn test_parse_ports_none() {
1266        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1267        assert!(config.parse_ports().is_none());
1268    }
1269
1270    #[test]
1271    fn test_parse_env_single() {
1272        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1273        let env = config.parse_env().unwrap();
1274
1275        assert_eq!(env.len(), 1);
1276        assert_eq!(env[0], "FOO=bar");
1277    }
1278
1279    #[test]
1280    fn test_parse_env_multiple() {
1281        let config =
1282            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1283                .unwrap();
1284        let env = config.parse_env().unwrap();
1285
1286        assert_eq!(env.len(), 3);
1287        assert!(env.contains(&"FOO=bar".to_string()));
1288        assert!(env.contains(&"BAZ=qux".to_string()));
1289        assert!(env.contains(&"NUM=123".to_string()));
1290    }
1291
1292    #[test]
1293    fn test_parse_env_none() {
1294        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1295        assert!(config.parse_env().is_none());
1296    }
1297
1298    mod test_helpers {
1299        use async_trait::async_trait;
1300        use camel_api::CamelError;
1301
1302        /// A no-op route controller for testing purposes.
1303        pub struct NullRouteController;
1304
1305        #[async_trait]
1306        impl camel_api::RouteController for NullRouteController {
1307            async fn start_route(&mut self, _: &str) -> Result<(), CamelError> {
1308                Ok(())
1309            }
1310            async fn stop_route(&mut self, _: &str) -> Result<(), CamelError> {
1311                Ok(())
1312            }
1313            async fn restart_route(&mut self, _: &str) -> Result<(), CamelError> {
1314                Ok(())
1315            }
1316            async fn suspend_route(&mut self, _: &str) -> Result<(), CamelError> {
1317                Ok(())
1318            }
1319            async fn resume_route(&mut self, _: &str) -> Result<(), CamelError> {
1320                Ok(())
1321            }
1322            fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1323                None
1324            }
1325            async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1326                Ok(())
1327            }
1328            async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1329                Ok(())
1330            }
1331        }
1332    }
1333
1334    use camel_api::Message;
1335    use std::sync::Arc;
1336    use test_helpers::NullRouteController;
1337    use tokio::sync::Mutex;
1338
1339    #[tokio::test]
1340    async fn test_container_producer_resolves_operation_from_header() {
1341        let component = ContainerComponent::new();
1342        let endpoint = component.create_endpoint("container:run").unwrap();
1343
1344        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1345        let mut producer = endpoint.create_producer(&ctx).unwrap();
1346
1347        let mut exchange = Exchange::new(Message::new(""));
1348        exchange
1349            .input
1350            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1351
1352        use tower::ServiceExt;
1353        let result = producer
1354            .ready()
1355            .await
1356            .unwrap()
1357            .call(exchange)
1358            .await
1359            .unwrap();
1360
1361        // For list operation, the result header should be on the input message
1362        assert_eq!(
1363            result
1364                .input
1365                .header(HEADER_ACTION_RESULT)
1366                .map(|v| v.as_str().unwrap()),
1367            Some("success")
1368        );
1369    }
1370
1371    #[tokio::test]
1372    async fn test_container_producer_connection_error_on_invalid_host() {
1373        // Test that an invalid host (nonexistent socket) results in a connection error
1374        let component = ContainerComponent::new();
1375        let endpoint = component
1376            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock")
1377            .unwrap();
1378
1379        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1380        let result = endpoint.create_producer(&ctx);
1381
1382        // The producer should return an error because it cannot connect to the invalid socket
1383        assert!(
1384            result.is_err(),
1385            "Expected error when connecting to invalid host"
1386        );
1387        let err = result.unwrap_err();
1388        match &err {
1389            CamelError::ProcessorError(msg) => {
1390                assert!(
1391                    msg.to_lowercase().contains("connection")
1392                        || msg.to_lowercase().contains("connect")
1393                        || msg.to_lowercase().contains("socket")
1394                        || msg.contains("docker"),
1395                    "Error message should indicate connection failure, got: {}",
1396                    msg
1397                );
1398            }
1399            _ => panic!("Expected ProcessorError, got: {:?}", err),
1400        }
1401    }
1402
1403    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
1404    #[tokio::test]
1405    async fn test_container_producer_lifecycle_operations_missing_id() {
1406        // Try to connect to Docker - if fails, return early
1407        let docker = match Docker::connect_with_unix_defaults() {
1408            Ok(d) => d,
1409            Err(_) => {
1410                eprintln!("Skipping test: Could not connect to Docker daemon");
1411                return;
1412            }
1413        };
1414
1415        if docker.ping().await.is_err() {
1416            eprintln!("Skipping test: Docker daemon not responding to ping");
1417            return;
1418        }
1419
1420        let component = ContainerComponent::new();
1421        let endpoint = component.create_endpoint("container:start").unwrap();
1422        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1423        let mut producer = endpoint.create_producer(&ctx).unwrap();
1424
1425        // Test each lifecycle operation without CamelContainerId header
1426        for operation in ["start", "stop", "remove"] {
1427            let mut exchange = Exchange::new(Message::new(""));
1428            exchange.input.set_header(
1429                HEADER_ACTION,
1430                serde_json::Value::String(operation.to_string()),
1431            );
1432            // Deliberately NOT setting CamelContainerId header
1433
1434            use tower::ServiceExt;
1435            let result = producer.ready().await.unwrap().call(exchange).await;
1436
1437            assert!(
1438                result.is_err(),
1439                "Expected error for {} operation without CamelContainerId",
1440                operation
1441            );
1442            let err = result.unwrap_err();
1443            match &err {
1444                CamelError::ProcessorError(msg) => {
1445                    assert!(
1446                        msg.contains(HEADER_CONTAINER_ID),
1447                        "Error message should mention {}, got: {}",
1448                        HEADER_CONTAINER_ID,
1449                        msg
1450                    );
1451                }
1452                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1453            }
1454        }
1455    }
1456
1457    /// Test that stop operation returns an error for a nonexistent container.
1458    #[tokio::test]
1459    async fn test_container_producer_stop_nonexistent() {
1460        // Try to connect to Docker - if fails, return early
1461        let docker = match Docker::connect_with_unix_defaults() {
1462            Ok(d) => d,
1463            Err(_) => {
1464                eprintln!("Skipping test: Could not connect to Docker daemon");
1465                return;
1466            }
1467        };
1468
1469        if docker.ping().await.is_err() {
1470            eprintln!("Skipping test: Docker daemon not responding to ping");
1471            return;
1472        }
1473
1474        let component = ContainerComponent::new();
1475        let endpoint = component.create_endpoint("container:stop").unwrap();
1476        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1477        let mut producer = endpoint.create_producer(&ctx).unwrap();
1478
1479        let mut exchange = Exchange::new(Message::new(""));
1480        exchange
1481            .input
1482            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1483        exchange.input.set_header(
1484            HEADER_CONTAINER_ID,
1485            serde_json::Value::String("nonexistent-container-123".into()),
1486        );
1487
1488        use tower::ServiceExt;
1489        let result = producer.ready().await.unwrap().call(exchange).await;
1490
1491        assert!(
1492            result.is_err(),
1493            "Expected error when stopping nonexistent container"
1494        );
1495        let err = result.unwrap_err();
1496        match &err {
1497            CamelError::ProcessorError(msg) => {
1498                // Docker API returns 404 with "No such container" message
1499                assert!(
1500                    msg.to_lowercase().contains("no such container")
1501                        || msg.to_lowercase().contains("not found")
1502                        || msg.contains("404"),
1503                    "Error message should indicate container not found, got: {}",
1504                    msg
1505                );
1506            }
1507            _ => panic!("Expected ProcessorError, got: {:?}", err),
1508        }
1509    }
1510
1511    /// Test that run operation returns an error when no image is provided.
1512    #[tokio::test]
1513    async fn test_container_producer_run_missing_image() {
1514        // Try to connect to Docker - if fails, return early
1515        let docker = match Docker::connect_with_unix_defaults() {
1516            Ok(d) => d,
1517            Err(_) => {
1518                eprintln!("Skipping test: Could not connect to Docker daemon");
1519                return;
1520            }
1521        };
1522
1523        if docker.ping().await.is_err() {
1524            eprintln!("Skipping test: Docker daemon not responding to ping");
1525            return;
1526        }
1527
1528        // Create producer without an image in the URI
1529        let component = ContainerComponent::new();
1530        let endpoint = component.create_endpoint("container:run").unwrap();
1531        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1532        let mut producer = endpoint.create_producer(&ctx).unwrap();
1533
1534        let mut exchange = Exchange::new(Message::new(""));
1535        exchange
1536            .input
1537            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1538        // Deliberately NOT setting CamelContainerImage header
1539
1540        use tower::ServiceExt;
1541        let result = producer.ready().await.unwrap().call(exchange).await;
1542
1543        assert!(
1544            result.is_err(),
1545            "Expected error for run operation without image"
1546        );
1547        let err = result.unwrap_err();
1548        match &err {
1549            CamelError::ProcessorError(msg) => {
1550                assert!(
1551                    msg.to_lowercase().contains("image"),
1552                    "Error message should mention 'image', got: {}",
1553                    msg
1554                );
1555            }
1556            _ => panic!("Expected ProcessorError, got: {:?}", err),
1557        }
1558    }
1559
1560    /// Test that run operation uses image from header.
1561    #[tokio::test]
1562    async fn test_container_producer_run_image_from_header() {
1563        // Try to connect to Docker - if fails, return early
1564        let docker = match Docker::connect_with_unix_defaults() {
1565            Ok(d) => d,
1566            Err(_) => {
1567                eprintln!("Skipping test: Could not connect to Docker daemon");
1568                return;
1569            }
1570        };
1571
1572        if docker.ping().await.is_err() {
1573            eprintln!("Skipping test: Docker daemon not responding to ping");
1574            return;
1575        }
1576
1577        // Create producer without an image in the URI
1578        let component = ContainerComponent::new();
1579        let endpoint = component.create_endpoint("container:run").unwrap();
1580        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1581        let mut producer = endpoint.create_producer(&ctx).unwrap();
1582
1583        let mut exchange = Exchange::new(Message::new(""));
1584        exchange
1585            .input
1586            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1587        // Set a non-existent image to test that the run operation attempts to use it
1588        exchange.input.set_header(
1589            HEADER_IMAGE,
1590            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1591        );
1592
1593        use tower::ServiceExt;
1594        let result = producer.ready().await.unwrap().call(exchange).await;
1595
1596        // The operation should fail because the image doesn't exist
1597        assert!(
1598            result.is_err(),
1599            "Expected error when running container with nonexistent image"
1600        );
1601        let err = result.unwrap_err();
1602        match &err {
1603            CamelError::ProcessorError(msg) => {
1604                // Docker API returns an error about the missing image
1605                assert!(
1606                    msg.to_lowercase().contains("no such image")
1607                        || msg.to_lowercase().contains("not found")
1608                        || msg.to_lowercase().contains("image")
1609                        || msg.to_lowercase().contains("pull")
1610                        || msg.contains("404"),
1611                    "Error message should indicate image issue, got: {}",
1612                    msg
1613                );
1614            }
1615            _ => panic!("Expected ProcessorError, got: {:?}", err),
1616        }
1617    }
1618
1619    /// Integration test: Actually run a container with alpine:latest.
1620    /// This test verifies the full flow: create → start → set headers.
1621    #[tokio::test]
1622    async fn test_container_producer_run_alpine_container() {
1623        let docker = match Docker::connect_with_unix_defaults() {
1624            Ok(d) => d,
1625            Err(_) => {
1626                eprintln!("Skipping test: Could not connect to Docker daemon");
1627                return;
1628            }
1629        };
1630
1631        if docker.ping().await.is_err() {
1632            eprintln!("Skipping test: Docker daemon not responding to ping");
1633            return;
1634        }
1635
1636        // Pull alpine:latest if not present
1637        let images = docker.list_images::<&str>(None).await.unwrap();
1638        let has_alpine = images
1639            .iter()
1640            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1641
1642        if !has_alpine {
1643            eprintln!("Pulling alpine:latest image...");
1644            let mut stream = docker.create_image(
1645                Some(bollard::image::CreateImageOptions {
1646                    from_image: "alpine:latest",
1647                    ..Default::default()
1648                }),
1649                None,
1650                None,
1651            );
1652
1653            use futures::StreamExt;
1654            while let Some(_item) = stream.next().await {
1655                // Wait for pull to complete
1656            }
1657            eprintln!("Image pulled successfully");
1658        }
1659
1660        // Create producer
1661        let component = ContainerComponent::new();
1662        let endpoint = component.create_endpoint("container:run").unwrap();
1663        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1664        let mut producer = endpoint.create_producer(&ctx).unwrap();
1665
1666        // Run container with unique name
1667        let timestamp = std::time::SystemTime::now()
1668            .duration_since(std::time::UNIX_EPOCH)
1669            .unwrap()
1670            .as_millis();
1671        let container_name = format!("test-rust-camel-{}", timestamp);
1672        let mut exchange = Exchange::new(Message::new(""));
1673        exchange.input.set_header(
1674            HEADER_IMAGE,
1675            serde_json::Value::String("alpine:latest".into()),
1676        );
1677        exchange.input.set_header(
1678            HEADER_CONTAINER_NAME,
1679            serde_json::Value::String(container_name.clone()),
1680        );
1681
1682        use tower::ServiceExt;
1683        let result = producer
1684            .ready()
1685            .await
1686            .unwrap()
1687            .call(exchange)
1688            .await
1689            .expect("Container run should succeed");
1690
1691        // Verify container ID was set
1692        let container_id = result
1693            .input
1694            .header(HEADER_CONTAINER_ID)
1695            .and_then(|v| v.as_str().map(|s| s.to_string()))
1696            .expect("Expected container ID header");
1697        assert!(!container_id.is_empty(), "Container ID should not be empty");
1698
1699        // Verify success header
1700        assert_eq!(
1701            result
1702                .input
1703                .header(HEADER_ACTION_RESULT)
1704                .and_then(|v| v.as_str()),
1705            Some("success")
1706        );
1707
1708        // Verify container exists in Docker
1709        let inspect = docker
1710            .inspect_container(&container_id, None)
1711            .await
1712            .expect("Container should exist");
1713        assert_eq!(
1714            inspect.id.as_ref().map(|s| s.as_str()),
1715            Some(container_id.as_str())
1716        );
1717
1718        // Cleanup: remove container
1719        docker
1720            .remove_container(
1721                &container_id,
1722                Some(bollard::container::RemoveContainerOptions {
1723                    force: true,
1724                    ..Default::default()
1725                }),
1726            )
1727            .await
1728            .ok();
1729
1730        eprintln!("✅ Container {} created and cleaned up", container_id);
1731    }
1732
1733    /// Test that consumer returns an error for unsupported operations.
1734    #[tokio::test]
1735    async fn test_container_consumer_unsupported_operation() {
1736        use tokio::sync::mpsc;
1737
1738        let component = ContainerComponent::new();
1739        let endpoint = component.create_endpoint("container:run").unwrap();
1740        let mut consumer = endpoint.create_consumer().unwrap();
1741
1742        // Create a minimal ConsumerContext
1743        let (tx, _rx) = mpsc::channel(16);
1744        let cancel_token = tokio_util::sync::CancellationToken::new();
1745        let context = ConsumerContext::new(tx, cancel_token);
1746
1747        let result = consumer.start(context).await;
1748
1749        // Should return error because "run" is not a supported consumer operation
1750        assert!(
1751            result.is_err(),
1752            "Expected error for unsupported consumer operation"
1753        );
1754        let err = result.unwrap_err();
1755        match &err {
1756            CamelError::EndpointCreationFailed(msg) => {
1757                assert!(
1758                    msg.contains("Consumer only supports 'events' or 'logs'"),
1759                    "Error message should mention events or logs support, got: {}",
1760                    msg
1761                );
1762            }
1763            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1764        }
1765    }
1766
1767    #[test]
1768    fn test_container_consumer_concurrency_model_is_concurrent() {
1769        let consumer = ContainerConsumer {
1770            config: ContainerConfig::from_uri("container:events").unwrap(),
1771        };
1772
1773        assert_eq!(
1774            consumer.concurrency_model(),
1775            camel_component::ConcurrencyModel::Concurrent { max: None }
1776        );
1777    }
1778
1779    /// Test that consumer gracefully shuts down when cancellation is requested.
1780    /// This test requires a running Docker daemon. If Docker is not available, the test
1781    /// will be ignored.
1782    #[tokio::test]
1783    async fn test_container_consumer_cancellation() {
1784        use std::sync::atomic::{AtomicBool, Ordering};
1785        use tokio::sync::mpsc;
1786
1787        // Try to connect to Docker - if fails, return early
1788        let docker = match Docker::connect_with_unix_defaults() {
1789            Ok(d) => d,
1790            Err(_) => {
1791                eprintln!("Skipping test: Could not connect to Docker daemon");
1792                return;
1793            }
1794        };
1795
1796        if docker.ping().await.is_err() {
1797            eprintln!("Skipping test: Docker daemon not responding to ping");
1798            return;
1799        }
1800
1801        let component = ContainerComponent::new();
1802        let endpoint = component.create_endpoint("container:events").unwrap();
1803        let mut consumer = endpoint.create_consumer().unwrap();
1804
1805        // Create a ConsumerContext
1806        let (tx, _rx) = mpsc::channel(16);
1807        let cancel_token = tokio_util::sync::CancellationToken::new();
1808        let context = ConsumerContext::new(tx, cancel_token.clone());
1809
1810        // Track if the consumer task has completed
1811        let completed = Arc::new(AtomicBool::new(false));
1812        let completed_clone = completed.clone();
1813
1814        // Spawn consumer in background
1815        let handle = tokio::spawn(async move {
1816            let result = consumer.start(context).await;
1817            // Mark as completed when done
1818            completed_clone.store(true, Ordering::SeqCst);
1819            result
1820        });
1821
1822        // Wait a bit for the consumer to start
1823        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1824
1825        // Consumer should still be running (not completed)
1826        assert!(
1827            !completed.load(Ordering::SeqCst),
1828            "Consumer should still be running before cancellation"
1829        );
1830
1831        // Request cancellation
1832        cancel_token.cancel();
1833
1834        // Wait for the task to complete (with timeout)
1835        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1836
1837        // Task should have completed (not timed out)
1838        assert!(
1839            result.is_ok(),
1840            "Consumer should gracefully shut down after cancellation"
1841        );
1842
1843        // Verify the consumer completed
1844        assert!(
1845            completed.load(Ordering::SeqCst),
1846            "Consumer should have completed after cancellation"
1847        );
1848    }
1849
1850    /// Integration test for listing containers.
1851    /// This test requires a running Docker daemon. If Docker is not available, the test
1852    /// will return early and be effectively ignored.
1853    #[tokio::test]
1854    async fn test_container_producer_list_containers() {
1855        // Try to connect to Docker using default config
1856        // If ping fails, return early (effectively ignoring this test)
1857        let docker = match Docker::connect_with_unix_defaults() {
1858            Ok(d) => d,
1859            Err(_) => {
1860                eprintln!("Skipping test: Could not connect to Docker daemon");
1861                return;
1862            }
1863        };
1864
1865        if docker.ping().await.is_err() {
1866            eprintln!("Skipping test: Docker daemon not responding to ping");
1867            return;
1868        }
1869
1870        // Create producer with list operation
1871        let component = ContainerComponent::new();
1872        let endpoint = component.create_endpoint("container:list").unwrap();
1873
1874        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1875        let mut producer = endpoint.create_producer(&ctx).unwrap();
1876
1877        // Create exchange with list operation in header
1878        let mut exchange = Exchange::new(Message::new(""));
1879        exchange
1880            .input
1881            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1882
1883        // Call the producer
1884        use tower::ServiceExt;
1885        let result = producer
1886            .ready()
1887            .await
1888            .unwrap()
1889            .call(exchange)
1890            .await
1891            .expect("Producer should succeed when Docker is available");
1892
1893        // Assert that the input exchange body is a JSON array
1894        // (because list_containers should put the JSON result in the body)
1895        match &result.input.body {
1896            camel_api::Body::Json(json_value) => {
1897                assert!(
1898                    json_value.is_array(),
1899                    "Expected input body to be a JSON array, got: {:?}",
1900                    json_value
1901                );
1902            }
1903            other => panic!("Expected Body::Json with array, got: {:?}", other),
1904        }
1905    }
1906}