Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6use std::collections::{HashMap, HashSet};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use bollard::Docker;
14use bollard::service::{HostConfig, PortBinding};
15use camel_api::{Body, BoxProcessor, CamelError, Exchange, Message};
16use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
17use camel_endpoint::parse_uri;
18use tower::Service;
19
20/// Global tracker for containers created by this component.
21/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
22static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
23    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
24
25/// Registers a container ID for tracking (will be cleaned up on shutdown).
26fn track_container(id: String) {
27    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
28        tracker.insert(id);
29    }
30}
31
32/// Removes a container ID from tracking (when it's been removed naturally).
33fn untrack_container(id: &str) {
34    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
35        tracker.remove(id);
36    }
37}
38
39/// Cleans up all tracked containers. Call this on application shutdown.
40pub async fn cleanup_tracked_containers() {
41    let ids: Vec<String> = {
42        match CONTAINER_TRACKER.lock() {
43            Ok(tracker) => tracker.iter().cloned().collect(),
44            Err(_) => return,
45        }
46    };
47
48    if ids.is_empty() {
49        return;
50    }
51
52    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
53
54    let docker = match Docker::connect_with_local_defaults() {
55        Ok(d) => d,
56        Err(e) => {
57            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
58            return;
59        }
60    };
61
62    for id in ids {
63        match docker
64            .remove_container(
65                &id,
66                Some(bollard::container::RemoveContainerOptions {
67                    force: true,
68                    ..Default::default()
69                }),
70            )
71            .await
72        {
73            Ok(_) => {
74                tracing::debug!("Cleaned up container {}", id);
75                untrack_container(&id);
76            }
77            Err(e) => {
78                tracing::warn!("Failed to cleanup container {}: {}", id, e);
79            }
80        }
81    }
82}
83
84// Header constants for container operations
85
86/// Timeout (seconds) for connecting to the Docker daemon.
87const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
88
89/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
90pub const HEADER_ACTION: &str = "CamelContainerAction";
91
92/// Header key for specifying the container image to use for "run" operations.
93pub const HEADER_IMAGE: &str = "CamelContainerImage";
94
95/// Header key for specifying or receiving the container ID.
96pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
97
98/// Header key for the log stream type (stdout or stderr).
99pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
100
101/// Header key for the log timestamp.
102pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
103
104/// Header key for specifying the container name for "run" operations.
105pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
106
107/// Header key for the result status of a container operation (e.g., "success").
108pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
109
110// ---------------------------------------------------------------------------
111// ContainerGlobalConfig
112// ---------------------------------------------------------------------------
113
114/// Global configuration for Container component.
115/// Plain Rust, no serde, with Default impl and builder methods.
116/// These are the fallback defaults when URI params are not set.
117#[derive(Debug, Clone, PartialEq)]
118pub struct ContainerGlobalConfig {
119    /// The Docker host URL (default: "unix:///var/run/docker.sock").
120    pub docker_host: String,
121}
122
123impl Default for ContainerGlobalConfig {
124    fn default() -> Self {
125        Self {
126            docker_host: "unix:///var/run/docker.sock".to_string(),
127        }
128    }
129}
130
131impl ContainerGlobalConfig {
132    pub fn new() -> Self {
133        Self::default()
134    }
135
136    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
137        self.docker_host = v.into();
138        self
139    }
140}
141
142// ---------------------------------------------------------------------------
143// ContainerConfig (endpoint configuration)
144// ---------------------------------------------------------------------------
145
146/// Configuration for the container component endpoint.
147///
148/// This struct holds the parsed URI configuration including the operation type,
149/// optional container image, and Docker host connection details.
150#[derive(Debug, Clone)]
151pub struct ContainerConfig {
152    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
153    pub operation: String,
154    /// The container image to use for "run" operations (can be overridden via header).
155    pub image: Option<String>,
156    /// The container name to use for "run" operations (can be overridden via header).
157    pub name: Option<String>,
158    /// The Docker host URL (defaults to "unix:///var/run/docker.sock").
159    pub host: Option<String>,
160    /// Command to run in the container (e.g., "sleep 30").
161    pub cmd: Option<String>,
162    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
163    pub ports: Option<String>,
164    /// Environment variables in format "KEY=value,KEY2=value2".
165    pub env: Option<String>,
166    /// Network mode (e.g., "bridge", "host", "none"). Default: "bridge".
167    pub network: Option<String>,
168    /// Container ID or name for logs consumer.
169    pub container_id: Option<String>,
170    /// Follow log output (default: true for consumer).
171    pub follow: bool,
172    /// Include timestamps in logs (default: false).
173    pub timestamps: bool,
174    /// Number of lines to show from the end of logs (default: all).
175    pub tail: Option<String>,
176    /// Automatically pull the image if not present (default: true).
177    pub auto_pull: bool,
178    /// Automatically remove the container when it exits (default: true).
179    pub auto_remove: bool,
180}
181
182impl ContainerConfig {
183    /// Parses a container URI into a `ContainerConfig`.
184    ///
185    /// # Arguments
186    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
187    ///
188    /// # Errors
189    /// Returns an error if the URI scheme is not "container".
190    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
191        let parts = parse_uri(uri)?;
192        if parts.scheme != "container" {
193            return Err(CamelError::InvalidUri(format!(
194                "expected scheme 'container', got '{}'",
195                parts.scheme
196            )));
197        }
198
199        let image = parts.params.get("image").cloned();
200        let name = parts.params.get("name").cloned();
201        let cmd = parts.params.get("cmd").cloned();
202        let ports = parts.params.get("ports").cloned();
203        let env = parts.params.get("env").cloned();
204        let network = parts.params.get("network").cloned();
205        let container_id = parts.params.get("containerId").cloned();
206        let follow = parts
207            .params
208            .get("follow")
209            .map(|v| v.eq_ignore_ascii_case("true"))
210            .unwrap_or(true);
211        let timestamps = parts
212            .params
213            .get("timestamps")
214            .map(|v| v.eq_ignore_ascii_case("true"))
215            .unwrap_or(false);
216        let tail = parts.params.get("tail").cloned();
217        let auto_pull = parts
218            .params
219            .get("autoPull")
220            .map(|v| v.eq_ignore_ascii_case("true"))
221            .unwrap_or(true);
222        let auto_remove = parts
223            .params
224            .get("autoRemove")
225            .map(|v| v.eq_ignore_ascii_case("true"))
226            .unwrap_or(true);
227        // host is only set from URI param; global config defaults are applied later
228        let host = parts.params.get("host").cloned();
229
230        Ok(Self {
231            operation: parts.path,
232            image,
233            name,
234            host,
235            cmd,
236            ports,
237            env,
238            network,
239            container_id,
240            follow,
241            timestamps,
242            tail,
243            auto_pull,
244            auto_remove,
245        })
246    }
247
248    /// Apply global config defaults to this endpoint config.
249    /// Only sets values that are currently `None`.
250    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
251        if self.host.is_none() {
252            self.host = Some(global.docker_host.clone());
253        }
254    }
255
256    fn docker_socket_path(&self) -> Result<&str, CamelError> {
257        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
258            "npipe:////./pipe/docker_engine"
259        } else {
260            "unix:///var/run/docker.sock"
261        });
262
263        if host.starts_with("unix://") || host.starts_with("npipe://") {
264            return Ok(host);
265        }
266
267        if host.contains("://") {
268            return Err(CamelError::ProcessorError(format!(
269                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
270                host
271            )));
272        }
273
274        Ok(host)
275    }
276
277    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
278        let socket_path = self.docker_socket_path()?;
279        Docker::connect_with_socket(
280            socket_path,
281            DOCKER_CONNECT_TIMEOUT_SECS,
282            bollard::API_DEFAULT_VERSION,
283        )
284        .map_err(|e| {
285            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
286        })
287    }
288
289    /// Connects to the Docker daemon using the configured host.
290    ///
291    /// This method establishes a Unix socket connection to Docker and verifies
292    /// the connection by sending a ping request.
293    ///
294    /// # Errors
295    /// Returns an error if the connection fails or the ping request fails.
296    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
297        let docker = self.connect_docker_client()?;
298        docker
299            .ping()
300            .await
301            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
302        Ok(docker)
303    }
304
305    #[allow(clippy::type_complexity)]
306    fn parse_ports(
307        &self,
308    ) -> Option<(
309        HashMap<String, HashMap<(), ()>>,
310        HashMap<String, Option<Vec<PortBinding>>>,
311    )> {
312        let ports_str = self.ports.as_ref()?;
313
314        let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
315        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
316
317        for mapping in ports_str.split(',') {
318            let mapping = mapping.trim();
319            if mapping.is_empty() {
320                continue;
321            }
322
323            let (host_port, container_spec) = mapping.split_once(':')?;
324
325            let (container_port, protocol) = if container_spec.contains('/') {
326                let parts: Vec<&str> = container_spec.split('/').collect();
327                (parts[0], parts[1])
328            } else {
329                (container_spec, "tcp")
330            };
331
332            let container_key = format!("{}/{}", container_port, protocol);
333
334            exposed_ports.insert(container_key.clone(), HashMap::new());
335
336            port_bindings.insert(
337                container_key,
338                Some(vec![PortBinding {
339                    host_ip: None,
340                    host_port: Some(host_port.to_string()),
341                }]),
342            );
343        }
344
345        if exposed_ports.is_empty() {
346            None
347        } else {
348            Some((exposed_ports, port_bindings))
349        }
350    }
351
352    fn parse_env(&self) -> Option<Vec<String>> {
353        let env_str = self.env.as_ref()?;
354
355        let env_vars: Vec<String> = env_str
356            .split(',')
357            .map(|s| s.trim().to_string())
358            .filter(|s| !s.is_empty())
359            .collect();
360
361        if env_vars.is_empty() {
362            None
363        } else {
364            Some(env_vars)
365        }
366    }
367}
368
369#[derive(Debug, Clone, Copy, PartialEq, Eq)]
370enum ProducerOperation {
371    List,
372    Run,
373    Start,
374    Stop,
375    Remove,
376}
377
378fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
379    match operation {
380        "list" => Ok(ProducerOperation::List),
381        "run" => Ok(ProducerOperation::Run),
382        "start" => Ok(ProducerOperation::Start),
383        "stop" => Ok(ProducerOperation::Stop),
384        "remove" => Ok(ProducerOperation::Remove),
385        _ => Err(CamelError::ProcessorError(format!(
386            "Unknown container operation: {}",
387            operation
388        ))),
389    }
390}
391
392fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
393    exchange
394        .input
395        .header(HEADER_CONTAINER_NAME)
396        .and_then(|v| v.as_str().map(|s| s.to_string()))
397        .or_else(|| config.name.clone())
398}
399
400async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
401    let images = docker
402        .list_images::<&str>(None)
403        .await
404        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
405
406    Ok(images.iter().any(|img| {
407        img.repo_tags
408            .iter()
409            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
410    }))
411}
412
413async fn pull_image_with_progress(
414    docker: &Docker,
415    image: &str,
416    timeout_secs: u64,
417) -> Result<(), CamelError> {
418    use futures::StreamExt;
419
420    tracing::info!("Pulling image: {}", image);
421
422    let mut stream = docker.create_image(
423        Some(bollard::image::CreateImageOptions {
424            from_image: image,
425            ..Default::default()
426        }),
427        None,
428        None,
429    );
430
431    let start = std::time::Instant::now();
432    let mut last_progress = std::time::Instant::now();
433
434    while let Some(item) = stream.next().await {
435        if start.elapsed().as_secs() > timeout_secs {
436            return Err(CamelError::ProcessorError(format!(
437                "Image pull timeout after {}s. Try manually: docker pull {}",
438                timeout_secs, image
439            )));
440        }
441
442        match item {
443            Ok(update) => {
444                // Log progress every 2 seconds
445                if last_progress.elapsed().as_secs() >= 2 {
446                    if let Some(status) = update.status {
447                        tracing::debug!("Pull progress: {}", status);
448                    }
449                    last_progress = std::time::Instant::now();
450                }
451            }
452            Err(e) => {
453                let err_str = e.to_string().to_lowercase();
454                if err_str.contains("unauthorized") || err_str.contains("401") {
455                    return Err(CamelError::ProcessorError(format!(
456                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
457                        image
458                    )));
459                }
460                if err_str.contains("not found") || err_str.contains("404") {
461                    return Err(CamelError::ProcessorError(format!(
462                        "Image '{}' not found in registry. Check the image name and tag",
463                        image
464                    )));
465                }
466                return Err(CamelError::ProcessorError(format!(
467                    "Failed to pull image '{}': {}",
468                    image, e
469                )));
470            }
471        }
472    }
473
474    tracing::info!("Successfully pulled image: {}", image);
475    Ok(())
476}
477
478async fn ensure_image_available(
479    docker: &Docker,
480    image: &str,
481    auto_pull: bool,
482    timeout_secs: u64,
483) -> Result<(), CamelError> {
484    if image_exists_locally(docker, image).await? {
485        tracing::debug!("Image '{}' already available locally", image);
486        return Ok(());
487    }
488
489    if !auto_pull {
490        return Err(CamelError::ProcessorError(format!(
491            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
492            image, image
493        )));
494    }
495
496    pull_image_with_progress(docker, image, timeout_secs).await
497}
498
499fn format_docker_event(event: &bollard::models::EventMessage) -> String {
500    let action = event.action.as_deref().unwrap_or("unknown");
501    let actor = event.actor.as_ref();
502
503    let container_name = actor
504        .and_then(|a| a.attributes.as_ref())
505        .and_then(|attrs| attrs.get("name"))
506        .map(|s| s.as_str())
507        .unwrap_or("unknown");
508
509    let image = actor
510        .and_then(|a| a.attributes.as_ref())
511        .and_then(|attrs| attrs.get("image"))
512        .map(|s| s.as_str())
513        .unwrap_or("");
514
515    let exit_code = actor
516        .and_then(|a| a.attributes.as_ref())
517        .and_then(|attrs| attrs.get("exitCode"))
518        .map(|s| s.as_str());
519
520    match action {
521        "create" => {
522            if image.is_empty() {
523                format!("[CREATE] Container {}", container_name)
524            } else {
525                format!("[CREATE] Container {} ({})", container_name, image)
526            }
527        }
528        "start" => format!("[START]  Container {}", container_name),
529        "die" => {
530            if let Some(code) = exit_code {
531                format!("[DIE]    Container {} (exit: {})", container_name, code)
532            } else {
533                format!("[DIE]    Container {}", container_name)
534            }
535        }
536        "destroy" => format!("[DESTROY] Container {}", container_name),
537        "stop" => format!("[STOP]   Container {}", container_name),
538        "pause" => format!("[PAUSE]  Container {}", container_name),
539        "unpause" => format!("[UNPAUSE] Container {}", container_name),
540        "restart" => format!("[RESTART] Container {}", container_name),
541        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
542    }
543}
544
545async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
546    create: CreateFn,
547    start: StartFn,
548    remove: RemoveFn,
549) -> Result<String, CamelError>
550where
551    CreateFn: FnOnce() -> CreateFut,
552    CreateFut: Future<Output = Result<String, CamelError>>,
553    StartFn: FnOnce(String) -> StartFut,
554    StartFut: Future<Output = Result<(), CamelError>>,
555    RemoveFn: FnOnce(String) -> RemoveFut,
556    RemoveFut: Future<Output = Result<(), CamelError>>,
557{
558    let container_id = create().await?;
559    if let Err(start_err) = start(container_id.clone()).await {
560        if let Err(remove_err) = remove(container_id.clone()).await {
561            return Err(CamelError::ProcessorError(format!(
562                "Failed to start container: {}. Cleanup failed: {}",
563                start_err, remove_err
564            )));
565        }
566        return Err(start_err);
567    }
568
569    Ok(container_id)
570}
571
572/// Producer for executing container operations.
573///
574/// This producer handles synchronous container operations like listing,
575/// creating, starting, stopping, and removing containers.
576#[derive(Clone)]
577pub struct ContainerProducer {
578    config: ContainerConfig,
579    docker: Docker,
580}
581
582impl Service<Exchange> for ContainerProducer {
583    type Response = Exchange;
584    type Error = CamelError;
585    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
586
587    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
588        Poll::Ready(Ok(()))
589    }
590
591    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
592        let config = self.config.clone();
593        let docker = self.docker.clone();
594        Box::pin(async move {
595            // Extract operation from header or use config
596            let operation_name = exchange
597                .input
598                .header(HEADER_ACTION)
599                .and_then(|v| v.as_str().map(|s| s.to_string()))
600                .unwrap_or_else(|| config.operation.clone());
601
602            let operation = parse_producer_operation(&operation_name)?;
603
604            // Execute operation
605            match operation {
606                ProducerOperation::List => {
607                    let containers = docker.list_containers::<String>(None).await.map_err(|e| {
608                        CamelError::ProcessorError(format!("Failed to list containers: {}", e))
609                    })?;
610
611                    let json_value = serde_json::to_value(&containers).map_err(|e| {
612                        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
613                    })?;
614
615                    exchange.input.body = Body::Json(json_value);
616                    exchange.input.set_header(
617                        HEADER_ACTION_RESULT,
618                        serde_json::Value::String("success".to_string()),
619                    );
620                }
621                ProducerOperation::Run => {
622                    // Run operation: create and start a container from an image
623                    let image = exchange
624                        .input
625                        .header(HEADER_IMAGE)
626                        .and_then(|v| v.as_str().map(|s| s.to_string()))
627                        .or(config.image.clone())
628                        .ok_or_else(|| {
629                            CamelError::ProcessorError(
630                                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
631                            )
632                        })?;
633
634                    // Ensure image is available (auto-pull if needed)
635                    let pull_timeout = 300; // 5 minutes default
636                    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
637                        .await
638                        .map_err(|e| {
639                            CamelError::ProcessorError(format!(
640                                "Image '{}' not available: {}",
641                                image, e
642                            ))
643                        })?;
644
645                    let container_name = resolve_container_name(&exchange, &config);
646                    let container_name_ref = container_name.as_deref().unwrap_or("");
647                    let cmd_parts: Option<Vec<String>> = config
648                        .cmd
649                        .as_ref()
650                        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
651                    let auto_remove = config.auto_remove;
652                    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
653                    let env_vars = config.parse_env();
654                    let network_mode = config.network.clone();
655
656                    let docker_create = docker.clone();
657                    let docker_start = docker.clone();
658                    let docker_remove = docker.clone();
659
660                    let container_id = run_container_with_cleanup(
661                        move || async move {
662                            let create_options = bollard::container::CreateContainerOptions {
663                                name: container_name_ref,
664                                ..Default::default()
665                            };
666                            let container_config = bollard::container::Config::<String> {
667                                image: Some(image.clone()),
668                                cmd: cmd_parts,
669                                env: env_vars,
670                                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
671                                host_config: Some(HostConfig {
672                                    auto_remove: Some(auto_remove),
673                                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
674                                    network_mode,
675                                    ..Default::default()
676                                }),
677                                ..Default::default()
678                            };
679
680                            let create_response = docker_create
681                                .create_container(Some(create_options), container_config)
682                                .await
683                                .map_err(|e| {
684                                    let err_str = e.to_string().to_lowercase();
685                                    if err_str.contains("409") || err_str.contains("conflict") {
686                                        CamelError::ProcessorError(format!(
687                                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
688                                            container_name_ref
689                                        ))
690                                    } else {
691                                        CamelError::ProcessorError(format!(
692                                            "Failed to create container: {}",
693                                            e
694                                        ))
695                                    }
696                                })?;
697
698                            Ok(create_response.id)
699                        },
700                        move |container_id| async move {
701                            docker_start
702                                .start_container::<String>(&container_id, None)
703                                .await
704                                .map_err(|e| {
705                                    CamelError::ProcessorError(format!(
706                                        "Failed to start container: {}",
707                                        e
708                                    ))
709                                })
710                        },
711                        move |container_id| async move {
712                            docker_remove
713                                .remove_container(&container_id, None)
714                                .await
715                                .map_err(|e| {
716                                    CamelError::ProcessorError(format!(
717                                        "Failed to remove container after start failure: {}",
718                                        e
719                                    ))
720                                })
721                        },
722                    )
723                    .await?;
724
725                    track_container(container_id.clone());
726
727                    exchange
728                        .input
729                        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
730                    exchange.input.set_header(
731                        HEADER_ACTION_RESULT,
732                        serde_json::Value::String("success".to_string()),
733                    );
734                }
735                ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
736                    // Lifecycle operations require a container ID
737                    let container_id = exchange
738                        .input
739                        .header(HEADER_CONTAINER_ID)
740                        .and_then(|v| v.as_str().map(|s| s.to_string()))
741                        .ok_or_else(|| {
742                            CamelError::ProcessorError(format!(
743                                "{} header is required for {} operation",
744                                HEADER_CONTAINER_ID, operation_name
745                            ))
746                        })?;
747
748                    match operation {
749                        ProducerOperation::Start => {
750                            docker
751                                .start_container::<String>(&container_id, None)
752                                .await
753                                .map_err(|e| {
754                                    CamelError::ProcessorError(format!(
755                                        "Failed to start container: {}",
756                                        e
757                                    ))
758                                })?;
759                        }
760                        ProducerOperation::Stop => {
761                            docker
762                                .stop_container(&container_id, None)
763                                .await
764                                .map_err(|e| {
765                                    CamelError::ProcessorError(format!(
766                                        "Failed to stop container: {}",
767                                        e
768                                    ))
769                                })?;
770                        }
771                        ProducerOperation::Remove => {
772                            docker
773                                .remove_container(&container_id, None)
774                                .await
775                                .map_err(|e| {
776                                    CamelError::ProcessorError(format!(
777                                        "Failed to remove container: {}",
778                                        e
779                                    ))
780                                })?;
781                            untrack_container(&container_id);
782                        }
783                        _ => {}
784                    }
785
786                    // Set success result
787                    exchange.input.set_header(
788                        HEADER_ACTION_RESULT,
789                        serde_json::Value::String("success".to_string()),
790                    );
791                }
792            }
793
794            Ok(exchange)
795        })
796    }
797}
798
799/// Consumer for receiving Docker container events or logs.
800///
801/// This consumer subscribes to Docker events or container logs and forwards them
802/// to the route as exchanges. It implements automatic reconnection on connection failures.
803pub struct ContainerConsumer {
804    config: ContainerConfig,
805}
806
807#[async_trait]
808impl Consumer for ContainerConsumer {
809    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
810        match self.config.operation.as_str() {
811            "events" => self.start_events_consumer(context).await,
812            "logs" => self.start_logs_consumer(context).await,
813            _ => Err(CamelError::EndpointCreationFailed(format!(
814                "Consumer only supports 'events' or 'logs' operations, got '{}'",
815                self.config.operation
816            ))),
817        }
818    }
819
820    async fn stop(&mut self) -> Result<(), CamelError> {
821        Ok(())
822    }
823
824    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
825        camel_component::ConcurrencyModel::Concurrent { max: None }
826    }
827}
828
829impl ContainerConsumer {
830    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
831        use futures::StreamExt;
832
833        loop {
834            if context.is_cancelled() {
835                tracing::info!("Container events consumer shutting down");
836                return Ok(());
837            }
838
839            let docker = match self.config.connect_docker().await {
840                Ok(d) => d,
841                Err(e) => {
842                    tracing::error!(
843                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
844                        e
845                    );
846                    tokio::select! {
847                        _ = context.cancelled() => {
848                            tracing::info!("Container events consumer shutting down");
849                            return Ok(());
850                        }
851                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
852                    }
853                    continue;
854                }
855            };
856
857            let mut event_stream = docker.events::<String>(None);
858
859            loop {
860                tokio::select! {
861                    _ = context.cancelled() => {
862                        tracing::info!("Container events consumer shutting down");
863                        return Ok(());
864                    }
865
866                    msg = event_stream.next() => {
867                        match msg {
868                            Some(Ok(event)) => {
869                                let formatted = format_docker_event(&event);
870                                let message = Message::new(Body::Text(formatted));
871                                let exchange = Exchange::new(message);
872
873                                if let Err(e) = context.send(exchange).await {
874                                    tracing::error!("Failed to send exchange: {:?}", e);
875                                    break;
876                                }
877                            }
878                            Some(Err(e)) => {
879                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
880                                break;
881                            }
882                            None => {
883                                tracing::info!("Docker event stream ended. Reconnecting...");
884                                break;
885                            }
886                        }
887                    }
888                }
889            }
890
891            tokio::select! {
892                _ = context.cancelled() => {
893                    tracing::info!("Container events consumer shutting down");
894                    return Ok(());
895                }
896                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
897            }
898        }
899    }
900
901    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
902        use futures::StreamExt;
903
904        let container_id = self.config.container_id.clone().ok_or_else(|| {
905            CamelError::EndpointCreationFailed(
906                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
907                    .to_string(),
908            )
909        })?;
910
911        loop {
912            if context.is_cancelled() {
913                tracing::info!("Container logs consumer shutting down");
914                return Ok(());
915            }
916
917            let docker = match self.config.connect_docker().await {
918                Ok(d) => d,
919                Err(e) => {
920                    tracing::error!(
921                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
922                        e
923                    );
924                    tokio::select! {
925                        _ = context.cancelled() => {
926                            tracing::info!("Container logs consumer shutting down");
927                            return Ok(());
928                        }
929                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
930                    }
931                    continue;
932                }
933            };
934
935            let tail = self
936                .config
937                .tail
938                .clone()
939                .unwrap_or_else(|| "all".to_string());
940
941            let options = bollard::container::LogsOptions::<String> {
942                follow: self.config.follow,
943                stdout: true,
944                stderr: true,
945                timestamps: self.config.timestamps,
946                tail,
947                ..Default::default()
948            };
949
950            let mut log_stream = docker.logs(&container_id, Some(options));
951            let container_id_header = container_id.clone();
952
953            loop {
954                tokio::select! {
955                    _ = context.cancelled() => {
956                        tracing::info!("Container logs consumer shutting down");
957                        return Ok(());
958                    }
959
960                    msg = log_stream.next() => {
961                        match msg {
962                            Some(Ok(log_output)) => {
963                                let (stream_type, content) = match log_output {
964                                    bollard::container::LogOutput::StdOut { message } => {
965                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
966                                    }
967                                    bollard::container::LogOutput::StdErr { message } => {
968                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
969                                    }
970                                    bollard::container::LogOutput::Console { message } => {
971                                        ("console", String::from_utf8_lossy(&message).into_owned())
972                                    }
973                                    bollard::container::LogOutput::StdIn { message } => {
974                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
975                                    }
976                                };
977
978                                let content = content.trim_end();
979                                if content.is_empty() {
980                                    continue;
981                                }
982
983                                let mut message = Message::new(Body::Text(content.to_string()));
984                                message.set_header(
985                                    HEADER_CONTAINER_ID,
986                                    serde_json::Value::String(container_id_header.clone()),
987                                );
988                                message.set_header(
989                                    HEADER_LOG_STREAM,
990                                    serde_json::Value::String(stream_type.to_string()),
991                                );
992
993                                if self.config.timestamps
994                                    && let Some(ts) = extract_timestamp(content) {
995                                        message.set_header(
996                                            HEADER_LOG_TIMESTAMP,
997                                            serde_json::Value::String(ts),
998                                        );
999                                    }
1000
1001                                let exchange = Exchange::new(message);
1002
1003                                if let Err(e) = context.send(exchange).await {
1004                                    tracing::error!("Failed to send log exchange: {:?}", e);
1005                                    break;
1006                                }
1007                            }
1008                            Some(Err(e)) => {
1009                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1010                                break;
1011                            }
1012                            None => {
1013                                if self.config.follow {
1014                                    tracing::info!("Docker log stream ended. Reconnecting...");
1015                                    break;
1016                                } else {
1017                                    tracing::info!("Container logs consumer finished (follow=false)");
1018                                    return Ok(());
1019                                }
1020                            }
1021                        }
1022                    }
1023                }
1024            }
1025
1026            tokio::select! {
1027                _ = context.cancelled() => {
1028                    tracing::info!("Container logs consumer shutting down");
1029                    return Ok(());
1030                }
1031                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1032            }
1033        }
1034    }
1035}
1036
1037fn extract_timestamp(log_line: &str) -> Option<String> {
1038    let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
1039    if parts.len() > 1 && parts[0].contains('T') {
1040        Some(parts[0].to_string())
1041    } else {
1042        None
1043    }
1044}
1045
1046/// Component for creating container endpoints.
1047///
1048/// This component handles URIs with the "container" scheme and creates
1049/// appropriate producer and consumer endpoints for Docker operations.
1050///
1051/// Containers created via `run` operation are tracked globally and can be
1052/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
1053pub struct ContainerComponent {
1054    config: Option<ContainerGlobalConfig>,
1055}
1056
1057impl ContainerComponent {
1058    /// Creates a new container component instance without global config.
1059    pub fn new() -> Self {
1060        Self { config: None }
1061    }
1062
1063    /// Creates a container component with the given global config.
1064    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1065        Self {
1066            config: Some(config),
1067        }
1068    }
1069
1070    /// Creates a container component with optional global config.
1071    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1072        Self { config }
1073    }
1074}
1075
1076impl Default for ContainerComponent {
1077    fn default() -> Self {
1078        Self::new()
1079    }
1080}
1081
1082impl Component for ContainerComponent {
1083    fn scheme(&self) -> &str {
1084        "container"
1085    }
1086
1087    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1088        let mut config = ContainerConfig::from_uri(uri)?;
1089        // Apply global defaults if present and URI didn't set them
1090        if let Some(ref global) = self.config {
1091            config.apply_global_defaults(global);
1092        }
1093        Ok(Box::new(ContainerEndpoint {
1094            uri: uri.to_string(),
1095            config,
1096        }))
1097    }
1098}
1099
1100/// Endpoint for container operations.
1101///
1102/// This endpoint creates producers for executing container operations
1103/// and consumers for receiving container events.
1104pub struct ContainerEndpoint {
1105    uri: String,
1106    config: ContainerConfig,
1107}
1108
1109impl ContainerEndpoint {
1110    /// Returns the Docker host configured for this endpoint.
1111    /// Returns `None` if not set (for testing purposes).
1112    pub fn docker_host(&self) -> Option<&str> {
1113        self.config.host.as_deref()
1114    }
1115}
1116
1117impl Endpoint for ContainerEndpoint {
1118    fn uri(&self) -> &str {
1119        &self.uri
1120    }
1121
1122    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1123        Ok(Box::new(ContainerConsumer {
1124            config: self.config.clone(),
1125        }))
1126    }
1127
1128    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1129        let docker = self.config.connect_docker_client()?;
1130        Ok(BoxProcessor::new(ContainerProducer {
1131            config: self.config.clone(),
1132            docker,
1133        }))
1134    }
1135}
1136
1137#[cfg(test)]
1138mod tests {
1139    use super::*;
1140
1141    #[test]
1142    fn test_container_config() {
1143        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1144        assert_eq!(config.operation, "run");
1145        assert_eq!(config.image.as_deref(), Some("alpine"));
1146        // host is None by default; global config applies it later
1147        assert!(config.host.is_none());
1148    }
1149
1150    #[test]
1151    fn test_global_config_applied_to_endpoint() {
1152        // When global config is set and URI doesn't specify host,
1153        // apply_global_defaults should set host from global config.
1154        let global =
1155            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1156        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1157        assert!(
1158            config.host.is_none(),
1159            "URI without ?host= should leave host as None"
1160        );
1161        config.apply_global_defaults(&global);
1162        assert_eq!(
1163            config.host.as_deref(),
1164            Some("unix:///custom/docker.sock"),
1165            "global docker_host must be applied when URI did not set host"
1166        );
1167    }
1168
1169    #[test]
1170    fn test_uri_param_wins_over_global_config() {
1171        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1172        let global =
1173            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1174        let mut config =
1175            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1176                .unwrap();
1177        assert_eq!(
1178            config.host.as_deref(),
1179            Some("unix:///override.sock"),
1180            "URI-set host should be parsed correctly"
1181        );
1182        config.apply_global_defaults(&global);
1183        assert_eq!(
1184            config.host.as_deref(),
1185            Some("unix:///override.sock"),
1186            "global config must NOT override a host already set by URI"
1187        );
1188    }
1189
1190    #[test]
1191    fn test_container_config_parses_name() {
1192        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1193        assert_eq!(config.name.as_deref(), Some("my-container"));
1194    }
1195
1196    #[test]
1197    fn test_parse_producer_operation_known() {
1198        assert_eq!(
1199            parse_producer_operation("list").unwrap(),
1200            ProducerOperation::List
1201        );
1202        assert_eq!(
1203            parse_producer_operation("run").unwrap(),
1204            ProducerOperation::Run
1205        );
1206        assert_eq!(
1207            parse_producer_operation("start").unwrap(),
1208            ProducerOperation::Start
1209        );
1210        assert_eq!(
1211            parse_producer_operation("stop").unwrap(),
1212            ProducerOperation::Stop
1213        );
1214        assert_eq!(
1215            parse_producer_operation("remove").unwrap(),
1216            ProducerOperation::Remove
1217        );
1218    }
1219
1220    #[test]
1221    fn test_parse_producer_operation_unknown() {
1222        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1223        match err {
1224            CamelError::ProcessorError(msg) => {
1225                assert!(
1226                    msg.contains("Unknown container operation"),
1227                    "Unexpected error message: {}",
1228                    msg
1229                );
1230            }
1231            _ => panic!("Expected ProcessorError for unknown operation"),
1232        }
1233    }
1234
1235    #[test]
1236    fn test_resolve_container_name_header_overrides_config() {
1237        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1238        let mut exchange = Exchange::new(Message::new(""));
1239        exchange.input.set_header(
1240            HEADER_CONTAINER_NAME,
1241            serde_json::Value::String("header-name".to_string()),
1242        );
1243
1244        let resolved = resolve_container_name(&exchange, &config);
1245        assert_eq!(resolved.as_deref(), Some("header-name"));
1246    }
1247
1248    #[test]
1249    fn test_container_config_rejects_tcp_host() {
1250        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1251        let err = config.connect_docker_client().unwrap_err();
1252        match err {
1253            CamelError::ProcessorError(msg) => {
1254                assert!(
1255                    msg.to_lowercase().contains("tcp"),
1256                    "Expected TCP scheme error, got: {}",
1257                    msg
1258                );
1259            }
1260            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1261        }
1262    }
1263
1264    #[tokio::test]
1265    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1266        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1267        let remove_called_clone = remove_called.clone();
1268
1269        let result = run_container_with_cleanup(
1270            || async { Ok("container-123".to_string()) },
1271            |_id| async move {
1272                Err(CamelError::ProcessorError(
1273                    "Failed to start container".to_string(),
1274                ))
1275            },
1276            move |_id| {
1277                let remove_called_inner = remove_called_clone.clone();
1278                async move {
1279                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1280                    Ok(())
1281                }
1282            },
1283        )
1284        .await;
1285
1286        assert!(result.is_err(), "Expected start failure to bubble up");
1287        assert!(
1288            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1289            "Expected cleanup to remove container"
1290        );
1291    }
1292
1293    #[test]
1294    fn test_container_component_creates_endpoint() {
1295        let component = ContainerComponent::new();
1296        assert_eq!(component.scheme(), "container");
1297        let endpoint = component
1298            .create_endpoint("container:run?image=alpine")
1299            .unwrap();
1300        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1301    }
1302
1303    #[test]
1304    fn test_container_config_parses_ports() {
1305        let config =
1306            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1307        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1308    }
1309
1310    #[test]
1311    fn test_container_config_parses_env() {
1312        let config =
1313            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1314        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1315    }
1316
1317    #[test]
1318    fn test_container_config_parses_logs_options() {
1319        let config = ContainerConfig::from_uri(
1320            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1321        )
1322        .unwrap();
1323        assert_eq!(config.operation, "logs");
1324        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1325        assert!(config.follow);
1326        assert!(config.timestamps);
1327        assert_eq!(config.tail.as_deref(), Some("100"));
1328    }
1329
1330    #[test]
1331    fn test_container_config_logs_defaults() {
1332        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1333        assert!(config.follow); // default: true
1334        assert!(!config.timestamps); // default: false
1335        assert!(config.tail.is_none()); // default: None (all)
1336    }
1337
1338    #[test]
1339    fn test_parse_ports_single() {
1340        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1341        let (exposed, bindings) = config.parse_ports().unwrap();
1342
1343        assert!(exposed.contains_key("80/tcp"));
1344        assert!(bindings.contains_key("80/tcp"));
1345
1346        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1347        assert_eq!(binding.len(), 1);
1348        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1349    }
1350
1351    #[test]
1352    fn test_parse_ports_multiple() {
1353        let config =
1354            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1355        let (exposed, bindings) = config.parse_ports().unwrap();
1356
1357        assert!(exposed.contains_key("80/tcp"));
1358        assert!(exposed.contains_key("443/tcp"));
1359        assert_eq!(bindings.len(), 2);
1360    }
1361
1362    #[test]
1363    fn test_parse_ports_with_protocol() {
1364        let config =
1365            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1366                .unwrap();
1367        let (exposed, _bindings) = config.parse_ports().unwrap();
1368
1369        assert!(exposed.contains_key("80/tcp"));
1370        assert!(exposed.contains_key("53/udp"));
1371    }
1372
1373    #[test]
1374    fn test_parse_ports_none() {
1375        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1376        assert!(config.parse_ports().is_none());
1377    }
1378
1379    #[test]
1380    fn test_parse_env_single() {
1381        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1382        let env = config.parse_env().unwrap();
1383
1384        assert_eq!(env.len(), 1);
1385        assert_eq!(env[0], "FOO=bar");
1386    }
1387
1388    #[test]
1389    fn test_parse_env_multiple() {
1390        let config =
1391            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1392                .unwrap();
1393        let env = config.parse_env().unwrap();
1394
1395        assert_eq!(env.len(), 3);
1396        assert!(env.contains(&"FOO=bar".to_string()));
1397        assert!(env.contains(&"BAZ=qux".to_string()));
1398        assert!(env.contains(&"NUM=123".to_string()));
1399    }
1400
1401    #[test]
1402    fn test_parse_env_none() {
1403        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1404        assert!(config.parse_env().is_none());
1405    }
1406
1407    use camel_api::Message;
1408    use std::sync::Arc;
1409
1410    #[tokio::test]
1411    async fn test_container_producer_resolves_operation_from_header() {
1412        // Try to connect to Docker - if fails, return early
1413        let docker = match Docker::connect_with_local_defaults() {
1414            Ok(d) => d,
1415            Err(_) => {
1416                eprintln!("Skipping test: Could not connect to Docker daemon");
1417                return;
1418            }
1419        };
1420
1421        if docker.ping().await.is_err() {
1422            eprintln!("Skipping test: Docker daemon not responding to ping");
1423            return;
1424        }
1425
1426        let component = ContainerComponent::new();
1427        let endpoint = component.create_endpoint("container:run").unwrap();
1428
1429        let ctx = ProducerContext::new();
1430        let mut producer = endpoint.create_producer(&ctx).unwrap();
1431
1432        let mut exchange = Exchange::new(Message::new(""));
1433        exchange
1434            .input
1435            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1436
1437        use tower::ServiceExt;
1438        let result = producer
1439            .ready()
1440            .await
1441            .unwrap()
1442            .call(exchange)
1443            .await
1444            .unwrap();
1445
1446        // For list operation, the result header should be on the input message
1447        assert_eq!(
1448            result
1449                .input
1450                .header(HEADER_ACTION_RESULT)
1451                .map(|v| v.as_str().unwrap()),
1452            Some("success")
1453        );
1454    }
1455
1456    #[tokio::test]
1457    async fn test_container_producer_connection_error_on_invalid_host() {
1458        // Test that an invalid host (nonexistent socket) results in a connection error
1459        let component = ContainerComponent::new();
1460        let endpoint = component
1461            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock")
1462            .unwrap();
1463
1464        let ctx = ProducerContext::new();
1465        let result = endpoint.create_producer(&ctx);
1466
1467        // The producer should return an error because it cannot connect to the invalid socket
1468        assert!(
1469            result.is_err(),
1470            "Expected error when connecting to invalid host"
1471        );
1472        let err = result.unwrap_err();
1473        match &err {
1474            CamelError::ProcessorError(msg) => {
1475                assert!(
1476                    msg.to_lowercase().contains("connection")
1477                        || msg.to_lowercase().contains("connect")
1478                        || msg.to_lowercase().contains("socket")
1479                        || msg.contains("docker"),
1480                    "Error message should indicate connection failure, got: {}",
1481                    msg
1482                );
1483            }
1484            _ => panic!("Expected ProcessorError, got: {:?}", err),
1485        }
1486    }
1487
1488    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
1489    #[tokio::test]
1490    async fn test_container_producer_lifecycle_operations_missing_id() {
1491        // Try to connect to Docker - if fails, return early
1492        let docker = match Docker::connect_with_local_defaults() {
1493            Ok(d) => d,
1494            Err(_) => {
1495                eprintln!("Skipping test: Could not connect to Docker daemon");
1496                return;
1497            }
1498        };
1499
1500        if docker.ping().await.is_err() {
1501            eprintln!("Skipping test: Docker daemon not responding to ping");
1502            return;
1503        }
1504
1505        let component = ContainerComponent::new();
1506        let endpoint = component.create_endpoint("container:start").unwrap();
1507        let ctx = ProducerContext::new();
1508        let mut producer = endpoint.create_producer(&ctx).unwrap();
1509
1510        // Test each lifecycle operation without CamelContainerId header
1511        for operation in ["start", "stop", "remove"] {
1512            let mut exchange = Exchange::new(Message::new(""));
1513            exchange.input.set_header(
1514                HEADER_ACTION,
1515                serde_json::Value::String(operation.to_string()),
1516            );
1517            // Deliberately NOT setting CamelContainerId header
1518
1519            use tower::ServiceExt;
1520            let result = producer.ready().await.unwrap().call(exchange).await;
1521
1522            assert!(
1523                result.is_err(),
1524                "Expected error for {} operation without CamelContainerId",
1525                operation
1526            );
1527            let err = result.unwrap_err();
1528            match &err {
1529                CamelError::ProcessorError(msg) => {
1530                    assert!(
1531                        msg.contains(HEADER_CONTAINER_ID),
1532                        "Error message should mention {}, got: {}",
1533                        HEADER_CONTAINER_ID,
1534                        msg
1535                    );
1536                }
1537                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1538            }
1539        }
1540    }
1541
1542    /// Test that stop operation returns an error for a nonexistent container.
1543    #[tokio::test]
1544    async fn test_container_producer_stop_nonexistent() {
1545        // Try to connect to Docker - if fails, return early
1546        let docker = match Docker::connect_with_local_defaults() {
1547            Ok(d) => d,
1548            Err(_) => {
1549                eprintln!("Skipping test: Could not connect to Docker daemon");
1550                return;
1551            }
1552        };
1553
1554        if docker.ping().await.is_err() {
1555            eprintln!("Skipping test: Docker daemon not responding to ping");
1556            return;
1557        }
1558
1559        let component = ContainerComponent::new();
1560        let endpoint = component.create_endpoint("container:stop").unwrap();
1561        let ctx = ProducerContext::new();
1562        let mut producer = endpoint.create_producer(&ctx).unwrap();
1563
1564        let mut exchange = Exchange::new(Message::new(""));
1565        exchange
1566            .input
1567            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1568        exchange.input.set_header(
1569            HEADER_CONTAINER_ID,
1570            serde_json::Value::String("nonexistent-container-123".into()),
1571        );
1572
1573        use tower::ServiceExt;
1574        let result = producer.ready().await.unwrap().call(exchange).await;
1575
1576        assert!(
1577            result.is_err(),
1578            "Expected error when stopping nonexistent container"
1579        );
1580        let err = result.unwrap_err();
1581        match &err {
1582            CamelError::ProcessorError(msg) => {
1583                // Docker API returns 404 with "No such container" message
1584                assert!(
1585                    msg.to_lowercase().contains("no such container")
1586                        || msg.to_lowercase().contains("not found")
1587                        || msg.contains("404"),
1588                    "Error message should indicate container not found, got: {}",
1589                    msg
1590                );
1591            }
1592            _ => panic!("Expected ProcessorError, got: {:?}", err),
1593        }
1594    }
1595
1596    /// Test that run operation returns an error when no image is provided.
1597    #[tokio::test]
1598    async fn test_container_producer_run_missing_image() {
1599        // Try to connect to Docker - if fails, return early
1600        let docker = match Docker::connect_with_local_defaults() {
1601            Ok(d) => d,
1602            Err(_) => {
1603                eprintln!("Skipping test: Could not connect to Docker daemon");
1604                return;
1605            }
1606        };
1607
1608        if docker.ping().await.is_err() {
1609            eprintln!("Skipping test: Docker daemon not responding to ping");
1610            return;
1611        }
1612
1613        // Create producer without an image in the URI
1614        let component = ContainerComponent::new();
1615        let endpoint = component.create_endpoint("container:run").unwrap();
1616        let ctx = ProducerContext::new();
1617        let mut producer = endpoint.create_producer(&ctx).unwrap();
1618
1619        let mut exchange = Exchange::new(Message::new(""));
1620        exchange
1621            .input
1622            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1623        // Deliberately NOT setting CamelContainerImage header
1624
1625        use tower::ServiceExt;
1626        let result = producer.ready().await.unwrap().call(exchange).await;
1627
1628        assert!(
1629            result.is_err(),
1630            "Expected error for run operation without image"
1631        );
1632        let err = result.unwrap_err();
1633        match &err {
1634            CamelError::ProcessorError(msg) => {
1635                assert!(
1636                    msg.to_lowercase().contains("image"),
1637                    "Error message should mention 'image', got: {}",
1638                    msg
1639                );
1640            }
1641            _ => panic!("Expected ProcessorError, got: {:?}", err),
1642        }
1643    }
1644
1645    /// Test that run operation uses image from header.
1646    #[tokio::test]
1647    async fn test_container_producer_run_image_from_header() {
1648        // Try to connect to Docker - if fails, return early
1649        let docker = match Docker::connect_with_local_defaults() {
1650            Ok(d) => d,
1651            Err(_) => {
1652                eprintln!("Skipping test: Could not connect to Docker daemon");
1653                return;
1654            }
1655        };
1656
1657        if docker.ping().await.is_err() {
1658            eprintln!("Skipping test: Docker daemon not responding to ping");
1659            return;
1660        }
1661
1662        // Create producer without an image in the URI
1663        let component = ContainerComponent::new();
1664        let endpoint = component.create_endpoint("container:run").unwrap();
1665        let ctx = ProducerContext::new();
1666        let mut producer = endpoint.create_producer(&ctx).unwrap();
1667
1668        let mut exchange = Exchange::new(Message::new(""));
1669        exchange
1670            .input
1671            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1672        // Set a non-existent image to test that the run operation attempts to use it
1673        exchange.input.set_header(
1674            HEADER_IMAGE,
1675            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1676        );
1677
1678        use tower::ServiceExt;
1679        let result = producer.ready().await.unwrap().call(exchange).await;
1680
1681        // The operation should fail because the image doesn't exist
1682        assert!(
1683            result.is_err(),
1684            "Expected error when running container with nonexistent image"
1685        );
1686        let err = result.unwrap_err();
1687        match &err {
1688            CamelError::ProcessorError(msg) => {
1689                // Docker API returns an error about the missing image
1690                assert!(
1691                    msg.to_lowercase().contains("no such image")
1692                        || msg.to_lowercase().contains("not found")
1693                        || msg.to_lowercase().contains("image")
1694                        || msg.to_lowercase().contains("pull")
1695                        || msg.contains("404"),
1696                    "Error message should indicate image issue, got: {}",
1697                    msg
1698                );
1699            }
1700            _ => panic!("Expected ProcessorError, got: {:?}", err),
1701        }
1702    }
1703
1704    /// Integration test: Actually run a container with alpine:latest.
1705    /// This test verifies the full flow: create → start → set headers.
1706    #[tokio::test]
1707    async fn test_container_producer_run_alpine_container() {
1708        let docker = match Docker::connect_with_local_defaults() {
1709            Ok(d) => d,
1710            Err(_) => {
1711                eprintln!("Skipping test: Could not connect to Docker daemon");
1712                return;
1713            }
1714        };
1715
1716        if docker.ping().await.is_err() {
1717            eprintln!("Skipping test: Docker daemon not responding to ping");
1718            return;
1719        }
1720
1721        // Pull alpine:latest if not present
1722        let images = docker.list_images::<&str>(None).await.unwrap();
1723        let has_alpine = images
1724            .iter()
1725            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1726
1727        if !has_alpine {
1728            eprintln!("Pulling alpine:latest image...");
1729            let mut stream = docker.create_image(
1730                Some(bollard::image::CreateImageOptions {
1731                    from_image: "alpine:latest",
1732                    ..Default::default()
1733                }),
1734                None,
1735                None,
1736            );
1737
1738            use futures::StreamExt;
1739            while let Some(_item) = stream.next().await {
1740                // Wait for pull to complete
1741            }
1742            eprintln!("Image pulled successfully");
1743        }
1744
1745        // Create producer
1746        let component = ContainerComponent::new();
1747        let endpoint = component.create_endpoint("container:run").unwrap();
1748        let ctx = ProducerContext::new();
1749        let mut producer = endpoint.create_producer(&ctx).unwrap();
1750
1751        // Run container with unique name
1752        let timestamp = std::time::SystemTime::now()
1753            .duration_since(std::time::UNIX_EPOCH)
1754            .unwrap()
1755            .as_millis();
1756        let container_name = format!("test-rust-camel-{}", timestamp);
1757        let mut exchange = Exchange::new(Message::new(""));
1758        exchange.input.set_header(
1759            HEADER_IMAGE,
1760            serde_json::Value::String("alpine:latest".into()),
1761        );
1762        exchange.input.set_header(
1763            HEADER_CONTAINER_NAME,
1764            serde_json::Value::String(container_name.clone()),
1765        );
1766
1767        use tower::ServiceExt;
1768        let result = producer
1769            .ready()
1770            .await
1771            .unwrap()
1772            .call(exchange)
1773            .await
1774            .expect("Container run should succeed");
1775
1776        // Verify container ID was set
1777        let container_id = result
1778            .input
1779            .header(HEADER_CONTAINER_ID)
1780            .and_then(|v| v.as_str().map(|s| s.to_string()))
1781            .expect("Expected container ID header");
1782        assert!(!container_id.is_empty(), "Container ID should not be empty");
1783
1784        // Verify success header
1785        assert_eq!(
1786            result
1787                .input
1788                .header(HEADER_ACTION_RESULT)
1789                .and_then(|v| v.as_str()),
1790            Some("success")
1791        );
1792
1793        // Verify container exists in Docker
1794        let inspect = docker
1795            .inspect_container(&container_id, None)
1796            .await
1797            .expect("Container should exist");
1798        assert_eq!(
1799            inspect.id.as_ref().map(|s| s.as_str()),
1800            Some(container_id.as_str())
1801        );
1802
1803        // Cleanup: remove container
1804        docker
1805            .remove_container(
1806                &container_id,
1807                Some(bollard::container::RemoveContainerOptions {
1808                    force: true,
1809                    ..Default::default()
1810                }),
1811            )
1812            .await
1813            .ok();
1814
1815        eprintln!("✅ Container {} created and cleaned up", container_id);
1816    }
1817
1818    /// Test that consumer returns an error for unsupported operations.
1819    #[tokio::test]
1820    async fn test_container_consumer_unsupported_operation() {
1821        use tokio::sync::mpsc;
1822
1823        let component = ContainerComponent::new();
1824        let endpoint = component.create_endpoint("container:run").unwrap();
1825        let mut consumer = endpoint.create_consumer().unwrap();
1826
1827        // Create a minimal ConsumerContext
1828        let (tx, _rx) = mpsc::channel(16);
1829        let cancel_token = tokio_util::sync::CancellationToken::new();
1830        let context = ConsumerContext::new(tx, cancel_token);
1831
1832        let result = consumer.start(context).await;
1833
1834        // Should return error because "run" is not a supported consumer operation
1835        assert!(
1836            result.is_err(),
1837            "Expected error for unsupported consumer operation"
1838        );
1839        let err = result.unwrap_err();
1840        match &err {
1841            CamelError::EndpointCreationFailed(msg) => {
1842                assert!(
1843                    msg.contains("Consumer only supports 'events' or 'logs'"),
1844                    "Error message should mention events or logs support, got: {}",
1845                    msg
1846                );
1847            }
1848            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1849        }
1850    }
1851
1852    #[test]
1853    fn test_container_consumer_concurrency_model_is_concurrent() {
1854        let consumer = ContainerConsumer {
1855            config: ContainerConfig::from_uri("container:events").unwrap(),
1856        };
1857
1858        assert_eq!(
1859            consumer.concurrency_model(),
1860            camel_component::ConcurrencyModel::Concurrent { max: None }
1861        );
1862    }
1863
1864    /// Test that consumer gracefully shuts down when cancellation is requested.
1865    /// This test requires a running Docker daemon. If Docker is not available, the test
1866    /// will be ignored.
1867    #[tokio::test]
1868    async fn test_container_consumer_cancellation() {
1869        use std::sync::atomic::{AtomicBool, Ordering};
1870        use tokio::sync::mpsc;
1871
1872        // Try to connect to Docker - if fails, return early
1873        let docker = match Docker::connect_with_local_defaults() {
1874            Ok(d) => d,
1875            Err(_) => {
1876                eprintln!("Skipping test: Could not connect to Docker daemon");
1877                return;
1878            }
1879        };
1880
1881        if docker.ping().await.is_err() {
1882            eprintln!("Skipping test: Docker daemon not responding to ping");
1883            return;
1884        }
1885
1886        let component = ContainerComponent::new();
1887        let endpoint = component.create_endpoint("container:events").unwrap();
1888        let mut consumer = endpoint.create_consumer().unwrap();
1889
1890        // Create a ConsumerContext
1891        let (tx, _rx) = mpsc::channel(16);
1892        let cancel_token = tokio_util::sync::CancellationToken::new();
1893        let context = ConsumerContext::new(tx, cancel_token.clone());
1894
1895        // Track if the consumer task has completed
1896        let completed = Arc::new(AtomicBool::new(false));
1897        let completed_clone = completed.clone();
1898
1899        // Spawn consumer in background
1900        let handle = tokio::spawn(async move {
1901            let result = consumer.start(context).await;
1902            // Mark as completed when done
1903            completed_clone.store(true, Ordering::SeqCst);
1904            result
1905        });
1906
1907        // Wait a bit for the consumer to start
1908        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1909
1910        // Consumer should still be running (not completed)
1911        assert!(
1912            !completed.load(Ordering::SeqCst),
1913            "Consumer should still be running before cancellation"
1914        );
1915
1916        // Request cancellation
1917        cancel_token.cancel();
1918
1919        // Wait for the task to complete (with timeout)
1920        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1921
1922        // Task should have completed (not timed out)
1923        assert!(
1924            result.is_ok(),
1925            "Consumer should gracefully shut down after cancellation"
1926        );
1927
1928        // Verify the consumer completed
1929        assert!(
1930            completed.load(Ordering::SeqCst),
1931            "Consumer should have completed after cancellation"
1932        );
1933    }
1934
1935    /// Integration test for listing containers.
1936    /// This test requires a running Docker daemon. If Docker is not available, the test
1937    /// will return early and be effectively ignored.
1938    #[tokio::test]
1939    async fn test_container_producer_list_containers() {
1940        // Try to connect to Docker using default config
1941        // If ping fails, return early (effectively ignoring this test)
1942        let docker = match Docker::connect_with_local_defaults() {
1943            Ok(d) => d,
1944            Err(_) => {
1945                eprintln!("Skipping test: Could not connect to Docker daemon");
1946                return;
1947            }
1948        };
1949
1950        if docker.ping().await.is_err() {
1951            eprintln!("Skipping test: Docker daemon not responding to ping");
1952            return;
1953        }
1954
1955        // Create producer with list operation
1956        let component = ContainerComponent::new();
1957        let endpoint = component.create_endpoint("container:list").unwrap();
1958
1959        let ctx = ProducerContext::new();
1960        let mut producer = endpoint.create_producer(&ctx).unwrap();
1961
1962        // Create exchange with list operation in header
1963        let mut exchange = Exchange::new(Message::new(""));
1964        exchange
1965            .input
1966            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1967
1968        // Call the producer
1969        use tower::ServiceExt;
1970        let result = producer
1971            .ready()
1972            .await
1973            .unwrap()
1974            .call(exchange)
1975            .await
1976            .expect("Producer should succeed when Docker is available");
1977
1978        // Assert that the input exchange body is a JSON array
1979        // (because list_containers should put the JSON result in the body)
1980        match &result.input.body {
1981            camel_api::Body::Json(json_value) => {
1982                assert!(
1983                    json_value.is_array(),
1984                    "Expected input body to be a JSON array, got: {:?}",
1985                    json_value
1986                );
1987            }
1988            other => panic!("Expected Body::Json with array, got: {:?}", other),
1989        }
1990    }
1991}