Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7pub mod health;
8
9pub use bundle::ContainerBundle;
10pub use health::ContainerHealthCheck;
11
12use std::collections::{HashMap, HashSet};
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17
18use async_trait::async_trait;
19use bollard::Docker;
20use bollard::models::{
21    ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
22};
23use bollard::query_parameters::{
24    CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
25    ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
26    StartContainerOptions,
27};
28use bollard::service::{HostConfig, PortBinding};
29use camel_component_api::parse_uri;
30use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
31use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
32use tower::Service;
33
/// Global tracker for containers created by this component.
/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
///
/// Holds the container IDs recorded by `track_container`. Entries are dropped by
/// `untrack_container`, and `cleanup_tracked_containers` force-removes every
/// container that is still listed here.
static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
38
39/// Registers a container ID for tracking (will be cleaned up on shutdown).
40fn track_container(id: String) {
41    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
42        tracker.insert(id);
43    }
44}
45
46/// Removes a container ID from tracking (when it's been removed naturally).
47fn untrack_container(id: &str) {
48    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
49        tracker.remove(id);
50    }
51}
52
53/// Cleans up all tracked containers. Call this on application shutdown.
54pub async fn cleanup_tracked_containers() {
55    let ids: Vec<String> = {
56        match CONTAINER_TRACKER.lock() {
57            Ok(tracker) => tracker.iter().cloned().collect(),
58            Err(_) => return,
59        }
60    };
61
62    if ids.is_empty() {
63        return;
64    }
65
66    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
67
68    let docker = match Docker::connect_with_local_defaults() {
69        Ok(d) => d,
70        Err(e) => {
71            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
72            return;
73        }
74    };
75
76    for id in ids {
77        match docker
78            .remove_container(
79                &id,
80                Some(RemoveContainerOptions {
81                    force: true,
82                    ..Default::default()
83                }),
84            )
85            .await
86        {
87            Ok(_) => {
88                tracing::debug!("Cleaned up container {}", id);
89                untrack_container(&id);
90            }
91            Err(e) => {
92                tracing::warn!("Failed to cleanup container {}: {}", id, e);
93            }
94        }
95    }
96}
97
// Connection settings and header constants for container operations

/// Timeout (seconds) for connecting to the Docker daemon.
/// Passed to `Docker::connect_with_socket` by `ContainerConfig::connect_docker_client`.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header key for specifying the container image to use for "run" operations.
/// When present, `handle_run` prefers it over the `image` URI parameter.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header key for specifying or receiving the container ID.
/// `handle_run` sets it to the ID of the created container; the start, stop and
/// remove operations in `handle_lifecycle` require it as input.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header key for the log stream type (stdout or stderr).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header key for the log timestamp.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header key for specifying the container name for "run" operations.
/// When present, `resolve_container_name` prefers it over the `name` URI parameter.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header key for the result status of a container operation (e.g., "success").
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Header key for specifying the command to execute in a container.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Header key for specifying the network name for network operations.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Header key for the exit code of an exec operation.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Header key for specifying volume mounts.
/// When present, `handle_run` prefers it over the `volumes` URI parameter; the
/// value uses the format understood by `parse_volume_str`.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Header key for the exec instance ID.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
138
139// ---------------------------------------------------------------------------
140// ContainerGlobalConfig
141// ---------------------------------------------------------------------------
142
143/// Global configuration for Container component.
144/// Supports serde deserialization with defaults and builder methods.
145/// These are the fallback defaults when URI params are not set.
146#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
147#[serde(default)]
148pub struct ContainerGlobalConfig {
149    /// The Docker host URL (default: "unix:///var/run/docker.sock").
150    pub docker_host: String,
151}
152
153impl Default for ContainerGlobalConfig {
154    fn default() -> Self {
155        Self {
156            docker_host: "unix:///var/run/docker.sock".to_string(),
157        }
158    }
159}
160
161impl ContainerGlobalConfig {
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
167        self.docker_host = v.into();
168        self
169    }
170}
171
172// ---------------------------------------------------------------------------
173// ContainerConfig (endpoint configuration)
174// ---------------------------------------------------------------------------
175
/// Configuration for the container component endpoint.
///
/// This struct holds the parsed URI configuration including the operation type,
/// optional container image, and Docker host connection details.
///
/// Instances are built by `ContainerConfig::from_uri`: the URI path becomes
/// `operation` and each remaining field is read from the query parameter named
/// in its documentation below.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
    /// Taken from the URI path.
    pub operation: String,
    /// The container image to use for "run" operations (can be overridden via header).
    /// URI parameter: `image`.
    pub image: Option<String>,
    /// The container name to use for "run" operations (can be overridden via header).
    /// URI parameter: `name`.
    pub name: Option<String>,
    /// The Docker host URL. URI parameter: `host`.
    ///
    /// `None` after `from_uri` when the parameter is absent; `apply_global_defaults`
    /// then fills it from `ContainerGlobalConfig::docker_host`.
    pub host: Option<String>,
    /// Command to run in the container (e.g., "sleep 30"). URI parameter: `cmd`.
    /// `handle_run` splits the value on whitespace to build the argument list.
    pub cmd: Option<String>,
    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
    /// URI parameter: `ports`. A `/protocol` suffix on the container port is
    /// accepted; without one `parse_ports` uses "tcp".
    pub ports: Option<String>,
    /// Environment variables in format "KEY=value,KEY2=value2".
    /// URI parameter: `env`.
    pub env: Option<String>,
    /// Network mode (e.g., "bridge", "host", "none"). URI parameter: `network`.
    /// `handle_run` forwards the value as the container's network mode and sends
    /// none when it is unset.
    pub network: Option<String>,
    /// Container ID or name for logs consumer. URI parameter: `containerId`.
    pub container_id: Option<String>,
    /// Follow log output (default: true for consumer). URI parameter: `follow`.
    pub follow: bool,
    /// Include timestamps in logs (default: false). URI parameter: `timestamps`.
    pub timestamps: bool,
    /// Number of lines to show from the end of logs (default: all).
    /// URI parameter: `tail`.
    pub tail: Option<String>,
    /// Automatically pull the image if not present (default: true).
    /// URI parameter: `autoPull`.
    pub auto_pull: bool,
    /// Automatically remove the container when it exits (default: true).
    /// URI parameter: `autoRemove`.
    pub auto_remove: bool,
    /// Volume mounts in format "host:container:ro" (e.g., "./html:/usr/share/nginx/html:ro").
    /// URI parameter: `volumes`; see `parse_volume_str` for the accepted forms.
    pub volumes: Option<String>,
    /// User to run the container or exec command as (e.g., "root").
    /// URI parameter: `user`.
    pub user: Option<String>,
    /// Working directory inside the container. URI parameter: `workdir`.
    pub workdir: Option<String>,
    /// Whether to detach from the exec process (default: false).
    /// URI parameter: `detach`.
    pub detach: bool,
    /// Network driver for network-create (e.g., "bridge", "overlay").
    /// URI parameter: `driver`.
    pub driver: Option<String>,
    /// Whether to force the operation (default: false). URI parameter: `force`.
    pub force: bool,
}
223
224impl ContainerConfig {
225    /// Parses a container URI into a `ContainerConfig`.
226    ///
227    /// # Arguments
228    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
229    ///
230    /// # Errors
231    /// Returns an error if the URI scheme is not "container".
232    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
233        let parts = parse_uri(uri)?;
234        if parts.scheme != "container" {
235            return Err(CamelError::InvalidUri(format!(
236                "expected scheme 'container', got '{}'",
237                parts.scheme
238            )));
239        }
240
241        let image = parts.params.get("image").cloned();
242        let name = parts.params.get("name").cloned();
243        let cmd = parts.params.get("cmd").cloned();
244        let ports = parts.params.get("ports").cloned();
245        let env = parts.params.get("env").cloned();
246        let network = parts.params.get("network").cloned();
247        let container_id = parts.params.get("containerId").cloned();
248        let follow = parts
249            .params
250            .get("follow")
251            .map(|v| v.eq_ignore_ascii_case("true"))
252            .unwrap_or(true);
253        let timestamps = parts
254            .params
255            .get("timestamps")
256            .map(|v| v.eq_ignore_ascii_case("true"))
257            .unwrap_or(false);
258        let tail = parts.params.get("tail").cloned();
259        let auto_pull = parts
260            .params
261            .get("autoPull")
262            .map(|v| v.eq_ignore_ascii_case("true"))
263            .unwrap_or(true);
264        let auto_remove = parts
265            .params
266            .get("autoRemove")
267            .map(|v| v.eq_ignore_ascii_case("true"))
268            .unwrap_or(true);
269        // host is only set from URI param; global config defaults are applied later
270        let host = parts.params.get("host").cloned();
271        let volumes = parts.params.get("volumes").cloned();
272        let user = parts.params.get("user").cloned();
273        let workdir = parts.params.get("workdir").cloned();
274        let detach = parts
275            .params
276            .get("detach")
277            .map(|v| v.eq_ignore_ascii_case("true"))
278            .unwrap_or(false);
279        let driver = parts.params.get("driver").cloned();
280        let force = parts
281            .params
282            .get("force")
283            .map(|v| v.eq_ignore_ascii_case("true"))
284            .unwrap_or(false);
285
286        Ok(Self {
287            operation: parts.path,
288            image,
289            name,
290            host,
291            cmd,
292            ports,
293            env,
294            network,
295            container_id,
296            follow,
297            timestamps,
298            tail,
299            auto_pull,
300            auto_remove,
301            volumes,
302            user,
303            workdir,
304            detach,
305            driver,
306            force,
307        })
308    }
309
310    /// Apply global config defaults to this endpoint config.
311    /// Only sets values that are currently `None`.
312    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
313        if self.host.is_none() {
314            self.host = Some(global.docker_host.clone());
315        }
316    }
317
318    fn docker_socket_path(&self) -> Result<&str, CamelError> {
319        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
320            "npipe:////./pipe/docker_engine"
321        } else {
322            "unix:///var/run/docker.sock"
323        });
324
325        if host.starts_with("unix://") || host.starts_with("npipe://") {
326            return Ok(host);
327        }
328
329        if host.contains("://") {
330            return Err(CamelError::ProcessorError(format!(
331                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
332                host
333            )));
334        }
335
336        Ok(host)
337    }
338
339    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
340        let socket_path = self.docker_socket_path()?;
341        Docker::connect_with_socket(
342            socket_path,
343            DOCKER_CONNECT_TIMEOUT_SECS,
344            bollard::API_DEFAULT_VERSION,
345        )
346        .map_err(|e| {
347            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
348        })
349    }
350
351    /// Connects to the Docker daemon using the configured host.
352    ///
353    /// This method establishes a Unix socket connection to Docker and verifies
354    /// the connection by sending a ping request.
355    ///
356    /// # Errors
357    /// Returns an error if the connection fails or the ping request fails.
358    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
359        let docker = self.connect_docker_client()?;
360        docker
361            .ping()
362            .await
363            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
364        Ok(docker)
365    }
366
367    #[allow(clippy::type_complexity)]
368    fn parse_ports(
369        &self,
370    ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
371        let ports_str = match self.ports.as_ref() {
372            Some(s) => s,
373            None => return Ok((Vec::new(), HashMap::new())),
374        };
375
376        let mut exposed_ports: Vec<String> = Vec::new();
377        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
378
379        for mapping in ports_str.split(',') {
380            let mapping = mapping.trim();
381            if mapping.is_empty() {
382                continue;
383            }
384
385            let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
386                CamelError::ProcessorError(format!(
387                    "malformed port mapping '{}': expected hostPort:containerPort",
388                    mapping
389                ))
390            })?;
391
392            let (container_port, protocol) = if container_spec.contains('/') {
393                let parts: Vec<&str> = container_spec.split('/').collect();
394                (parts[0], parts[1])
395            } else {
396                (container_spec, "tcp")
397            };
398
399            let container_key = format!("{}/{}", container_port, protocol);
400
401            exposed_ports.push(container_key.clone());
402
403            port_bindings.insert(
404                container_key,
405                Some(vec![PortBinding {
406                    host_ip: None,
407                    host_port: Some(host_port.to_string()),
408                }]),
409            );
410        }
411
412        Ok((exposed_ports, port_bindings))
413    }
414
415    fn parse_env(&self) -> Option<Vec<String>> {
416        let env_str = self.env.as_ref()?;
417
418        let env_vars: Vec<String> = env_str
419            .split(',')
420            .map(|s| s.trim().to_string())
421            .filter(|s| !s.is_empty())
422            .collect();
423
424        if env_vars.is_empty() {
425            None
426        } else {
427            Some(env_vars)
428        }
429    }
430
431    #[cfg(test)]
432    #[allow(clippy::type_complexity)]
433    fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
434        self.volumes.as_deref().and_then(parse_volume_str)
435    }
436}
437
/// Splits a comma-separated volume specification into bind mounts and
/// anonymous volumes.
///
/// Each entry is trimmed and classified by its `:`-separated segments:
///
/// * `source:target:ro|rw` becomes a bind mount, kept verbatim;
/// * `source:target` becomes a bind mount without a mode (this also covers named volumes);
/// * `path:ro|rw` becomes an anonymous volume at `path` (the mode is not kept);
/// * `path` becomes an anonymous volume.
///
/// Blank entries, three-segment entries whose mode is neither `ro` nor `rw`,
/// and entries with more than three segments are skipped.
///
/// Returns `(binds, anonymous_volumes)`, or `None` when no entry was accepted.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
    let mut bind_mounts: Vec<String> = Vec::new();
    let mut anonymous: Vec<String> = Vec::new();

    let entries = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        let segments: Vec<&str> = entry.split(':').collect();

        match segments.as_slice() {
            [source, target, mode] if matches!(*mode, "ro" | "rw") => {
                bind_mounts.push(format!("{}:{}:{}", source, target, mode));
            }
            [path, mode] if matches!(*mode, "ro" | "rw") => {
                anonymous.push((*path).to_string());
            }
            [source, target] => {
                bind_mounts.push(format!("{}:{}", source, target));
            }
            [path] => {
                anonymous.push((*path).to_string());
            }
            // Invalid mode or too many segments: ignore the entry.
            _ => {}
        }
    }

    if bind_mounts.is_empty() && anonymous.is_empty() {
        return None;
    }
    Some((bind_mounts, anonymous))
}
487
/// Operations the producer side of the endpoint can perform.
///
/// Each variant corresponds to one operation string accepted by
/// `parse_producer_operation`; that string is noted on the variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// `list`
    List,
    /// `run`
    Run,
    /// `start`
    Start,
    /// `stop`
    Stop,
    /// `remove`
    Remove,
    /// `exec`
    Exec,
    /// `network-create`
    NetworkCreate,
    /// `network-connect`
    NetworkConnect,
    /// `network-disconnect`
    NetworkDisconnect,
    /// `network-remove`
    NetworkRemove,
    /// `network-list`
    NetworkList,
}
502
503fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
504    match operation {
505        "list" => Ok(ProducerOperation::List),
506        "run" => Ok(ProducerOperation::Run),
507        "start" => Ok(ProducerOperation::Start),
508        "stop" => Ok(ProducerOperation::Stop),
509        "remove" => Ok(ProducerOperation::Remove),
510        "exec" => Ok(ProducerOperation::Exec),
511        "network-create" => Ok(ProducerOperation::NetworkCreate),
512        "network-connect" => Ok(ProducerOperation::NetworkConnect),
513        "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
514        "network-remove" => Ok(ProducerOperation::NetworkRemove),
515        "network-list" => Ok(ProducerOperation::NetworkList),
516        _ => Err(CamelError::ProcessorError(format!(
517            "Unknown container operation: {}",
518            operation
519        ))),
520    }
521}
522
523fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
524    exchange
525        .input
526        .header(HEADER_CONTAINER_NAME)
527        .and_then(|v| v.as_str().map(|s| s.to_string()))
528        .or_else(|| config.name.clone())
529}
530
531async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
532    let images = docker
533        .list_images(None::<ListImagesOptions>)
534        .await
535        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
536
537    Ok(images.iter().any(|img| {
538        img.repo_tags
539            .iter()
540            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
541    }))
542}
543
544async fn pull_image_with_progress(
545    docker: &Docker,
546    image: &str,
547    timeout_secs: u64,
548) -> Result<(), CamelError> {
549    use futures::StreamExt;
550
551    tracing::info!("Pulling image: {}", image);
552
553    let mut stream = docker.create_image(
554        Some(CreateImageOptions {
555            from_image: Some(image.to_string()),
556            ..Default::default()
557        }),
558        None,
559        None,
560    );
561
562    let start = std::time::Instant::now();
563    let mut last_progress = std::time::Instant::now();
564
565    while let Some(item) = stream.next().await {
566        if start.elapsed().as_secs() > timeout_secs {
567            return Err(CamelError::ProcessorError(format!(
568                "Image pull timeout after {}s. Try manually: docker pull {}",
569                timeout_secs, image
570            )));
571        }
572
573        match item {
574            Ok(update) => {
575                // Log progress every 2 seconds
576                if last_progress.elapsed().as_secs() >= 2 {
577                    if let Some(status) = update.status {
578                        tracing::debug!("Pull progress: {}", status);
579                    }
580                    last_progress = std::time::Instant::now();
581                }
582            }
583            Err(e) => {
584                let err_str = e.to_string().to_lowercase();
585                if err_str.contains("unauthorized") || err_str.contains("401") {
586                    return Err(CamelError::ProcessorError(format!(
587                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
588                        image
589                    )));
590                }
591                if err_str.contains("not found") || err_str.contains("404") {
592                    return Err(CamelError::ProcessorError(format!(
593                        "Image '{}' not found in registry. Check the image name and tag",
594                        image
595                    )));
596                }
597                return Err(CamelError::ProcessorError(format!(
598                    "Failed to pull image '{}': {}",
599                    image, e
600                )));
601            }
602        }
603    }
604
605    tracing::info!("Successfully pulled image: {}", image);
606    Ok(())
607}
608
609async fn ensure_image_available(
610    docker: &Docker,
611    image: &str,
612    auto_pull: bool,
613    timeout_secs: u64,
614) -> Result<(), CamelError> {
615    if image_exists_locally(docker, image).await? {
616        tracing::debug!("Image '{}' already available locally", image);
617        return Ok(());
618    }
619
620    if !auto_pull {
621        return Err(CamelError::ProcessorError(format!(
622            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
623            image, image
624        )));
625    }
626
627    pull_image_with_progress(docker, image, timeout_secs).await
628}
629
630fn format_docker_event(event: &bollard::models::EventMessage) -> String {
631    let action = event.action.as_deref().unwrap_or("unknown");
632    let actor = event.actor.as_ref();
633
634    let container_name = actor
635        .and_then(|a| a.attributes.as_ref())
636        .and_then(|attrs| attrs.get("name"))
637        .map(|s| s.as_str())
638        .unwrap_or("unknown");
639
640    let image = actor
641        .and_then(|a| a.attributes.as_ref())
642        .and_then(|attrs| attrs.get("image"))
643        .map(|s| s.as_str())
644        .unwrap_or("");
645
646    let exit_code = actor
647        .and_then(|a| a.attributes.as_ref())
648        .and_then(|attrs| attrs.get("exitCode"))
649        .map(|s| s.as_str());
650
651    match action {
652        "create" => {
653            if image.is_empty() {
654                format!("[CREATE] Container {}", container_name)
655            } else {
656                format!("[CREATE] Container {} ({})", container_name, image)
657            }
658        }
659        "start" => format!("[START]  Container {}", container_name),
660        "die" => {
661            if let Some(code) = exit_code {
662                format!("[DIE]    Container {} (exit: {})", container_name, code)
663            } else {
664                format!("[DIE]    Container {}", container_name)
665            }
666        }
667        "destroy" => format!("[DESTROY] Container {}", container_name),
668        "stop" => format!("[STOP]   Container {}", container_name),
669        "pause" => format!("[PAUSE]  Container {}", container_name),
670        "unpause" => format!("[UNPAUSE] Container {}", container_name),
671        "restart" => format!("[RESTART] Container {}", container_name),
672        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
673    }
674}
675
676async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
677    create: CreateFn,
678    start: StartFn,
679    remove: RemoveFn,
680) -> Result<String, CamelError>
681where
682    CreateFn: FnOnce() -> CreateFut,
683    CreateFut: Future<Output = Result<String, CamelError>>,
684    StartFn: FnOnce(String) -> StartFut,
685    StartFut: Future<Output = Result<(), CamelError>>,
686    RemoveFn: FnOnce(String) -> RemoveFut,
687    RemoveFut: Future<Output = Result<(), CamelError>>,
688{
689    let container_id = create().await?;
690    if let Err(start_err) = start(container_id.clone()).await {
691        if let Err(remove_err) = remove(container_id.clone()).await {
692            return Err(CamelError::ProcessorError(format!(
693                "Failed to start container: {}. Cleanup failed: {}",
694                start_err, remove_err
695            )));
696        }
697        return Err(start_err);
698    }
699
700    Ok(container_id)
701}
702
703async fn handle_list(
704    docker: Docker,
705    _config: ContainerConfig,
706    exchange: &mut Exchange,
707) -> Result<(), CamelError> {
708    let containers = docker
709        .list_containers(None::<ListContainersOptions>)
710        .await
711        .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
712
713    let json_value = serde_json::to_value(&containers).map_err(|e| {
714        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
715    })?;
716
717    exchange.input.body = Body::Json(json_value);
718    exchange.input.set_header(
719        HEADER_ACTION_RESULT,
720        serde_json::Value::String("success".to_string()),
721    );
722    Ok(())
723}
724
725async fn handle_run(
726    docker: Docker,
727    config: ContainerConfig,
728    exchange: &mut Exchange,
729) -> Result<(), CamelError> {
730    let image = exchange
731        .input
732        .header(HEADER_IMAGE)
733        .and_then(|v| v.as_str().map(|s| s.to_string()))
734        .or(config.image.clone())
735        .ok_or_else(|| {
736            CamelError::ProcessorError(
737                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
738            )
739        })?;
740
741    let image = if !image.contains(':') && !image.contains('@') {
742        format!("{}:latest", image)
743    } else {
744        image
745    };
746
747    let pull_timeout = 300;
748    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
749        .await
750        .map_err(|e| {
751            CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
752        })?;
753
754    let container_name = resolve_container_name(exchange, &config);
755    let container_name_ref = container_name.as_deref().unwrap_or("");
756    let cmd_parts: Option<Vec<String>> = config
757        .cmd
758        .as_ref()
759        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
760    let auto_remove = config.auto_remove;
761    let (exposed_ports, port_bindings) = config.parse_ports()?;
762    let env_vars = config.parse_env();
763    let network_mode = config.network.clone();
764
765    let volumes_str = exchange
766        .input
767        .header(HEADER_VOLUMES)
768        .and_then(|v| v.as_str().map(|s| s.to_string()))
769        .or(config.volumes.clone());
770    let (binds, anon_volumes) = volumes_str
771        .as_deref()
772        .and_then(parse_volume_str)
773        .unwrap_or_default();
774
775    let docker_create = docker.clone();
776    let docker_start = docker.clone();
777    let docker_remove = docker.clone();
778
779    let container_id = run_container_with_cleanup(
780        move || async move {
781            let create_options = CreateContainerOptions {
782                name: Some(container_name_ref.to_string()),
783                ..Default::default()
784            };
785            let container_config = ContainerCreateBody {
786                image: Some(image.clone()),
787                cmd: cmd_parts,
788                env: env_vars,
789                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
790                volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
791                host_config: Some(HostConfig {
792                    auto_remove: Some(auto_remove),
793                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
794                    network_mode,
795                    binds: if binds.is_empty() { None } else { Some(binds) },
796                    ..Default::default()
797                }),
798                ..Default::default()
799            };
800
801            let create_response = docker_create
802                .create_container(Some(create_options), container_config)
803                .await
804                .map_err(|e| {
805                    let err_str = e.to_string().to_lowercase();
806                    if err_str.contains("409") || err_str.contains("conflict") {
807                        CamelError::ProcessorError(format!(
808                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
809                            container_name_ref
810                        ))
811                    } else {
812                        CamelError::ProcessorError(format!(
813                            "Failed to create container: {}",
814                            e
815                        ))
816                    }
817                })?;
818
819            Ok(create_response.id)
820        },
821        move |container_id| async move {
822            docker_start
823                .start_container(&container_id, None::<StartContainerOptions>)
824                .await
825                .map_err(|e| {
826                    CamelError::ProcessorError(format!(
827                        "Failed to start container: {}",
828                        e
829                    ))
830                })
831        },
832        move |container_id| async move {
833            docker_remove
834                .remove_container(&container_id, None)
835                .await
836                .map_err(|e| {
837                    CamelError::ProcessorError(format!(
838                        "Failed to remove container after start failure: {}",
839                        e
840                    ))
841                })
842        },
843    )
844    .await?;
845
846    track_container(container_id.clone());
847
848    exchange
849        .input
850        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
851    exchange.input.set_header(
852        HEADER_ACTION_RESULT,
853        serde_json::Value::String("success".to_string()),
854    );
855    Ok(())
856}
857
858async fn handle_lifecycle(
859    docker: Docker,
860    _config: ContainerConfig,
861    exchange: &mut Exchange,
862    operation: ProducerOperation,
863    operation_name: &str,
864) -> Result<(), CamelError> {
865    let container_id = exchange
866        .input
867        .header(HEADER_CONTAINER_ID)
868        .and_then(|v| v.as_str().map(|s| s.to_string()))
869        .ok_or_else(|| {
870            CamelError::ProcessorError(format!(
871                "{} header is required for {} operation",
872                HEADER_CONTAINER_ID, operation_name
873            ))
874        })?;
875
876    match operation {
877        ProducerOperation::Start => {
878            docker
879                .start_container(&container_id, None::<StartContainerOptions>)
880                .await
881                .map_err(|e| {
882                    CamelError::ProcessorError(format!("Failed to start container: {}", e))
883                })?;
884        }
885        ProducerOperation::Stop => {
886            docker
887                .stop_container(&container_id, None)
888                .await
889                .map_err(|e| {
890                    CamelError::ProcessorError(format!("Failed to stop container: {}", e))
891                })?;
892        }
893        ProducerOperation::Remove => {
894            docker
895                .remove_container(&container_id, None)
896                .await
897                .map_err(|e| {
898                    CamelError::ProcessorError(format!("Failed to remove container: {}", e))
899                })?;
900            untrack_container(&container_id);
901        }
902        _ => {}
903    }
904
905    exchange.input.set_header(
906        HEADER_ACTION_RESULT,
907        serde_json::Value::String("success".to_string()),
908    );
909    Ok(())
910}
911
912async fn handle_exec(
913    docker: Docker,
914    config: ContainerConfig,
915    exchange: &mut Exchange,
916) -> Result<(), CamelError> {
917    let container_id = exchange
918        .input
919        .header(HEADER_CONTAINER_ID)
920        .and_then(|v| v.as_str().map(|s| s.to_string()))
921        .or(config.container_id.clone())
922        .ok_or_else(|| {
923            CamelError::ProcessorError(format!(
924                "{} header or containerId param is required for exec operation",
925                HEADER_CONTAINER_ID
926            ))
927        })?;
928
929    let cmd = exchange
930        .input
931        .header(HEADER_CMD)
932        .and_then(|v| v.as_str().map(|s| s.to_string()))
933        .or(config.cmd.clone())
934        .ok_or_else(|| {
935            CamelError::ProcessorError(
936                "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
937            )
938        })?;
939
940    let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
941    let env_vars = config.parse_env();
942
943    let exec_config = bollard::exec::CreateExecOptions {
944        cmd: Some(cmd_parts),
945        env: env_vars,
946        user: config.user.clone(),
947        working_dir: config.workdir.clone(),
948        attach_stdout: Some(true),
949        attach_stderr: Some(true),
950        ..Default::default()
951    };
952
953    let create_result = docker
954        .create_exec(&container_id, exec_config)
955        .await
956        .map_err(|e| {
957            let err_str = e.to_string().to_lowercase();
958            if err_str.contains("404") || err_str.contains("no such") {
959                CamelError::ProcessorError(format!(
960                    "Container '{}' not found for exec",
961                    container_id
962                ))
963            } else {
964                CamelError::ProcessorError(format!("Failed to create exec: {}", e))
965            }
966        })?;
967
968    let exec_id = create_result.id;
969
970    if config.detach {
971        docker
972            .start_exec(
973                &exec_id,
974                Some(bollard::exec::StartExecOptions {
975                    detach: true,
976                    ..Default::default()
977                }),
978            )
979            .await
980            .map_err(|e| {
981                CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
982            })?;
983
984        exchange
985            .input
986            .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
987        exchange
988            .input
989            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
990    } else {
991        let start_result = docker
992            .start_exec(&exec_id, None)
993            .await
994            .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
995
996        let mut output = String::new();
997
998        match start_result {
999            bollard::exec::StartExecResults::Attached {
1000                output: mut stream, ..
1001            } => {
1002                use futures::StreamExt;
1003                while let Some(msg) = stream.next().await {
1004                    match msg {
1005                        Ok(bollard::container::LogOutput::StdOut { message }) => {
1006                            output.push_str(&String::from_utf8_lossy(&message));
1007                        }
1008                        Ok(bollard::container::LogOutput::StdErr { message }) => {
1009                            output.push_str(&String::from_utf8_lossy(&message));
1010                        }
1011                        Ok(_) => {}
1012                        Err(e) => {
1013                            output.push_str(&format!("[error reading stream: {}]", e));
1014                        }
1015                    }
1016                }
1017            }
1018            bollard::exec::StartExecResults::Detached => {}
1019        }
1020
1021        let inspect = docker
1022            .inspect_exec(&exec_id)
1023            .await
1024            .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1025
1026        let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1027            CamelError::ProcessorError("container exec returned no exit code".into())
1028        })?;
1029
1030        let output = output.trim_end().to_string();
1031        exchange.input.body = Body::Text(output);
1032        exchange.input.set_header(
1033            HEADER_EXIT_CODE,
1034            serde_json::Value::Number(exit_code.into()),
1035        );
1036        exchange
1037            .input
1038            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1039    }
1040
1041    exchange.input.set_header(
1042        HEADER_ACTION_RESULT,
1043        serde_json::Value::String("success".to_string()),
1044    );
1045    Ok(())
1046}
1047
1048async fn handle_network_create(
1049    docker: Docker,
1050    config: ContainerConfig,
1051    exchange: &mut Exchange,
1052) -> Result<(), CamelError> {
1053    let network_name = exchange
1054        .input
1055        .header(HEADER_CONTAINER_NAME)
1056        .and_then(|v| v.as_str().map(|s| s.to_string()))
1057        .or(config.name.clone())
1058        .ok_or_else(|| {
1059            CamelError::ProcessorError(
1060                "CamelContainerName header or name param is required for network-create"
1061                    .to_string(),
1062            )
1063        })?;
1064
1065    let driver = config.driver.as_deref().unwrap_or("bridge");
1066
1067    let options = NetworkCreateRequest {
1068        name: network_name.clone(),
1069        driver: Some(driver.to_string()),
1070        ..Default::default()
1071    };
1072
1073    let result = docker.create_network(options).await.map_err(|e| {
1074        let err_str = e.to_string().to_lowercase();
1075        if err_str.contains("409") || err_str.contains("already exists") {
1076            CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1077        } else {
1078            CamelError::ProcessorError(format!("Failed to create network: {}", e))
1079        }
1080    })?;
1081
1082    let network_id = result.id.clone();
1083    let json_value = serde_json::to_value(&result).map_err(|e| {
1084        CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1085    })?;
1086
1087    exchange.input.body = Body::Json(json_value);
1088    exchange
1089        .input
1090        .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1091    exchange.input.set_header(
1092        HEADER_ACTION_RESULT,
1093        serde_json::Value::String("success".to_string()),
1094    );
1095    Ok(())
1096}
1097
1098async fn handle_network_connect(
1099    docker: Docker,
1100    config: ContainerConfig,
1101    exchange: &mut Exchange,
1102) -> Result<(), CamelError> {
1103    let network = exchange
1104        .input
1105        .header(HEADER_NETWORK)
1106        .and_then(|v| v.as_str().map(|s| s.to_string()))
1107        .or(config.network.clone())
1108        .ok_or_else(|| {
1109            CamelError::ProcessorError(
1110                "CamelContainerNetwork header or network param is required for network-connect"
1111                    .to_string(),
1112            )
1113        })?;
1114
1115    let container = exchange
1116        .input
1117        .header(HEADER_CONTAINER_ID)
1118        .and_then(|v| v.as_str().map(|s| s.to_string()))
1119        .or(config.container_id.clone())
1120        .ok_or_else(|| {
1121            CamelError::ProcessorError(
1122                "CamelContainerId header or container param is required for network-connect"
1123                    .to_string(),
1124            )
1125        })?;
1126
1127    docker
1128        .connect_network(
1129            &network,
1130            NetworkConnectRequest {
1131                container,
1132                ..Default::default()
1133            },
1134        )
1135        .await
1136        .map_err(|e| {
1137            let err_str = e.to_string().to_lowercase();
1138            if err_str.contains("404") || err_str.contains("not found") {
1139                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1140            } else {
1141                CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1142            }
1143        })?;
1144
1145    exchange.input.set_header(
1146        HEADER_ACTION_RESULT,
1147        serde_json::Value::String("success".to_string()),
1148    );
1149    Ok(())
1150}
1151
1152async fn handle_network_disconnect(
1153    docker: Docker,
1154    config: ContainerConfig,
1155    exchange: &mut Exchange,
1156) -> Result<(), CamelError> {
1157    let network = exchange
1158        .input
1159        .header(HEADER_NETWORK)
1160        .and_then(|v| v.as_str().map(|s| s.to_string()))
1161        .or(config.network.clone())
1162        .ok_or_else(|| {
1163            CamelError::ProcessorError(
1164                "CamelContainerNetwork header or network param is required for network-disconnect"
1165                    .to_string(),
1166            )
1167        })?;
1168
1169    let container = exchange
1170        .input
1171        .header(HEADER_CONTAINER_ID)
1172        .and_then(|v| v.as_str().map(|s| s.to_string()))
1173        .or(config.container_id.clone())
1174        .ok_or_else(|| {
1175            CamelError::ProcessorError(
1176                "CamelContainerId header or container param is required for network-disconnect"
1177                    .to_string(),
1178            )
1179        })?;
1180
1181    docker
1182        .disconnect_network(
1183            &network,
1184            NetworkDisconnectRequest {
1185                container,
1186                force: Some(config.force),
1187            },
1188        )
1189        .await
1190        .map_err(|e| {
1191            let err_str = e.to_string().to_lowercase();
1192            if err_str.contains("404") || err_str.contains("not found") {
1193                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1194            } else {
1195                CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1196            }
1197        })?;
1198
1199    exchange.input.set_header(
1200        HEADER_ACTION_RESULT,
1201        serde_json::Value::String("success".to_string()),
1202    );
1203    Ok(())
1204}
1205
1206async fn handle_network_remove(
1207    docker: Docker,
1208    config: ContainerConfig,
1209    exchange: &mut Exchange,
1210) -> Result<(), CamelError> {
1211    let network = exchange
1212        .input
1213        .header(HEADER_NETWORK)
1214        .and_then(|v| v.as_str().map(|s| s.to_string()))
1215        .or(config.network.clone())
1216        .ok_or_else(|| {
1217            CamelError::ProcessorError(
1218                "CamelContainerNetwork header or network param is required for network-remove"
1219                    .to_string(),
1220            )
1221        })?;
1222
1223    docker.remove_network(&network).await.map_err(|e| {
1224        let err_str = e.to_string().to_lowercase();
1225        if err_str.contains("404") || err_str.contains("not found") {
1226            CamelError::ProcessorError(format!("Network '{}' not found", network))
1227        } else if err_str.contains("409") || err_str.contains("in use") {
1228            CamelError::ProcessorError(format!(
1229                "Network '{}' is in use and cannot be removed",
1230                network
1231            ))
1232        } else {
1233            CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1234        }
1235    })?;
1236
1237    exchange.input.set_header(
1238        HEADER_ACTION_RESULT,
1239        serde_json::Value::String("success".to_string()),
1240    );
1241    Ok(())
1242}
1243
1244async fn handle_network_list(
1245    docker: Docker,
1246    _config: ContainerConfig,
1247    exchange: &mut Exchange,
1248) -> Result<(), CamelError> {
1249    let networks = docker
1250        .list_networks(None::<ListNetworksOptions>)
1251        .await
1252        .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1253
1254    let json_value = serde_json::to_value(&networks)
1255        .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1256
1257    exchange.input.body = Body::Json(json_value);
1258    exchange.input.set_header(
1259        HEADER_ACTION_RESULT,
1260        serde_json::Value::String("success".to_string()),
1261    );
1262    Ok(())
1263}
1264
1265/// Producer for executing container operations.
1266///
1267/// This producer handles synchronous container operations like listing,
1268/// creating, starting, stopping, and removing containers.
1269#[derive(Clone)]
1270pub struct ContainerProducer {
1271    config: ContainerConfig,
1272    docker: Docker,
1273}
1274
1275impl Service<Exchange> for ContainerProducer {
1276    type Response = Exchange;
1277    type Error = CamelError;
1278    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1279
1280    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1281        Poll::Ready(Ok(()))
1282    }
1283
1284    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1285        let config = self.config.clone();
1286        let docker = self.docker.clone();
1287        Box::pin(async move {
1288            let operation_name = exchange
1289                .input
1290                .header(HEADER_ACTION)
1291                .and_then(|v| v.as_str().map(|s| s.to_string()))
1292                .unwrap_or_else(|| config.operation.clone());
1293
1294            let operation = parse_producer_operation(&operation_name)?;
1295
1296            match operation {
1297                ProducerOperation::List => {
1298                    handle_list(docker, config, &mut exchange).await?;
1299                }
1300                ProducerOperation::Run => {
1301                    handle_run(docker, config, &mut exchange).await?;
1302                }
1303                ProducerOperation::Start => {
1304                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1305                        .await?;
1306                }
1307                ProducerOperation::Stop => {
1308                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1309                        .await?;
1310                }
1311                ProducerOperation::Remove => {
1312                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1313                        .await?;
1314                }
1315                ProducerOperation::Exec => {
1316                    handle_exec(docker, config, &mut exchange).await?;
1317                }
1318                ProducerOperation::NetworkCreate => {
1319                    handle_network_create(docker, config, &mut exchange).await?;
1320                }
1321                ProducerOperation::NetworkConnect => {
1322                    handle_network_connect(docker, config, &mut exchange).await?;
1323                }
1324                ProducerOperation::NetworkDisconnect => {
1325                    handle_network_disconnect(docker, config, &mut exchange).await?;
1326                }
1327                ProducerOperation::NetworkRemove => {
1328                    handle_network_remove(docker, config, &mut exchange).await?;
1329                }
1330                ProducerOperation::NetworkList => {
1331                    handle_network_list(docker, config, &mut exchange).await?;
1332                }
1333            }
1334
1335            Ok(exchange)
1336        })
1337    }
1338}
1339
1340/// Consumer for receiving Docker container events or logs.
1341///
1342/// This consumer subscribes to Docker events or container logs and forwards them
1343/// to the route as exchanges. It implements automatic reconnection on connection failures.
1344pub struct ContainerConsumer {
1345    config: ContainerConfig,
1346}
1347
1348#[async_trait]
1349impl Consumer for ContainerConsumer {
1350    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1351        match self.config.operation.as_str() {
1352            "events" => self.start_events_consumer(context).await,
1353            "logs" => self.start_logs_consumer(context).await,
1354            _ => Err(CamelError::EndpointCreationFailed(format!(
1355                "Consumer only supports 'events' or 'logs' operations, got '{}'",
1356                self.config.operation
1357            ))),
1358        }
1359    }
1360
1361    async fn stop(&mut self) -> Result<(), CamelError> {
1362        Ok(())
1363    }
1364
1365    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1366        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1367    }
1368}
1369
impl ContainerConsumer {
    /// Streams Docker daemon events into the route until cancelled.
    ///
    /// Each event is formatted with `format_docker_event` and sent as a text
    /// exchange. The outer loop (re)connects to Docker; the inner loop drains the
    /// event stream. A failed connection is retried after 5 seconds; a stream
    /// error, stream end, or failed send triggers a reconnect after 1 second.
    ///
    /// Returns `Ok(())` only when `context` reports cancellation; this function
    /// never returns an error itself.
    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        // Outer loop: one iteration per (re)connection attempt.
        loop {
            if context.is_cancelled() {
                tracing::info!("Container events consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Wait out the retry delay, but wake immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container events consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // No filter options: every event the daemon emits is forwarded.
            let mut event_stream = docker.events(None::<EventsOptions>);

            // Inner loop: forward events until the stream fails, ends, or a send fails.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container events consumer shutting down");
                        return Ok(());
                    }

                    msg = event_stream.next() => {
                        match msg {
                            Some(Ok(event)) => {
                                let formatted = format_docker_event(&event);
                                let message = Message::new(Body::Text(formatted));
                                let exchange = Exchange::new(message);

                                // A failed send leaves the inner loop and forces a reconnect.
                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                tracing::info!("Docker event stream ended. Reconnecting...");
                                break;
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting, still responsive to cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container events consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }

    /// Streams the log output of one container into the route.
    ///
    /// Requires `config.container_id`. Each non-empty log chunk becomes a text
    /// exchange carrying `HEADER_CONTAINER_ID` and `HEADER_LOG_STREAM`
    /// (`stdout`, `stderr`, `console` or `stdin`); when `config.timestamps` is
    /// set and a timestamp can be extracted, `HEADER_LOG_TIMESTAMP` is added too.
    ///
    /// A failed connection is retried after 5 seconds; a stream error or failed
    /// send triggers a reconnect after 1 second. When the stream ends, the
    /// consumer reconnects if `config.follow` is set and returns otherwise.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::EndpointCreationFailed` when no container ID is
    /// configured. Otherwise returns `Ok(())` on cancellation or, with
    /// `follow = false`, once the log stream is exhausted.
    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        let container_id = self.config.container_id.clone().ok_or_else(|| {
            CamelError::EndpointCreationFailed(
                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
                    .to_string(),
            )
        })?;

        // Outer loop: one iteration per (re)connection attempt.
        loop {
            if context.is_cancelled() {
                tracing::info!("Container logs consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Wait out the retry delay, but wake immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container logs consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // Default to the full log history when no `tail` is configured.
            // NOTE(review): the same `tail` is sent on every reconnect, which
            // presumably replays lines that were already delivered — confirm.
            let tail = self
                .config
                .tail
                .clone()
                .unwrap_or_else(|| "all".to_string());

            let options = LogsOptions {
                follow: self.config.follow,
                stdout: true,
                stderr: true,
                timestamps: self.config.timestamps,
                tail,
                ..Default::default()
            };

            let mut log_stream = docker.logs(&container_id, Some(options));
            let container_id_header = container_id.clone();

            // Inner loop: forward log chunks until the stream fails, ends, or a send fails.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container logs consumer shutting down");
                        return Ok(());
                    }

                    msg = log_stream.next() => {
                        match msg {
                            Some(Ok(log_output)) => {
                                // Label the chunk with its source stream and decode it lossily as UTF-8.
                                let (stream_type, content) = match log_output {
                                    bollard::container::LogOutput::StdOut { message } => {
                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdErr { message } => {
                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::Console { message } => {
                                        ("console", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdIn { message } => {
                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
                                    }
                                };

                                // Skip chunks that are empty once trailing whitespace is removed.
                                let content = content.trim_end();
                                if content.is_empty() {
                                    continue;
                                }

                                let mut message = Message::new(Body::Text(content.to_string()));
                                message.set_header(
                                    HEADER_CONTAINER_ID,
                                    serde_json::Value::String(container_id_header.clone()),
                                );
                                message.set_header(
                                    HEADER_LOG_STREAM,
                                    serde_json::Value::String(stream_type.to_string()),
                                );

                                // The timestamp stays in the body; it is only copied into a header.
                                if self.config.timestamps
                                    && let Some(ts) = extract_timestamp(content) {
                                        message.set_header(
                                            HEADER_LOG_TIMESTAMP,
                                            serde_json::Value::String(ts),
                                        );
                                    }

                                let exchange = Exchange::new(message);

                                // A failed send leaves the inner loop and forces a reconnect.
                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send log exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                // End of stream: reconnect when following, otherwise the consumer is done.
                                if self.config.follow {
                                    tracing::info!("Docker log stream ended. Reconnecting...");
                                    break;
                                } else {
                                    tracing::info!("Container logs consumer finished (follow=false)");
                                    return Ok(());
                                }
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting, still responsive to cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container logs consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }
}
1577
/// Returns the leading timestamp token of a log line, if there is one.
///
/// The line is split at its first space. The part before the space is returned
/// when it contains an uppercase `T`; this is a heuristic check only, the token
/// is not parsed as a date. Lines without a space yield `None`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _rest) = log_line.split_once(' ')?;
    candidate.contains('T').then(|| candidate.to_string())
}
1586
1587/// Component for creating container endpoints.
1588///
1589/// This component handles URIs with the "container" scheme and creates
1590/// appropriate producer and consumer endpoints for Docker operations.
1591///
1592/// Containers created via `run` operation are tracked globally and can be
1593/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
1594pub struct ContainerComponent {
1595    config: Option<ContainerGlobalConfig>,
1596}
1597
1598impl ContainerComponent {
1599    /// Creates a new container component instance without global config.
1600    pub fn new() -> Self {
1601        Self { config: None }
1602    }
1603
1604    /// Creates a container component with the given global config.
1605    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1606        Self {
1607            config: Some(config),
1608        }
1609    }
1610
1611    /// Creates a container component with optional global config.
1612    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1613        Self { config }
1614    }
1615}
1616
1617impl Default for ContainerComponent {
1618    fn default() -> Self {
1619        Self::new()
1620    }
1621}
1622
1623impl Component for ContainerComponent {
1624    fn scheme(&self) -> &str {
1625        "container"
1626    }
1627
1628    fn create_endpoint(
1629        &self,
1630        uri: &str,
1631        ctx: &dyn camel_component_api::ComponentContext,
1632    ) -> Result<Box<dyn Endpoint>, CamelError> {
1633        let mut config = ContainerConfig::from_uri(uri)?;
1634        // Apply global defaults if present and URI didn't set them
1635        if let Some(ref global) = self.config {
1636            config.apply_global_defaults(global);
1637        }
1638        let health_check = ContainerHealthCheck::new(&config);
1639        ctx.register_current_route_health_check(Arc::new(health_check));
1640        Ok(Box::new(ContainerEndpoint {
1641            uri: uri.to_string(),
1642            config,
1643        }))
1644    }
1645}
1646
1647/// Endpoint for container operations.
1648///
1649/// This endpoint creates producers for executing container operations
1650/// and consumers for receiving container events.
1651// TODO(CON-003): Forward container health status (inspect healthcheck / Health field) to
1652// Camel's health subsystem so the route can react to unhealthy containers.
1653pub struct ContainerEndpoint {
1654    uri: String,
1655    config: ContainerConfig,
1656}
1657
1658impl ContainerEndpoint {
1659    /// Returns the Docker host configured for this endpoint.
1660    /// Returns `None` if not set (for testing purposes).
1661    pub fn docker_host(&self) -> Option<&str> {
1662        self.config.host.as_deref()
1663    }
1664}
1665
1666impl Endpoint for ContainerEndpoint {
1667    fn uri(&self) -> &str {
1668        &self.uri
1669    }
1670
1671    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1672        Ok(Box::new(ContainerConsumer {
1673            config: self.config.clone(),
1674        }))
1675    }
1676
1677    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1678        let docker = self.config.connect_docker_client()?;
1679        Ok(BoxProcessor::new(ContainerProducer {
1680            config: self.config.clone(),
1681            docker,
1682        }))
1683    }
1684}
1685
1686#[cfg(test)]
1687mod tests {
1688    use super::*;
1689    use camel_component_api::NoOpComponentContext;
1690
1691    #[test]
1692    fn test_container_config() {
1693        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1694        assert_eq!(config.operation, "run");
1695        assert_eq!(config.image.as_deref(), Some("alpine"));
1696        // host is None by default; global config applies it later
1697        assert!(config.host.is_none());
1698    }
1699
1700    #[test]
1701    fn test_global_config_applied_to_endpoint() {
1702        // When global config is set and URI doesn't specify host,
1703        // apply_global_defaults should set host from global config.
1704        let global =
1705            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1706        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1707        assert!(
1708            config.host.is_none(),
1709            "URI without ?host= should leave host as None"
1710        );
1711        config.apply_global_defaults(&global);
1712        assert_eq!(
1713            config.host.as_deref(),
1714            Some("unix:///custom/docker.sock"),
1715            "global docker_host must be applied when URI did not set host"
1716        );
1717    }
1718
1719    #[test]
1720    fn test_uri_param_wins_over_global_config() {
1721        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1722        let global =
1723            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1724        let mut config =
1725            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1726                .unwrap();
1727        assert_eq!(
1728            config.host.as_deref(),
1729            Some("unix:///override.sock"),
1730            "URI-set host should be parsed correctly"
1731        );
1732        config.apply_global_defaults(&global);
1733        assert_eq!(
1734            config.host.as_deref(),
1735            Some("unix:///override.sock"),
1736            "global config must NOT override a host already set by URI"
1737        );
1738    }
1739
1740    #[test]
1741    fn test_container_config_parses_name() {
1742        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1743        assert_eq!(config.name.as_deref(), Some("my-container"));
1744    }
1745
1746    #[test]
1747    fn test_parse_producer_operation_known() {
1748        assert_eq!(
1749            parse_producer_operation("list").unwrap(),
1750            ProducerOperation::List
1751        );
1752        assert_eq!(
1753            parse_producer_operation("run").unwrap(),
1754            ProducerOperation::Run
1755        );
1756        assert_eq!(
1757            parse_producer_operation("start").unwrap(),
1758            ProducerOperation::Start
1759        );
1760        assert_eq!(
1761            parse_producer_operation("stop").unwrap(),
1762            ProducerOperation::Stop
1763        );
1764        assert_eq!(
1765            parse_producer_operation("remove").unwrap(),
1766            ProducerOperation::Remove
1767        );
1768    }
1769
1770    #[test]
1771    fn test_parse_producer_operation_unknown() {
1772        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1773        match err {
1774            CamelError::ProcessorError(msg) => {
1775                assert!(
1776                    msg.contains("Unknown container operation"),
1777                    "Unexpected error message: {}",
1778                    msg
1779                );
1780            }
1781            _ => panic!("Expected ProcessorError for unknown operation"),
1782        }
1783    }
1784
1785    #[test]
1786    fn test_parse_producer_operation_new_variants() {
1787        assert_eq!(
1788            parse_producer_operation("exec").unwrap(),
1789            ProducerOperation::Exec
1790        );
1791        assert_eq!(
1792            parse_producer_operation("network-create").unwrap(),
1793            ProducerOperation::NetworkCreate
1794        );
1795        assert_eq!(
1796            parse_producer_operation("network-connect").unwrap(),
1797            ProducerOperation::NetworkConnect
1798        );
1799        assert_eq!(
1800            parse_producer_operation("network-disconnect").unwrap(),
1801            ProducerOperation::NetworkDisconnect
1802        );
1803        assert_eq!(
1804            parse_producer_operation("network-remove").unwrap(),
1805            ProducerOperation::NetworkRemove
1806        );
1807        assert_eq!(
1808            parse_producer_operation("network-list").unwrap(),
1809            ProducerOperation::NetworkList
1810        );
1811    }
1812
1813    #[test]
1814    fn test_resolve_container_name_header_overrides_config() {
1815        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1816        let mut exchange = Exchange::new(Message::new(""));
1817        exchange.input.set_header(
1818            HEADER_CONTAINER_NAME,
1819            serde_json::Value::String("header-name".to_string()),
1820        );
1821
1822        let resolved = resolve_container_name(&exchange, &config);
1823        assert_eq!(resolved.as_deref(), Some("header-name"));
1824    }
1825
1826    #[test]
1827    fn test_container_config_rejects_tcp_host() {
1828        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1829        let err = config.connect_docker_client().unwrap_err();
1830        match err {
1831            CamelError::ProcessorError(msg) => {
1832                assert!(
1833                    msg.to_lowercase().contains("tcp"),
1834                    "Expected TCP scheme error, got: {}",
1835                    msg
1836                );
1837            }
1838            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1839        }
1840    }
1841
1842    #[tokio::test]
1843    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1844        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1845        let remove_called_clone = remove_called.clone();
1846
1847        let result = run_container_with_cleanup(
1848            || async { Ok("container-123".to_string()) },
1849            |_id| async move {
1850                Err(CamelError::ProcessorError(
1851                    "Failed to start container".to_string(),
1852                ))
1853            },
1854            move |_id| {
1855                let remove_called_inner = remove_called_clone.clone();
1856                async move {
1857                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1858                    Ok(())
1859                }
1860            },
1861        )
1862        .await;
1863
1864        assert!(result.is_err(), "Expected start failure to bubble up");
1865        assert!(
1866            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1867            "Expected cleanup to remove container"
1868        );
1869    }
1870
1871    #[test]
1872    fn test_container_component_creates_endpoint() {
1873        let component = ContainerComponent::new();
1874        assert_eq!(component.scheme(), "container");
1875        let ctx = NoOpComponentContext;
1876        let endpoint = component
1877            .create_endpoint("container:run?image=alpine", &ctx)
1878            .unwrap();
1879        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1880    }
1881
1882    #[test]
1883    fn test_container_config_parses_ports() {
1884        let config =
1885            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1886        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1887    }
1888
1889    #[test]
1890    fn test_container_config_parses_env() {
1891        let config =
1892            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1893        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1894    }
1895
1896    #[test]
1897    fn test_container_config_parses_logs_options() {
1898        let config = ContainerConfig::from_uri(
1899            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1900        )
1901        .unwrap();
1902        assert_eq!(config.operation, "logs");
1903        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1904        assert!(config.follow);
1905        assert!(config.timestamps);
1906        assert_eq!(config.tail.as_deref(), Some("100"));
1907    }
1908
1909    #[test]
1910    fn test_container_config_logs_defaults() {
1911        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1912        assert!(config.follow); // default: true
1913        assert!(!config.timestamps); // default: false
1914        assert!(config.tail.is_none()); // default: None (all)
1915    }
1916
1917    #[test]
1918    fn test_parse_ports_single() {
1919        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1920        let (exposed, bindings) = config.parse_ports().unwrap();
1921
1922        assert!(exposed.contains(&"80/tcp".to_string()));
1923        assert!(bindings.contains_key("80/tcp"));
1924
1925        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1926        assert_eq!(binding.len(), 1);
1927        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1928    }
1929
1930    #[test]
1931    fn test_parse_ports_multiple() {
1932        let config =
1933            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1934        let (exposed, bindings) = config.parse_ports().unwrap();
1935
1936        assert!(exposed.contains(&"80/tcp".to_string()));
1937        assert!(exposed.contains(&"443/tcp".to_string()));
1938        assert_eq!(bindings.len(), 2);
1939    }
1940
1941    #[test]
1942    fn test_parse_ports_with_protocol() {
1943        let config =
1944            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1945                .unwrap();
1946        let (exposed, _bindings) = config.parse_ports().unwrap();
1947
1948        assert!(exposed.contains(&"80/tcp".to_string()));
1949        assert!(exposed.contains(&"53/udp".to_string()));
1950    }
1951
1952    #[test]
1953    fn test_parse_ports_none() {
1954        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1955        let (exposed, bindings) = config.parse_ports().unwrap();
1956        assert!(exposed.is_empty());
1957        assert!(bindings.is_empty());
1958    }
1959
1960    #[test]
1961    fn test_parse_env_single() {
1962        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1963        let env = config.parse_env().unwrap();
1964
1965        assert_eq!(env.len(), 1);
1966        assert_eq!(env[0], "FOO=bar");
1967    }
1968
1969    #[test]
1970    fn test_parse_env_multiple() {
1971        let config =
1972            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1973                .unwrap();
1974        let env = config.parse_env().unwrap();
1975
1976        assert_eq!(env.len(), 3);
1977        assert!(env.contains(&"FOO=bar".to_string()));
1978        assert!(env.contains(&"BAZ=qux".to_string()));
1979        assert!(env.contains(&"NUM=123".to_string()));
1980    }
1981
1982    #[test]
1983    fn test_parse_env_none() {
1984        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1985        assert!(config.parse_env().is_none());
1986    }
1987
1988    use camel_component_api::Message;
1989    use std::sync::Arc;
1990
1991    #[tokio::test]
1992    async fn test_container_producer_resolves_operation_from_header() {
1993        // Try to connect to Docker - if fails, return early
1994        let docker = match Docker::connect_with_local_defaults() {
1995            Ok(d) => d,
1996            Err(_) => {
1997                eprintln!("Skipping test: Could not connect to Docker daemon");
1998                return;
1999            }
2000        };
2001
2002        if docker.ping().await.is_err() {
2003            eprintln!("Skipping test: Docker daemon not responding to ping");
2004            return;
2005        }
2006
2007        let component = ContainerComponent::new();
2008        let ctx = NoOpComponentContext;
2009        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2010
2011        let ctx = ProducerContext::new();
2012        let mut producer = endpoint.create_producer(&ctx).unwrap();
2013
2014        let mut exchange = Exchange::new(Message::new(""));
2015        exchange
2016            .input
2017            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2018
2019        use tower::ServiceExt;
2020        let result = producer
2021            .ready()
2022            .await
2023            .unwrap()
2024            .call(exchange)
2025            .await
2026            .unwrap();
2027
2028        // For list operation, the result header should be on the input message
2029        assert_eq!(
2030            result
2031                .input
2032                .header(HEADER_ACTION_RESULT)
2033                .map(|v| v.as_str().unwrap()),
2034            Some("success")
2035        );
2036    }
2037
2038    #[tokio::test]
2039    async fn test_container_producer_connection_error_on_invalid_host() {
2040        // Test that an invalid host (nonexistent socket) results in a connection error
2041        let component = ContainerComponent::new();
2042        let ctx = NoOpComponentContext;
2043        let endpoint = component
2044            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2045            .unwrap();
2046
2047        let ctx = ProducerContext::new();
2048        let result = endpoint.create_producer(&ctx);
2049
2050        // The producer should return an error because it cannot connect to the invalid socket
2051        assert!(
2052            result.is_err(),
2053            "Expected error when connecting to invalid host"
2054        );
2055        let err = result.unwrap_err();
2056        match &err {
2057            CamelError::ProcessorError(msg) => {
2058                assert!(
2059                    msg.to_lowercase().contains("connection")
2060                        || msg.to_lowercase().contains("connect")
2061                        || msg.to_lowercase().contains("socket")
2062                        || msg.contains("docker"),
2063                    "Error message should indicate connection failure, got: {}",
2064                    msg
2065                );
2066            }
2067            _ => panic!("Expected ProcessorError, got: {:?}", err),
2068        }
2069    }
2070
2071    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
2072    #[tokio::test]
2073    async fn test_container_producer_lifecycle_operations_missing_id() {
2074        // Try to connect to Docker - if fails, return early
2075        let docker = match Docker::connect_with_local_defaults() {
2076            Ok(d) => d,
2077            Err(_) => {
2078                eprintln!("Skipping test: Could not connect to Docker daemon");
2079                return;
2080            }
2081        };
2082
2083        if docker.ping().await.is_err() {
2084            eprintln!("Skipping test: Docker daemon not responding to ping");
2085            return;
2086        }
2087
2088        let component = ContainerComponent::new();
2089        let ctx = NoOpComponentContext;
2090        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2091        let ctx = ProducerContext::new();
2092        let mut producer = endpoint.create_producer(&ctx).unwrap();
2093
2094        // Test each lifecycle operation without CamelContainerId header
2095        for operation in ["start", "stop", "remove"] {
2096            let mut exchange = Exchange::new(Message::new(""));
2097            exchange.input.set_header(
2098                HEADER_ACTION,
2099                serde_json::Value::String(operation.to_string()),
2100            );
2101            // Deliberately NOT setting CamelContainerId header
2102
2103            use tower::ServiceExt;
2104            let result = producer.ready().await.unwrap().call(exchange).await;
2105
2106            assert!(
2107                result.is_err(),
2108                "Expected error for {} operation without CamelContainerId",
2109                operation
2110            );
2111            let err = result.unwrap_err();
2112            match &err {
2113                CamelError::ProcessorError(msg) => {
2114                    assert!(
2115                        msg.contains(HEADER_CONTAINER_ID),
2116                        "Error message should mention {}, got: {}",
2117                        HEADER_CONTAINER_ID,
2118                        msg
2119                    );
2120                }
2121                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2122            }
2123        }
2124    }
2125
2126    /// Test that stop operation returns an error for a nonexistent container.
2127    #[tokio::test]
2128    async fn test_container_producer_stop_nonexistent() {
2129        // Try to connect to Docker - if fails, return early
2130        let docker = match Docker::connect_with_local_defaults() {
2131            Ok(d) => d,
2132            Err(_) => {
2133                eprintln!("Skipping test: Could not connect to Docker daemon");
2134                return;
2135            }
2136        };
2137
2138        if docker.ping().await.is_err() {
2139            eprintln!("Skipping test: Docker daemon not responding to ping");
2140            return;
2141        }
2142
2143        let component = ContainerComponent::new();
2144        let ctx = NoOpComponentContext;
2145        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2146        let ctx = ProducerContext::new();
2147        let mut producer = endpoint.create_producer(&ctx).unwrap();
2148
2149        let mut exchange = Exchange::new(Message::new(""));
2150        exchange
2151            .input
2152            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2153        exchange.input.set_header(
2154            HEADER_CONTAINER_ID,
2155            serde_json::Value::String("nonexistent-container-123".into()),
2156        );
2157
2158        use tower::ServiceExt;
2159        let result = producer.ready().await.unwrap().call(exchange).await;
2160
2161        assert!(
2162            result.is_err(),
2163            "Expected error when stopping nonexistent container"
2164        );
2165        let err = result.unwrap_err();
2166        match &err {
2167            CamelError::ProcessorError(msg) => {
2168                // Docker API returns 404 with "No such container" message
2169                assert!(
2170                    msg.to_lowercase().contains("no such container")
2171                        || msg.to_lowercase().contains("not found")
2172                        || msg.contains("404"),
2173                    "Error message should indicate container not found, got: {}",
2174                    msg
2175                );
2176            }
2177            _ => panic!("Expected ProcessorError, got: {:?}", err),
2178        }
2179    }
2180
2181    /// Test that run operation returns an error when no image is provided.
2182    #[tokio::test]
2183    async fn test_container_producer_run_missing_image() {
2184        // Try to connect to Docker - if fails, return early
2185        let docker = match Docker::connect_with_local_defaults() {
2186            Ok(d) => d,
2187            Err(_) => {
2188                eprintln!("Skipping test: Could not connect to Docker daemon");
2189                return;
2190            }
2191        };
2192
2193        if docker.ping().await.is_err() {
2194            eprintln!("Skipping test: Docker daemon not responding to ping");
2195            return;
2196        }
2197
2198        // Create producer without an image in the URI
2199        let component = ContainerComponent::new();
2200        let ctx = NoOpComponentContext;
2201        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2202        let ctx = ProducerContext::new();
2203        let mut producer = endpoint.create_producer(&ctx).unwrap();
2204
2205        let mut exchange = Exchange::new(Message::new(""));
2206        exchange
2207            .input
2208            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2209        // Deliberately NOT setting CamelContainerImage header
2210
2211        use tower::ServiceExt;
2212        let result = producer.ready().await.unwrap().call(exchange).await;
2213
2214        assert!(
2215            result.is_err(),
2216            "Expected error for run operation without image"
2217        );
2218        let err = result.unwrap_err();
2219        match &err {
2220            CamelError::ProcessorError(msg) => {
2221                assert!(
2222                    msg.to_lowercase().contains("image"),
2223                    "Error message should mention 'image', got: {}",
2224                    msg
2225                );
2226            }
2227            _ => panic!("Expected ProcessorError, got: {:?}", err),
2228        }
2229    }
2230
2231    /// Test that run operation uses image from header.
2232    #[tokio::test]
2233    async fn test_container_producer_run_image_from_header() {
2234        // Try to connect to Docker - if fails, return early
2235        let docker = match Docker::connect_with_local_defaults() {
2236            Ok(d) => d,
2237            Err(_) => {
2238                eprintln!("Skipping test: Could not connect to Docker daemon");
2239                return;
2240            }
2241        };
2242
2243        if docker.ping().await.is_err() {
2244            eprintln!("Skipping test: Docker daemon not responding to ping");
2245            return;
2246        }
2247
2248        // Create producer without an image in the URI
2249        let component = ContainerComponent::new();
2250        let ctx = NoOpComponentContext;
2251        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2252        let ctx = ProducerContext::new();
2253        let mut producer = endpoint.create_producer(&ctx).unwrap();
2254
2255        let mut exchange = Exchange::new(Message::new(""));
2256        exchange
2257            .input
2258            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2259        // Set a non-existent image to test that the run operation attempts to use it
2260        exchange.input.set_header(
2261            HEADER_IMAGE,
2262            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2263        );
2264
2265        use tower::ServiceExt;
2266        let result = producer.ready().await.unwrap().call(exchange).await;
2267
2268        // The operation should fail because the image doesn't exist
2269        assert!(
2270            result.is_err(),
2271            "Expected error when running container with nonexistent image"
2272        );
2273        let err = result.unwrap_err();
2274        match &err {
2275            CamelError::ProcessorError(msg) => {
2276                // Docker API returns an error about the missing image
2277                assert!(
2278                    msg.to_lowercase().contains("no such image")
2279                        || msg.to_lowercase().contains("not found")
2280                        || msg.to_lowercase().contains("image")
2281                        || msg.to_lowercase().contains("pull")
2282                        || msg.contains("404"),
2283                    "Error message should indicate image issue, got: {}",
2284                    msg
2285                );
2286            }
2287            _ => panic!("Expected ProcessorError, got: {:?}", err),
2288        }
2289    }
2290
2291    /// Integration test: Actually run a container with alpine:latest.
2292    /// This test verifies the full flow: create → start → set headers.
2293    #[tokio::test]
2294    async fn test_container_producer_run_alpine_container() {
2295        let docker = match Docker::connect_with_local_defaults() {
2296            Ok(d) => d,
2297            Err(_) => {
2298                eprintln!("Skipping test: Could not connect to Docker daemon");
2299                return;
2300            }
2301        };
2302
2303        if docker.ping().await.is_err() {
2304            eprintln!("Skipping test: Docker daemon not responding to ping");
2305            return;
2306        }
2307
2308        // Pull alpine:latest if not present
2309        let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2310        let has_alpine = images
2311            .iter()
2312            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2313
2314        if !has_alpine {
2315            eprintln!("Pulling alpine:latest image...");
2316            let mut stream = docker.create_image(
2317                Some(CreateImageOptions {
2318                    from_image: Some("alpine:latest".to_string()),
2319                    ..Default::default()
2320                }),
2321                None,
2322                None,
2323            );
2324
2325            use futures::StreamExt;
2326            while let Some(_item) = stream.next().await {
2327                // Wait for pull to complete
2328            }
2329            eprintln!("Image pulled successfully");
2330        }
2331
2332        // Create producer
2333        let component = ContainerComponent::new();
2334        let ctx = NoOpComponentContext;
2335        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2336        let ctx = ProducerContext::new();
2337        let mut producer = endpoint.create_producer(&ctx).unwrap();
2338
2339        // Run container with unique name
2340        let timestamp = std::time::SystemTime::now()
2341            .duration_since(std::time::UNIX_EPOCH)
2342            .unwrap()
2343            .as_millis();
2344        let container_name = format!("test-rust-camel-{}", timestamp);
2345        let mut exchange = Exchange::new(Message::new(""));
2346        exchange.input.set_header(
2347            HEADER_IMAGE,
2348            serde_json::Value::String("alpine:latest".into()),
2349        );
2350        exchange.input.set_header(
2351            HEADER_CONTAINER_NAME,
2352            serde_json::Value::String(container_name.clone()),
2353        );
2354
2355        use tower::ServiceExt;
2356        let result = producer
2357            .ready()
2358            .await
2359            .unwrap()
2360            .call(exchange)
2361            .await
2362            .expect("Container run should succeed");
2363
2364        // Verify container ID was set
2365        let container_id = result
2366            .input
2367            .header(HEADER_CONTAINER_ID)
2368            .and_then(|v| v.as_str().map(|s| s.to_string()))
2369            .expect("Expected container ID header");
2370        assert!(!container_id.is_empty(), "Container ID should not be empty");
2371
2372        // Verify success header
2373        assert_eq!(
2374            result
2375                .input
2376                .header(HEADER_ACTION_RESULT)
2377                .and_then(|v| v.as_str()),
2378            Some("success")
2379        );
2380
2381        // Verify container exists in Docker
2382        let inspect = docker
2383            .inspect_container(&container_id, None)
2384            .await
2385            .expect("Container should exist");
2386        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2387
2388        // Cleanup: remove container
2389        docker
2390            .remove_container(
2391                &container_id,
2392                Some(RemoveContainerOptions {
2393                    force: true,
2394                    ..Default::default()
2395                }),
2396            )
2397            .await
2398            .ok();
2399
2400        eprintln!("✅ Container {} created and cleaned up", container_id);
2401    }
2402
2403    /// Test that consumer returns an error for unsupported operations.
2404    #[tokio::test]
2405    async fn test_container_consumer_unsupported_operation() {
2406        use tokio::sync::mpsc;
2407
2408        let component = ContainerComponent::new();
2409        let ctx = NoOpComponentContext;
2410        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2411        let mut consumer = endpoint.create_consumer().unwrap();
2412
2413        // Create a minimal ConsumerContext
2414        let (tx, _rx) = mpsc::channel(16);
2415        let cancel_token = tokio_util::sync::CancellationToken::new();
2416        let context = ConsumerContext::new(tx, cancel_token);
2417
2418        let result = consumer.start(context).await;
2419
2420        // Should return error because "run" is not a supported consumer operation
2421        assert!(
2422            result.is_err(),
2423            "Expected error for unsupported consumer operation"
2424        );
2425        let err = result.unwrap_err();
2426        match &err {
2427            CamelError::EndpointCreationFailed(msg) => {
2428                assert!(
2429                    msg.contains("Consumer only supports 'events' or 'logs'"),
2430                    "Error message should mention events or logs support, got: {}",
2431                    msg
2432                );
2433            }
2434            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2435        }
2436    }
2437
2438    #[test]
2439    fn test_container_consumer_concurrency_model_is_concurrent() {
2440        let consumer = ContainerConsumer {
2441            config: ContainerConfig::from_uri("container:events").unwrap(),
2442        };
2443
2444        assert_eq!(
2445            consumer.concurrency_model(),
2446            camel_component_api::ConcurrencyModel::Concurrent { max: None }
2447        );
2448    }
2449
2450    /// Test that consumer gracefully shuts down when cancellation is requested.
2451    /// This test requires a running Docker daemon. If Docker is not available, the test
2452    /// will be ignored.
2453    #[tokio::test]
2454    async fn test_container_consumer_cancellation() {
2455        use std::sync::atomic::{AtomicBool, Ordering};
2456        use tokio::sync::mpsc;
2457
2458        // Try to connect to Docker - if fails, return early
2459        let docker = match Docker::connect_with_local_defaults() {
2460            Ok(d) => d,
2461            Err(_) => {
2462                eprintln!("Skipping test: Could not connect to Docker daemon");
2463                return;
2464            }
2465        };
2466
2467        if docker.ping().await.is_err() {
2468            eprintln!("Skipping test: Docker daemon not responding to ping");
2469            return;
2470        }
2471
2472        let component = ContainerComponent::new();
2473        let ctx = NoOpComponentContext;
2474        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2475        let mut consumer = endpoint.create_consumer().unwrap();
2476
2477        // Create a ConsumerContext
2478        let (tx, _rx) = mpsc::channel(16);
2479        let cancel_token = tokio_util::sync::CancellationToken::new();
2480        let context = ConsumerContext::new(tx, cancel_token.clone());
2481
2482        // Track if the consumer task has completed
2483        let completed = Arc::new(AtomicBool::new(false));
2484        let completed_clone = completed.clone();
2485
2486        // Spawn consumer in background
2487        let handle = tokio::spawn(async move {
2488            let result = consumer.start(context).await;
2489            // Mark as completed when done
2490            completed_clone.store(true, Ordering::SeqCst);
2491            result
2492        });
2493
2494        // Wait a bit for the consumer to start
2495        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2496
2497        // Consumer should still be running (not completed)
2498        assert!(
2499            !completed.load(Ordering::SeqCst),
2500            "Consumer should still be running before cancellation"
2501        );
2502
2503        // Request cancellation
2504        cancel_token.cancel();
2505
2506        // Wait for the task to complete (with timeout)
2507        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2508
2509        // Task should have completed (not timed out)
2510        assert!(
2511            result.is_ok(),
2512            "Consumer should gracefully shut down after cancellation"
2513        );
2514
2515        // Verify the consumer completed
2516        assert!(
2517            completed.load(Ordering::SeqCst),
2518            "Consumer should have completed after cancellation"
2519        );
2520    }
2521
2522    /// Integration test for listing containers.
2523    /// This test requires a running Docker daemon. If Docker is not available, the test
2524    /// will return early and be effectively ignored.
2525    #[tokio::test]
2526    async fn test_container_producer_list_containers() {
2527        // Try to connect to Docker using default config
2528        // If ping fails, return early (effectively ignoring this test)
2529        let docker = match Docker::connect_with_local_defaults() {
2530            Ok(d) => d,
2531            Err(_) => {
2532                eprintln!("Skipping test: Could not connect to Docker daemon");
2533                return;
2534            }
2535        };
2536
2537        if docker.ping().await.is_err() {
2538            eprintln!("Skipping test: Docker daemon not responding to ping");
2539            return;
2540        }
2541
2542        // Create producer with list operation
2543        let component = ContainerComponent::new();
2544        let ctx = NoOpComponentContext;
2545        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2546
2547        let ctx = ProducerContext::new();
2548        let mut producer = endpoint.create_producer(&ctx).unwrap();
2549
2550        // Create exchange with list operation in header
2551        let mut exchange = Exchange::new(Message::new(""));
2552        exchange
2553            .input
2554            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2555
2556        // Call the producer
2557        use tower::ServiceExt;
2558        let result = producer
2559            .ready()
2560            .await
2561            .unwrap()
2562            .call(exchange)
2563            .await
2564            .expect("Producer should succeed when Docker is available");
2565
2566        // Assert that the input exchange body is a JSON array
2567        // (because list_containers should put the JSON result in the body)
2568        match &result.input.body {
2569            camel_component_api::Body::Json(json_value) => {
2570                assert!(
2571                    json_value.is_array(),
2572                    "Expected input body to be a JSON array, got: {:?}",
2573                    json_value
2574                );
2575            }
2576            other => panic!("Expected Body::Json with array, got: {:?}", other),
2577        }
2578    }
2579
2580    #[test]
2581    fn test_container_config_parses_volumes() {
2582        let config = ContainerConfig::from_uri(
2583            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2584        )
2585        .unwrap();
2586        assert_eq!(
2587            config.volumes.as_deref(),
2588            Some("./html:/usr/share/nginx/html:ro")
2589        );
2590    }
2591
2592    #[test]
2593    fn test_container_config_parses_exec_params() {
2594        let config = ContainerConfig::from_uri(
2595            "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2596        )
2597        .unwrap();
2598        assert_eq!(config.operation, "exec");
2599        assert_eq!(config.container_id.as_deref(), Some("my-app"));
2600        assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2601        assert_eq!(config.user.as_deref(), Some("root"));
2602        assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2603        assert!(config.detach);
2604    }
2605
2606    #[test]
2607    fn test_container_config_parses_network_create_params() {
2608        let config =
2609            ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2610                .unwrap();
2611        assert_eq!(config.operation, "network-create");
2612        assert_eq!(config.name.as_deref(), Some("my-net"));
2613        assert_eq!(config.driver.as_deref(), Some("bridge"));
2614    }
2615
2616    #[test]
2617    fn test_container_config_defaults_new_fields() {
2618        let config = ContainerConfig::from_uri("container:list").unwrap();
2619        assert!(config.volumes.is_none());
2620        assert!(config.user.is_none());
2621        assert!(config.workdir.is_none());
2622        assert!(!config.detach);
2623        assert!(config.driver.is_none());
2624        assert!(!config.force);
2625    }
2626
2627    #[test]
2628    fn test_parse_volumes_bind_mount() {
2629        let config = ContainerConfig::from_uri(
2630            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2631        )
2632        .unwrap();
2633        let (binds, anon) = config.parse_volumes().unwrap();
2634        assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2635        assert!(anon.is_empty());
2636    }
2637
2638    #[test]
2639    fn test_parse_volumes_named_volume() {
2640        let config =
2641            ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2642                .unwrap();
2643        let (binds, anon) = config.parse_volumes().unwrap();
2644        assert_eq!(binds, vec!["data:/var/lib/data"]);
2645        assert!(anon.is_empty());
2646    }
2647
2648    #[test]
2649    fn test_parse_volumes_anonymous() {
2650        let config =
2651            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2652        let (binds, anon) = config.parse_volumes().unwrap();
2653        assert!(binds.is_empty());
2654        assert!(anon.contains(&"/tmp/app-data".to_string()));
2655    }
2656
2657    #[test]
2658    fn test_parse_volumes_anonymous_with_mode() {
2659        let config =
2660            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2661                .unwrap();
2662        let (binds, anon) = config.parse_volumes().unwrap();
2663        assert!(binds.is_empty());
2664        assert!(anon.contains(&"/tmp/app-data".to_string()));
2665    }
2666
2667    #[test]
2668    fn test_parse_volumes_multiple() {
2669        let config = ContainerConfig::from_uri(
2670            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2671        )
2672        .unwrap();
2673        let (binds, anon) = config.parse_volumes().unwrap();
2674        assert_eq!(binds.len(), 2);
2675        assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2676        assert!(binds.contains(&"data:/var/log/app".to_string()));
2677        assert!(anon.is_empty());
2678    }
2679
2680    #[test]
2681    fn test_parse_volumes_mixed() {
2682        let config = ContainerConfig::from_uri(
2683            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2684        )
2685        .unwrap();
2686        let (binds, anon) = config.parse_volumes().unwrap();
2687        assert_eq!(binds.len(), 1);
2688        assert!(anon.contains(&"/tmp/cache".to_string()));
2689    }
2690
2691    #[test]
2692    fn test_parse_volumes_none() {
2693        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2694        assert!(config.parse_volumes().is_none());
2695    }
2696
2697    #[test]
2698    fn test_parse_volumes_empty_entry_skipped() {
2699        let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2700        assert!(config.parse_volumes().is_none());
2701    }
2702
2703    #[test]
2704    fn test_parse_volumes_rw_mode() {
2705        let config =
2706            ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2707                .unwrap();
2708        let (binds, _) = config.parse_volumes().unwrap();
2709        assert_eq!(binds, vec!["./data:/app/data:rw"]);
2710    }
2711
2712    #[test]
2713    fn test_container_config_from_uri_parses_false_flags() {
2714        let config = ContainerConfig::from_uri(
2715            "container:logs?containerId=a&follow=false&timestamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
2716        )
2717        .unwrap();
2718        assert!(!config.follow);
2719        assert!(!config.timestamps);
2720        assert!(!config.auto_pull);
2721        assert!(!config.auto_remove);
2722        assert!(config.detach);
2723        assert!(config.force);
2724    }
2725
2726    #[test]
2727    fn test_docker_socket_path_validation() {
2728        let unix_cfg =
2729            ContainerConfig::from_uri("container:list?host=unix:///tmp/docker.sock").unwrap();
2730        assert_eq!(
2731            unix_cfg.docker_socket_path().unwrap(),
2732            "unix:///tmp/docker.sock"
2733        );
2734
2735        let npipe_cfg =
2736            ContainerConfig::from_uri("container:list?host=npipe:////./pipe/docker_engine")
2737                .unwrap();
2738        assert_eq!(
2739            npipe_cfg.docker_socket_path().unwrap(),
2740            "npipe:////./pipe/docker_engine"
2741        );
2742
2743        let plain_cfg =
2744            ContainerConfig::from_uri("container:list?host=/var/run/docker.sock").unwrap();
2745        assert_eq!(
2746            plain_cfg.docker_socket_path().unwrap(),
2747            "/var/run/docker.sock"
2748        );
2749
2750        let bad_cfg =
2751            ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
2752        assert!(bad_cfg.docker_socket_path().is_err());
2753    }
2754
2755    #[test]
2756    fn test_parse_ports_invalid_and_whitespace_entries() {
2757        // "8080" without colon is malformed — must fail-fast
2758        let cfg = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
2759        let err = cfg.parse_ports().unwrap_err();
2760        assert!(
2761            err.to_string().contains("malformed port mapping"),
2762            "expected malformed port error, got: {}",
2763            err
2764        );
2765
2766        let cfg =
2767            ContainerConfig::from_uri("container:run?ports= 8080:80 ,  5353:53/udp ").unwrap();
2768        let (exposed, bindings) = cfg.parse_ports().unwrap();
2769        assert!(exposed.contains(&"80/tcp".to_string()));
2770        assert!(exposed.contains(&"53/udp".to_string()));
2771        assert_eq!(bindings.len(), 2);
2772    }
2773
2774    #[test]
2775    fn test_parse_ports_fail_fast_on_malformed() {
2776        // First entry valid, second malformed — must fail on the malformed one
2777        let cfg = ContainerConfig::from_uri("container:run?ports=8080:80,badentry").unwrap();
2778        let err = cfg.parse_ports().unwrap_err();
2779        assert!(
2780            err.to_string()
2781                .contains("malformed port mapping 'badentry'"),
2782            "expected fail-fast on 'badentry', got: {}",
2783            err
2784        );
2785    }
2786
2787    #[test]
2788    fn test_parse_env_trims_and_filters_empty_items() {
2789        let cfg = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
2790        let env = cfg.parse_env().unwrap();
2791        assert_eq!(env, vec!["FOO=bar".to_string(), "BAZ=qux".to_string()]);
2792
2793        let cfg = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
2794        assert!(cfg.parse_env().is_none());
2795    }
2796
2797    #[test]
2798    fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
2799        assert!(parse_volume_str("/host:/ctr:badmode").is_none());
2800        let (binds, anon) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
2801        assert!(binds.contains(&"a:/b:rw".to_string()));
2802        assert!(anon.contains(&"/tmp/cache".to_string()));
2803        assert!(anon.contains(&"/tmp/logs".to_string()));
2804    }
2805
2806    #[test]
2807    fn test_format_docker_event_variants_and_timestamp_extraction() {
2808        let mut attrs = std::collections::HashMap::new();
2809        attrs.insert("name".to_string(), "demo".to_string());
2810        attrs.insert("image".to_string(), "alpine:latest".to_string());
2811        attrs.insert("exitCode".to_string(), "137".to_string());
2812        let actor = bollard::models::EventActor {
2813            id: None,
2814            attributes: Some(attrs),
2815        };
2816
2817        let create_event = bollard::models::EventMessage {
2818            action: Some("create".to_string()),
2819            actor: Some(actor.clone()),
2820            ..Default::default()
2821        };
2822        assert_eq!(
2823            format_docker_event(&create_event),
2824            "[CREATE] Container demo (alpine:latest)"
2825        );
2826
2827        let die_event = bollard::models::EventMessage {
2828            action: Some("die".to_string()),
2829            actor: Some(actor),
2830            ..Default::default()
2831        };
2832        assert_eq!(
2833            format_docker_event(&die_event),
2834            "[DIE]    Container demo (exit: 137)"
2835        );
2836
2837        let other_event = bollard::models::EventMessage {
2838            action: Some("oom".to_string()),
2839            actor: None,
2840            ..Default::default()
2841        };
2842        assert_eq!(format_docker_event(&other_event), "[OOM] Container unknown");
2843
2844        assert_eq!(
2845            extract_timestamp("2024-01-01T00:00:00Z hello"),
2846            Some("2024-01-01T00:00:00Z".to_string())
2847        );
2848        assert_eq!(extract_timestamp("hello world"), None);
2849    }
2850
2851    #[tokio::test]
2852    async fn test_run_container_with_cleanup_error_paths() {
2853        let create_fail = run_container_with_cleanup(
2854            || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2855            |_id| async move { Ok(()) },
2856            |_id| async move { Ok(()) },
2857        )
2858        .await;
2859        assert!(
2860            matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2861        );
2862
2863        let cleanup_fail = run_container_with_cleanup(
2864            || async { Ok("cid-1".to_string()) },
2865            |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2866            |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2867        )
2868        .await;
2869        match cleanup_fail {
2870            Err(CamelError::ProcessorError(msg)) => {
2871                assert!(msg.contains("Failed to start container"));
2872                assert!(msg.contains("Cleanup failed"));
2873            }
2874            other => panic!("unexpected result: {:?}", other),
2875        }
2876    }
2877
2878    #[tokio::test]
2879    async fn test_container_producer_network_lifecycle() {
2880        let docker = match Docker::connect_with_local_defaults() {
2881            Ok(d) => d,
2882            Err(_) => {
2883                eprintln!("Skipping test: Could not connect to Docker daemon");
2884                return;
2885            }
2886        };
2887        if docker.ping().await.is_err() {
2888            eprintln!("Skipping test: Docker daemon not responding to ping");
2889            return;
2890        }
2891
2892        let network_name = format!("camel-test-{}", std::process::id());
2893
2894        let component = ContainerComponent::new();
2895        let component_ctx = NoOpComponentContext;
2896
2897        // Create network
2898        let endpoint = component
2899            .create_endpoint(
2900                &format!("container:network-create?name={}", network_name),
2901                &component_ctx,
2902            )
2903            .unwrap();
2904        let ctx = ProducerContext::new();
2905        let mut producer = endpoint.create_producer(&ctx).unwrap();
2906
2907        let mut exchange = Exchange::new(Message::new(""));
2908        exchange.input.set_header(
2909            HEADER_ACTION,
2910            serde_json::Value::String("network-create".into()),
2911        );
2912
2913        use tower::ServiceExt;
2914        let result = producer
2915            .ready()
2916            .await
2917            .unwrap()
2918            .call(exchange)
2919            .await
2920            .expect("Network create should succeed");
2921
2922        let network_id = result
2923            .input
2924            .header(HEADER_NETWORK)
2925            .and_then(|v| v.as_str().map(|s| s.to_string()))
2926            .expect("Should have network ID");
2927
2928        assert!(!network_id.is_empty());
2929
2930        // List networks
2931        let endpoint = component
2932            .create_endpoint("container:network-list", &component_ctx)
2933            .unwrap();
2934        let mut producer = endpoint.create_producer(&ctx).unwrap();
2935
2936        let mut exchange = Exchange::new(Message::new(""));
2937        exchange.input.set_header(
2938            HEADER_ACTION,
2939            serde_json::Value::String("network-list".into()),
2940        );
2941
2942        let list_result = producer
2943            .ready()
2944            .await
2945            .unwrap()
2946            .call(exchange)
2947            .await
2948            .expect("Network list should succeed");
2949
2950        match &list_result.input.body {
2951            Body::Json(json_value) => {
2952                assert!(json_value.is_array(), "Expected JSON array");
2953            }
2954            other => panic!("Expected Body::Json, got: {:?}", other),
2955        }
2956
2957        // Remove network
2958        let endpoint = component
2959            .create_endpoint(
2960                &format!("container:network-remove?network={}", network_name),
2961                &component_ctx,
2962            )
2963            .unwrap();
2964        let mut producer = endpoint.create_producer(&ctx).unwrap();
2965
2966        let mut exchange = Exchange::new(Message::new(""));
2967        exchange.input.set_header(
2968            HEADER_ACTION,
2969            serde_json::Value::String("network-remove".into()),
2970        );
2971
2972        let remove_result = producer
2973            .ready()
2974            .await
2975            .unwrap()
2976            .call(exchange)
2977            .await
2978            .expect("Network remove should succeed");
2979
2980        let action_result = remove_result
2981            .input
2982            .header(HEADER_ACTION_RESULT)
2983            .and_then(|v| v.as_str());
2984        assert_eq!(action_result, Some("success"));
2985    }
2986
2987    #[tokio::test]
2988    async fn test_logs_consumer_requires_container_id() {
2989        use tokio::sync::mpsc;
2990
2991        let mut consumer = ContainerConsumer {
2992            config: ContainerConfig::from_uri("container:logs").unwrap(),
2993        };
2994        let (tx, _rx) = mpsc::channel(4);
2995        let context = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
2996
2997        let err = consumer.start(context).await.unwrap_err();
2998        match err {
2999            CamelError::EndpointCreationFailed(msg) => {
3000                assert!(msg.contains("containerId is required for logs consumer"));
3001            }
3002            other => panic!("unexpected error: {:?}", other),
3003        }
3004    }
3005
3006    #[tokio::test]
3007    async fn test_events_consumer_stops_immediately_when_cancelled() {
3008        use tokio::sync::mpsc;
3009
3010        let mut consumer = ContainerConsumer {
3011            config: ContainerConfig::from_uri("container:events").unwrap(),
3012        };
3013
3014        let (tx, _rx) = mpsc::channel(4);
3015        let cancel = tokio_util::sync::CancellationToken::new();
3016        cancel.cancel();
3017        let context = ConsumerContext::new(tx, cancel);
3018
3019        let result = consumer.start(context).await;
3020        assert!(result.is_ok());
3021    }
3022
3023    #[test]
3024    fn test_global_config_constructors_and_endpoint_docker_host() {
3025        let global = ContainerGlobalConfig::new().with_docker_host("unix:///tmp/docker.sock");
3026        let mut cfg = ContainerConfig::from_uri("container:list").unwrap();
3027        cfg.apply_global_defaults(&global);
3028
3029        let endpoint = ContainerEndpoint {
3030            uri: "container:list".to_string(),
3031            config: cfg,
3032        };
3033
3034        assert_eq!(endpoint.docker_host(), Some("unix:///tmp/docker.sock"));
3035
3036        let component = ContainerComponent::with_config(global);
3037        assert_eq!(component.scheme(), "container");
3038    }
3039}