Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7pub mod health;
8
9pub use bundle::ContainerBundle;
10pub use health::ContainerHealthCheck;
11
12use std::collections::{HashMap, HashSet};
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17
18use async_trait::async_trait;
19use bollard::Docker;
20use bollard::models::{
21    ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
22};
23use bollard::query_parameters::{
24    CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
25    ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
26    StartContainerOptions,
27};
28use bollard::service::{HostConfig, PortBinding};
29use camel_component_api::NetworkRetryPolicy;
30use camel_component_api::parse_uri;
31use camel_component_api::retry_async_cancelable;
32use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
33use camel_component_api::{
34    Component, Consumer, ConsumerContext, Endpoint, ProducerContext, RuntimeObservability,
35};
36use tower::Service;
37
38/// Global tracker for containers created by this component.
39/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
40static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
41    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
42
43/// Registers a container ID for tracking (will be cleaned up on shutdown).
44fn track_container(id: String) {
45    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
46        tracker.insert(id);
47    }
48}
49
50/// Removes a container ID from tracking (when it's been removed naturally).
51fn untrack_container(id: &str) {
52    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
53        tracker.remove(id);
54    }
55}
56
57/// Cleans up all tracked containers. Call this on application shutdown.
58pub async fn cleanup_tracked_containers() {
59    let ids: Vec<String> = {
60        match CONTAINER_TRACKER.lock() {
61            Ok(tracker) => tracker.iter().cloned().collect(),
62            Err(_) => return,
63        }
64    };
65
66    if ids.is_empty() {
67        return;
68    }
69
70    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
71
72    let docker = match Docker::connect_with_local_defaults() {
73        Ok(d) => d,
74        Err(e) => {
75            // log-policy: system-broken
76            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
77            return;
78        }
79    };
80
81    for id in ids {
82        match docker
83            .remove_container(
84                &id,
85                Some(RemoveContainerOptions {
86                    force: true,
87                    ..Default::default()
88                }),
89            )
90            .await
91        {
92            Ok(_) => {
93                tracing::debug!("Cleaned up container {}", id);
94                untrack_container(&id);
95            }
96            Err(e) => {
97                tracing::warn!("Failed to cleanup container {}: {}", id, e);
98            }
99        }
100    }
101}
102
// Docker connection settings

/// How long, in seconds, to wait when connecting to the Docker daemon.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

// Exchange header names used by container operations

/// Header naming the container action (e.g., "list", "run", "start", "stop", "remove").
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header carrying the image to use for "run" operations.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header used both to pass in and to report back the container ID.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header identifying the log stream type (stdout or stderr).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header carrying the log timestamp.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header carrying the container name for "run" operations.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header reporting the outcome of a container operation (e.g., "success").
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Header carrying the command to execute in a container.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Header carrying the network name for network operations.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Header reporting the exit code of an exec operation.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Header carrying volume mount specifications.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Header carrying the exec instance ID.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
143
144// ---------------------------------------------------------------------------
145// ContainerGlobalConfig
146// ---------------------------------------------------------------------------
147
148/// Per-component reconnect default: unlimited retries (max_attempts=0)
149/// with a fixed 5s delay, preserving the old infinite-reconnect behavior.
150/// Operators can opt into bounded retry via TOML `[reconnect]`.
151fn container_reconnect_default() -> NetworkRetryPolicy {
152    NetworkRetryPolicy {
153        enabled: true,
154        max_attempts: 0, // unlimited
155        initial_delay: std::time::Duration::from_secs(5),
156        multiplier: 1.0, // fixed delay (old behavior was no backoff)
157        max_delay: std::time::Duration::from_secs(5),
158        jitter_factor: 0.0,
159    }
160}
161
162/// Global configuration for Container component.
163/// Supports serde deserialization with defaults and builder methods.
164/// These are the fallback defaults when URI params are not set.
165#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
166#[serde(default)]
167pub struct ContainerGlobalConfig {
168    /// The Docker host URL (default: "unix:///var/run/docker.sock").
169    pub docker_host: String,
170    /// Reconnection policy for events/logs consumers (default: unlimited, 5s fixed delay).
171    #[serde(default = "container_reconnect_default")]
172    pub reconnect: NetworkRetryPolicy,
173}
174
175impl Default for ContainerGlobalConfig {
176    fn default() -> Self {
177        Self {
178            docker_host: "unix:///var/run/docker.sock".to_string(),
179            reconnect: container_reconnect_default(),
180        }
181    }
182}
183
184impl ContainerGlobalConfig {
185    pub fn new() -> Self {
186        Self::default()
187    }
188
189    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
190        self.docker_host = v.into();
191        self
192    }
193}
194
195// ---------------------------------------------------------------------------
196// ContainerConfig (endpoint configuration)
197// ---------------------------------------------------------------------------
198
199/// Configuration for the container component endpoint.
200///
201/// This struct holds the parsed URI configuration including the operation type,
202/// optional container image, and Docker host connection details.
203#[derive(Debug, Clone)]
204pub struct ContainerConfig {
205    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
206    pub operation: String,
207    /// The container image to use for "run" operations (can be overridden via header).
208    pub image: Option<String>,
209    /// The container name to use for "run" operations (can be overridden via header).
210    pub name: Option<String>,
211    /// The Docker host URL (defaults to "unix:///var/run/docker.sock").
212    pub host: Option<String>,
213    /// Command to run in the container (e.g., "sleep 30").
214    pub cmd: Option<String>,
215    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
216    pub ports: Option<String>,
217    /// Environment variables in format "KEY=value,KEY2=value2".
218    pub env: Option<String>,
219    /// Network mode (e.g., "bridge", "host", "none"). Default: "bridge".
220    pub network: Option<String>,
221    /// Container ID or name for logs consumer.
222    pub container_id: Option<String>,
223    /// Follow log output (default: true for consumer).
224    pub follow: bool,
225    /// Include timestamps in logs (default: false).
226    pub timestamps: bool,
227    /// Number of lines to show from the end of logs (default: all).
228    pub tail: Option<String>,
229    /// Automatically pull the image if not present (default: true).
230    pub auto_pull: bool,
231    /// Automatically remove the container when it exits (default: true).
232    pub auto_remove: bool,
233    /// Volume mounts in format "host:container:ro" (e.g., "./html:/usr/share/nginx/html:ro").
234    pub volumes: Option<String>,
235    /// User to run the container or exec command as (e.g., "root").
236    pub user: Option<String>,
237    /// Working directory inside the container.
238    pub workdir: Option<String>,
239    /// Whether to detach from the exec process (default: false).
240    pub detach: bool,
241    /// Network driver for network-create (e.g., "bridge", "overlay").
242    pub driver: Option<String>,
243    /// Whether to force the operation (default: false).
244    pub force: bool,
245    /// Reconnection policy for events/logs consumers (applied from global config).
246    pub reconnect: NetworkRetryPolicy,
247}
248
249impl ContainerConfig {
250    /// Parses a container URI into a `ContainerConfig`.
251    ///
252    /// # Arguments
253    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
254    ///
255    /// # Errors
256    /// Returns an error if the URI scheme is not "container".
257    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
258        let parts = parse_uri(uri)?;
259        if parts.scheme != "container" {
260            return Err(CamelError::InvalidUri(format!(
261                "expected scheme 'container', got '{}'",
262                parts.scheme
263            )));
264        }
265
266        let image = parts.params.get("image").cloned();
267        let name = parts.params.get("name").cloned();
268        let cmd = parts.params.get("cmd").cloned();
269        let ports = parts.params.get("ports").cloned();
270        let env = parts.params.get("env").cloned();
271        let network = parts.params.get("network").cloned();
272        let container_id = parts.params.get("containerId").cloned();
273        let follow = parts
274            .params
275            .get("follow")
276            .map(|v| v.eq_ignore_ascii_case("true"))
277            .unwrap_or(true);
278        let timestamps = parts
279            .params
280            .get("timestamps")
281            .map(|v| v.eq_ignore_ascii_case("true"))
282            .unwrap_or(false);
283        let tail = parts.params.get("tail").cloned();
284        let auto_pull = parts
285            .params
286            .get("autoPull")
287            .map(|v| v.eq_ignore_ascii_case("true"))
288            .unwrap_or(true);
289        let auto_remove = parts
290            .params
291            .get("autoRemove")
292            .map(|v| v.eq_ignore_ascii_case("true"))
293            .unwrap_or(true);
294        // host is only set from URI param; global config defaults are applied later
295        let host = parts.params.get("host").cloned();
296        let volumes = parts.params.get("volumes").cloned();
297        let user = parts.params.get("user").cloned();
298        let workdir = parts.params.get("workdir").cloned();
299        let detach = parts
300            .params
301            .get("detach")
302            .map(|v| v.eq_ignore_ascii_case("true"))
303            .unwrap_or(false);
304        let driver = parts.params.get("driver").cloned();
305        let force = parts
306            .params
307            .get("force")
308            .map(|v| v.eq_ignore_ascii_case("true"))
309            .unwrap_or(false);
310
311        Ok(Self {
312            operation: parts.path,
313            image,
314            name,
315            host,
316            cmd,
317            ports,
318            env,
319            network,
320            container_id,
321            follow,
322            timestamps,
323            tail,
324            auto_pull,
325            auto_remove,
326            volumes,
327            user,
328            workdir,
329            detach,
330            driver,
331            force,
332            reconnect: NetworkRetryPolicy::default(),
333        })
334    }
335
336    /// Apply global config defaults to this endpoint config.
337    /// Only sets values that are currently `None`.
338    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
339        if self.host.is_none() {
340            self.host = Some(global.docker_host.clone());
341        }
342        self.reconnect = global.reconnect.clone();
343    }
344
345    fn docker_socket_path(&self) -> Result<&str, CamelError> {
346        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
347            "npipe:////./pipe/docker_engine"
348        } else {
349            "unix:///var/run/docker.sock"
350        });
351
352        if host.starts_with("unix://") || host.starts_with("npipe://") {
353            return Ok(host);
354        }
355
356        if host.contains("://") {
357            return Err(CamelError::ProcessorError(format!(
358                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
359                host
360            )));
361        }
362
363        Ok(host)
364    }
365
366    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
367        let socket_path = self.docker_socket_path()?;
368        Docker::connect_with_socket(
369            socket_path,
370            DOCKER_CONNECT_TIMEOUT_SECS,
371            bollard::API_DEFAULT_VERSION,
372        )
373        .map_err(|e| {
374            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
375        })
376    }
377
378    /// Connects to the Docker daemon using the configured host.
379    ///
380    /// This method establishes a Unix socket connection to Docker and verifies
381    /// the connection by sending a ping request.
382    ///
383    /// # Errors
384    /// Returns an error if the connection fails or the ping request fails.
385    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
386        let docker = self.connect_docker_client()?;
387        docker
388            .ping()
389            .await
390            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
391        Ok(docker)
392    }
393
394    #[allow(clippy::type_complexity)]
395    fn parse_ports(
396        &self,
397    ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
398        let ports_str = match self.ports.as_ref() {
399            Some(s) => s,
400            None => return Ok((Vec::new(), HashMap::new())),
401        };
402
403        let mut exposed_ports: Vec<String> = Vec::new();
404        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
405
406        for mapping in ports_str.split(',') {
407            let mapping = mapping.trim();
408            if mapping.is_empty() {
409                continue;
410            }
411
412            let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
413                CamelError::ProcessorError(format!(
414                    "malformed port mapping '{}': expected hostPort:containerPort",
415                    mapping
416                ))
417            })?;
418
419            let (container_port, protocol) = if container_spec.contains('/') {
420                let parts: Vec<&str> = container_spec.split('/').collect();
421                (parts[0], parts[1])
422            } else {
423                (container_spec, "tcp")
424            };
425
426            let container_key = format!("{}/{}", container_port, protocol);
427
428            exposed_ports.push(container_key.clone());
429
430            port_bindings.insert(
431                container_key,
432                Some(vec![PortBinding {
433                    host_ip: None,
434                    host_port: Some(host_port.to_string()),
435                }]),
436            );
437        }
438
439        Ok((exposed_ports, port_bindings))
440    }
441
442    fn parse_env(&self) -> Option<Vec<String>> {
443        let env_str = self.env.as_ref()?;
444
445        let env_vars: Vec<String> = env_str
446            .split(',')
447            .map(|s| s.trim().to_string())
448            .filter(|s| !s.is_empty())
449            .collect();
450
451        if env_vars.is_empty() {
452            None
453        } else {
454            Some(env_vars)
455        }
456    }
457
458    #[cfg(test)]
459    #[allow(clippy::type_complexity)]
460    fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
461        self.volumes.as_deref().and_then(parse_volume_str)
462    }
463}
464
/// Splits a comma-separated volume specification into `(binds, anonymous_volumes)`.
///
/// Each non-blank entry is split on `:` and classified by its segment count:
///
/// * `source:target:ro|rw` becomes a bind entry; any other third segment causes
///   the entry to be skipped;
/// * `path:ro|rw` becomes an anonymous volume for `path` (the mode is not kept);
/// * `a:b` becomes the bind entry `a:b` (e.g. `name:container` for a named volume);
/// * `path` becomes an anonymous volume;
/// * entries with more than three segments are skipped.
///
/// Returns `None` when no entry produced a bind or an anonymous volume.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
    let mut bind_mounts: Vec<String> = Vec::new();
    let mut anonymous: Vec<String> = Vec::new();

    let entries = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        let parts: Vec<&str> = entry.split(':').collect();

        match parts.as_slice() {
            [source, target, mode @ ("ro" | "rw")] => {
                bind_mounts.push(format!("{}:{}:{}", source, target, mode));
            }
            [path] | [path, "ro" | "rw"] => anonymous.push(path.to_string()),
            [source, target] => bind_mounts.push(format!("{}:{}", source, target)),
            // Unknown mode or too many segments: ignore the entry.
            _ => {}
        }
    }

    let nothing_parsed = bind_mounts.is_empty() && anonymous.is_empty();
    (!nothing_parsed).then_some((bind_mounts, anonymous))
}
514
/// Operations a producer endpoint can perform.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProducerOperation {
    /// "list"
    List,
    /// "run"
    Run,
    /// "start"
    Start,
    /// "stop"
    Stop,
    /// "remove"
    Remove,
    /// "exec"
    Exec,
    /// "network-create"
    NetworkCreate,
    /// "network-connect"
    NetworkConnect,
    /// "network-disconnect"
    NetworkDisconnect,
    /// "network-remove"
    NetworkRemove,
    /// "network-list"
    NetworkList,
}
529
530fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
531    match operation {
532        "list" => Ok(ProducerOperation::List),
533        "run" => Ok(ProducerOperation::Run),
534        "start" => Ok(ProducerOperation::Start),
535        "stop" => Ok(ProducerOperation::Stop),
536        "remove" => Ok(ProducerOperation::Remove),
537        "exec" => Ok(ProducerOperation::Exec),
538        "network-create" => Ok(ProducerOperation::NetworkCreate),
539        "network-connect" => Ok(ProducerOperation::NetworkConnect),
540        "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
541        "network-remove" => Ok(ProducerOperation::NetworkRemove),
542        "network-list" => Ok(ProducerOperation::NetworkList),
543        _ => Err(CamelError::ProcessorError(format!(
544            "Unknown container operation: {}",
545            operation
546        ))),
547    }
548}
549
550fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
551    exchange
552        .input
553        .header(HEADER_CONTAINER_NAME)
554        .and_then(|v| v.as_str().map(|s| s.to_string()))
555        .or_else(|| config.name.clone())
556}
557
558async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
559    let images = docker
560        .list_images(None::<ListImagesOptions>)
561        .await
562        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
563
564    Ok(images.iter().any(|img| {
565        img.repo_tags
566            .iter()
567            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
568    }))
569}
570
571async fn pull_image_with_progress(
572    docker: &Docker,
573    image: &str,
574    timeout_secs: u64,
575) -> Result<(), CamelError> {
576    use futures::StreamExt;
577
578    tracing::info!("Pulling image: {}", image);
579
580    let mut stream = docker.create_image(
581        Some(CreateImageOptions {
582            from_image: Some(image.to_string()),
583            ..Default::default()
584        }),
585        None,
586        None,
587    );
588
589    let start = std::time::Instant::now();
590    let mut last_progress = std::time::Instant::now();
591
592    while let Some(item) = stream.next().await {
593        if start.elapsed().as_secs() > timeout_secs {
594            return Err(CamelError::ProcessorError(format!(
595                "Image pull timeout after {}s. Try manually: docker pull {}",
596                timeout_secs, image
597            )));
598        }
599
600        match item {
601            Ok(update) => {
602                // Log progress every 2 seconds
603                if last_progress.elapsed().as_secs() >= 2 {
604                    if let Some(status) = update.status {
605                        tracing::debug!("Pull progress: {}", status);
606                    }
607                    last_progress = std::time::Instant::now();
608                }
609            }
610            Err(e) => {
611                let err_str = e.to_string().to_lowercase();
612                if err_str.contains("unauthorized") || err_str.contains("401") {
613                    return Err(CamelError::ProcessorError(format!(
614                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
615                        image
616                    )));
617                }
618                if err_str.contains("not found") || err_str.contains("404") {
619                    return Err(CamelError::ProcessorError(format!(
620                        "Image '{}' not found in registry. Check the image name and tag",
621                        image
622                    )));
623                }
624                return Err(CamelError::ProcessorError(format!(
625                    "Failed to pull image '{}': {}",
626                    image, e
627                )));
628            }
629        }
630    }
631
632    tracing::info!("Successfully pulled image: {}", image);
633    Ok(())
634}
635
636async fn ensure_image_available(
637    docker: &Docker,
638    image: &str,
639    auto_pull: bool,
640    timeout_secs: u64,
641) -> Result<(), CamelError> {
642    if image_exists_locally(docker, image).await? {
643        tracing::debug!("Image '{}' already available locally", image);
644        return Ok(());
645    }
646
647    if !auto_pull {
648        return Err(CamelError::ProcessorError(format!(
649            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
650            image, image
651        )));
652    }
653
654    pull_image_with_progress(docker, image, timeout_secs).await
655}
656
657fn format_docker_event(event: &bollard::models::EventMessage) -> String {
658    let action = event.action.as_deref().unwrap_or("unknown");
659    let actor = event.actor.as_ref();
660
661    let container_name = actor
662        .and_then(|a| a.attributes.as_ref())
663        .and_then(|attrs| attrs.get("name"))
664        .map(|s| s.as_str())
665        .unwrap_or("unknown");
666
667    let image = actor
668        .and_then(|a| a.attributes.as_ref())
669        .and_then(|attrs| attrs.get("image"))
670        .map(|s| s.as_str())
671        .unwrap_or("");
672
673    let exit_code = actor
674        .and_then(|a| a.attributes.as_ref())
675        .and_then(|attrs| attrs.get("exitCode"))
676        .map(|s| s.as_str());
677
678    match action {
679        "create" => {
680            if image.is_empty() {
681                format!("[CREATE] Container {}", container_name)
682            } else {
683                format!("[CREATE] Container {} ({})", container_name, image)
684            }
685        }
686        "start" => format!("[START]  Container {}", container_name),
687        "die" => {
688            if let Some(code) = exit_code {
689                format!("[DIE]    Container {} (exit: {})", container_name, code)
690            } else {
691                format!("[DIE]    Container {}", container_name)
692            }
693        }
694        "destroy" => format!("[DESTROY] Container {}", container_name),
695        "stop" => format!("[STOP]   Container {}", container_name),
696        "pause" => format!("[PAUSE]  Container {}", container_name),
697        "unpause" => format!("[UNPAUSE] Container {}", container_name),
698        "restart" => format!("[RESTART] Container {}", container_name),
699        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
700    }
701}
702
703async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
704    create: CreateFn,
705    start: StartFn,
706    remove: RemoveFn,
707) -> Result<String, CamelError>
708where
709    CreateFn: FnOnce() -> CreateFut,
710    CreateFut: Future<Output = Result<String, CamelError>>,
711    StartFn: FnOnce(String) -> StartFut,
712    StartFut: Future<Output = Result<(), CamelError>>,
713    RemoveFn: FnOnce(String) -> RemoveFut,
714    RemoveFut: Future<Output = Result<(), CamelError>>,
715{
716    let container_id = create().await?;
717    if let Err(start_err) = start(container_id.clone()).await {
718        if let Err(remove_err) = remove(container_id.clone()).await {
719            return Err(CamelError::ProcessorError(format!(
720                "Failed to start container: {}. Cleanup failed: {}",
721                start_err, remove_err
722            )));
723        }
724        return Err(start_err);
725    }
726
727    Ok(container_id)
728}
729
730async fn handle_list(
731    docker: Docker,
732    _config: ContainerConfig,
733    exchange: &mut Exchange,
734) -> Result<(), CamelError> {
735    let containers = docker
736        .list_containers(None::<ListContainersOptions>)
737        .await
738        .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
739
740    let json_value = serde_json::to_value(&containers).map_err(|e| {
741        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
742    })?;
743
744    exchange.input.body = Body::Json(json_value);
745    exchange.input.set_header(
746        HEADER_ACTION_RESULT,
747        serde_json::Value::String("success".to_string()),
748    );
749    Ok(())
750}
751
752async fn handle_run(
753    docker: Docker,
754    config: ContainerConfig,
755    exchange: &mut Exchange,
756) -> Result<(), CamelError> {
757    let image = exchange
758        .input
759        .header(HEADER_IMAGE)
760        .and_then(|v| v.as_str().map(|s| s.to_string()))
761        .or(config.image.clone())
762        .ok_or_else(|| {
763            CamelError::ProcessorError(
764                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
765            )
766        })?;
767
768    let image = if !image.contains(':') && !image.contains('@') {
769        format!("{}:latest", image)
770    } else {
771        image
772    };
773
774    let pull_timeout = 300;
775    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
776        .await
777        .map_err(|e| {
778            CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
779        })?;
780
781    let container_name = resolve_container_name(exchange, &config);
782    let container_name_ref = container_name.as_deref().unwrap_or("");
783    let cmd_parts: Option<Vec<String>> = config
784        .cmd
785        .as_ref()
786        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
787    let auto_remove = config.auto_remove;
788    let (exposed_ports, port_bindings) = config.parse_ports()?;
789    let env_vars = config.parse_env();
790    let network_mode = config.network.clone();
791
792    let volumes_str = exchange
793        .input
794        .header(HEADER_VOLUMES)
795        .and_then(|v| v.as_str().map(|s| s.to_string()))
796        .or(config.volumes.clone());
797    let (binds, anon_volumes) = volumes_str
798        .as_deref()
799        .and_then(parse_volume_str)
800        .unwrap_or_default();
801
802    let docker_create = docker.clone();
803    let docker_start = docker.clone();
804    let docker_remove = docker.clone();
805
806    let container_id = run_container_with_cleanup(
807        move || async move {
808            let create_options = CreateContainerOptions {
809                name: Some(container_name_ref.to_string()),
810                ..Default::default()
811            };
812            let container_config = ContainerCreateBody {
813                image: Some(image.clone()),
814                cmd: cmd_parts,
815                env: env_vars,
816                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
817                volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
818                host_config: Some(HostConfig {
819                    auto_remove: Some(auto_remove),
820                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
821                    network_mode,
822                    binds: if binds.is_empty() { None } else { Some(binds) },
823                    ..Default::default()
824                }),
825                ..Default::default()
826            };
827
828            let create_response = docker_create
829                .create_container(Some(create_options), container_config)
830                .await
831                .map_err(|e| {
832                    let err_str = e.to_string().to_lowercase();
833                    if err_str.contains("409") || err_str.contains("conflict") {
834                        CamelError::ProcessorError(format!(
835                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
836                            container_name_ref
837                        ))
838                    } else {
839                        CamelError::ProcessorError(format!(
840                            "Failed to create container: {}",
841                            e
842                        ))
843                    }
844                })?;
845
846            Ok(create_response.id)
847        },
848        move |container_id| async move {
849            docker_start
850                .start_container(&container_id, None::<StartContainerOptions>)
851                .await
852                .map_err(|e| {
853                    CamelError::ProcessorError(format!(
854                        "Failed to start container: {}",
855                        e
856                    ))
857                })
858        },
859        move |container_id| async move {
860            docker_remove
861                .remove_container(&container_id, None)
862                .await
863                .map_err(|e| {
864                    CamelError::ProcessorError(format!(
865                        "Failed to remove container after start failure: {}",
866                        e
867                    ))
868                })
869        },
870    )
871    .await?;
872
873    track_container(container_id.clone());
874
875    exchange
876        .input
877        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
878    exchange.input.set_header(
879        HEADER_ACTION_RESULT,
880        serde_json::Value::String("success".to_string()),
881    );
882    Ok(())
883}
884
885async fn handle_lifecycle(
886    docker: Docker,
887    _config: ContainerConfig,
888    exchange: &mut Exchange,
889    operation: ProducerOperation,
890    operation_name: &str,
891) -> Result<(), CamelError> {
892    let container_id = exchange
893        .input
894        .header(HEADER_CONTAINER_ID)
895        .and_then(|v| v.as_str().map(|s| s.to_string()))
896        .ok_or_else(|| {
897            CamelError::ProcessorError(format!(
898                "{} header is required for {} operation",
899                HEADER_CONTAINER_ID, operation_name
900            ))
901        })?;
902
903    match operation {
904        ProducerOperation::Start => {
905            docker
906                .start_container(&container_id, None::<StartContainerOptions>)
907                .await
908                .map_err(|e| {
909                    CamelError::ProcessorError(format!("Failed to start container: {}", e))
910                })?;
911        }
912        ProducerOperation::Stop => {
913            docker
914                .stop_container(&container_id, None)
915                .await
916                .map_err(|e| {
917                    CamelError::ProcessorError(format!("Failed to stop container: {}", e))
918                })?;
919        }
920        ProducerOperation::Remove => {
921            docker
922                .remove_container(&container_id, None)
923                .await
924                .map_err(|e| {
925                    CamelError::ProcessorError(format!("Failed to remove container: {}", e))
926                })?;
927            untrack_container(&container_id);
928        }
929        _ => {}
930    }
931
932    exchange.input.set_header(
933        HEADER_ACTION_RESULT,
934        serde_json::Value::String("success".to_string()),
935    );
936    Ok(())
937}
938
939async fn handle_exec(
940    docker: Docker,
941    config: ContainerConfig,
942    exchange: &mut Exchange,
943) -> Result<(), CamelError> {
944    let container_id = exchange
945        .input
946        .header(HEADER_CONTAINER_ID)
947        .and_then(|v| v.as_str().map(|s| s.to_string()))
948        .or(config.container_id.clone())
949        .ok_or_else(|| {
950            CamelError::ProcessorError(format!(
951                "{} header or containerId param is required for exec operation",
952                HEADER_CONTAINER_ID
953            ))
954        })?;
955
956    let cmd = exchange
957        .input
958        .header(HEADER_CMD)
959        .and_then(|v| v.as_str().map(|s| s.to_string()))
960        .or(config.cmd.clone())
961        .ok_or_else(|| {
962            CamelError::ProcessorError(
963                "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
964            )
965        })?;
966
967    let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
968    let env_vars = config.parse_env();
969
970    let exec_config = bollard::exec::CreateExecOptions {
971        cmd: Some(cmd_parts),
972        env: env_vars,
973        user: config.user.clone(),
974        working_dir: config.workdir.clone(),
975        attach_stdout: Some(true),
976        attach_stderr: Some(true),
977        ..Default::default()
978    };
979
980    let create_result = docker
981        .create_exec(&container_id, exec_config)
982        .await
983        .map_err(|e| {
984            let err_str = e.to_string().to_lowercase();
985            if err_str.contains("404") || err_str.contains("no such") {
986                CamelError::ProcessorError(format!(
987                    "Container '{}' not found for exec",
988                    container_id
989                ))
990            } else {
991                CamelError::ProcessorError(format!("Failed to create exec: {}", e))
992            }
993        })?;
994
995    let exec_id = create_result.id;
996
997    if config.detach {
998        docker
999            .start_exec(
1000                &exec_id,
1001                Some(bollard::exec::StartExecOptions {
1002                    detach: true,
1003                    ..Default::default()
1004                }),
1005            )
1006            .await
1007            .map_err(|e| {
1008                CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
1009            })?;
1010
1011        exchange
1012            .input
1013            .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
1014        exchange
1015            .input
1016            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1017    } else {
1018        let start_result = docker
1019            .start_exec(&exec_id, None)
1020            .await
1021            .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
1022
1023        let mut output = String::new();
1024
1025        match start_result {
1026            bollard::exec::StartExecResults::Attached {
1027                output: mut stream, ..
1028            } => {
1029                use futures::StreamExt;
1030                while let Some(msg) = stream.next().await {
1031                    match msg {
1032                        Ok(bollard::container::LogOutput::StdOut { message }) => {
1033                            output.push_str(&String::from_utf8_lossy(&message));
1034                        }
1035                        Ok(bollard::container::LogOutput::StdErr { message }) => {
1036                            output.push_str(&String::from_utf8_lossy(&message));
1037                        }
1038                        Ok(_) => {}
1039                        Err(e) => {
1040                            output.push_str(&format!("[error reading stream: {}]", e));
1041                        }
1042                    }
1043                }
1044            }
1045            bollard::exec::StartExecResults::Detached => {}
1046        }
1047
1048        let inspect = docker
1049            .inspect_exec(&exec_id)
1050            .await
1051            .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1052
1053        let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1054            CamelError::ProcessorError("container exec returned no exit code".into())
1055        })?;
1056
1057        let output = output.trim_end().to_string();
1058        exchange.input.body = Body::Text(output);
1059        exchange.input.set_header(
1060            HEADER_EXIT_CODE,
1061            serde_json::Value::Number(exit_code.into()),
1062        );
1063        exchange
1064            .input
1065            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1066    }
1067
1068    exchange.input.set_header(
1069        HEADER_ACTION_RESULT,
1070        serde_json::Value::String("success".to_string()),
1071    );
1072    Ok(())
1073}
1074
1075async fn handle_network_create(
1076    docker: Docker,
1077    config: ContainerConfig,
1078    exchange: &mut Exchange,
1079) -> Result<(), CamelError> {
1080    let network_name = exchange
1081        .input
1082        .header(HEADER_CONTAINER_NAME)
1083        .and_then(|v| v.as_str().map(|s| s.to_string()))
1084        .or(config.name.clone())
1085        .ok_or_else(|| {
1086            CamelError::ProcessorError(
1087                "CamelContainerName header or name param is required for network-create"
1088                    .to_string(),
1089            )
1090        })?;
1091
1092    let driver = config.driver.as_deref().unwrap_or("bridge");
1093
1094    let options = NetworkCreateRequest {
1095        name: network_name.clone(),
1096        driver: Some(driver.to_string()),
1097        ..Default::default()
1098    };
1099
1100    let result = docker.create_network(options).await.map_err(|e| {
1101        let err_str = e.to_string().to_lowercase();
1102        if err_str.contains("409") || err_str.contains("already exists") {
1103            CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1104        } else {
1105            CamelError::ProcessorError(format!("Failed to create network: {}", e))
1106        }
1107    })?;
1108
1109    let network_id = result.id.clone();
1110    let json_value = serde_json::to_value(&result).map_err(|e| {
1111        CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1112    })?;
1113
1114    exchange.input.body = Body::Json(json_value);
1115    exchange
1116        .input
1117        .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1118    exchange.input.set_header(
1119        HEADER_ACTION_RESULT,
1120        serde_json::Value::String("success".to_string()),
1121    );
1122    Ok(())
1123}
1124
1125async fn handle_network_connect(
1126    docker: Docker,
1127    config: ContainerConfig,
1128    exchange: &mut Exchange,
1129) -> Result<(), CamelError> {
1130    let network = exchange
1131        .input
1132        .header(HEADER_NETWORK)
1133        .and_then(|v| v.as_str().map(|s| s.to_string()))
1134        .or(config.network.clone())
1135        .ok_or_else(|| {
1136            CamelError::ProcessorError(
1137                "CamelContainerNetwork header or network param is required for network-connect"
1138                    .to_string(),
1139            )
1140        })?;
1141
1142    let container = exchange
1143        .input
1144        .header(HEADER_CONTAINER_ID)
1145        .and_then(|v| v.as_str().map(|s| s.to_string()))
1146        .or(config.container_id.clone())
1147        .ok_or_else(|| {
1148            CamelError::ProcessorError(
1149                "CamelContainerId header or container param is required for network-connect"
1150                    .to_string(),
1151            )
1152        })?;
1153
1154    docker
1155        .connect_network(
1156            &network,
1157            NetworkConnectRequest {
1158                container,
1159                ..Default::default()
1160            },
1161        )
1162        .await
1163        .map_err(|e| {
1164            let err_str = e.to_string().to_lowercase();
1165            if err_str.contains("404") || err_str.contains("not found") {
1166                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1167            } else {
1168                CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1169            }
1170        })?;
1171
1172    exchange.input.set_header(
1173        HEADER_ACTION_RESULT,
1174        serde_json::Value::String("success".to_string()),
1175    );
1176    Ok(())
1177}
1178
1179async fn handle_network_disconnect(
1180    docker: Docker,
1181    config: ContainerConfig,
1182    exchange: &mut Exchange,
1183) -> Result<(), CamelError> {
1184    let network = exchange
1185        .input
1186        .header(HEADER_NETWORK)
1187        .and_then(|v| v.as_str().map(|s| s.to_string()))
1188        .or(config.network.clone())
1189        .ok_or_else(|| {
1190            CamelError::ProcessorError(
1191                "CamelContainerNetwork header or network param is required for network-disconnect"
1192                    .to_string(),
1193            )
1194        })?;
1195
1196    let container = exchange
1197        .input
1198        .header(HEADER_CONTAINER_ID)
1199        .and_then(|v| v.as_str().map(|s| s.to_string()))
1200        .or(config.container_id.clone())
1201        .ok_or_else(|| {
1202            CamelError::ProcessorError(
1203                "CamelContainerId header or container param is required for network-disconnect"
1204                    .to_string(),
1205            )
1206        })?;
1207
1208    docker
1209        .disconnect_network(
1210            &network,
1211            NetworkDisconnectRequest {
1212                container,
1213                force: Some(config.force),
1214            },
1215        )
1216        .await
1217        .map_err(|e| {
1218            let err_str = e.to_string().to_lowercase();
1219            if err_str.contains("404") || err_str.contains("not found") {
1220                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1221            } else {
1222                CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1223            }
1224        })?;
1225
1226    exchange.input.set_header(
1227        HEADER_ACTION_RESULT,
1228        serde_json::Value::String("success".to_string()),
1229    );
1230    Ok(())
1231}
1232
1233async fn handle_network_remove(
1234    docker: Docker,
1235    config: ContainerConfig,
1236    exchange: &mut Exchange,
1237) -> Result<(), CamelError> {
1238    let network = exchange
1239        .input
1240        .header(HEADER_NETWORK)
1241        .and_then(|v| v.as_str().map(|s| s.to_string()))
1242        .or(config.network.clone())
1243        .ok_or_else(|| {
1244            CamelError::ProcessorError(
1245                "CamelContainerNetwork header or network param is required for network-remove"
1246                    .to_string(),
1247            )
1248        })?;
1249
1250    docker.remove_network(&network).await.map_err(|e| {
1251        let err_str = e.to_string().to_lowercase();
1252        if err_str.contains("404") || err_str.contains("not found") {
1253            CamelError::ProcessorError(format!("Network '{}' not found", network))
1254        } else if err_str.contains("409") || err_str.contains("in use") {
1255            CamelError::ProcessorError(format!(
1256                "Network '{}' is in use and cannot be removed",
1257                network
1258            ))
1259        } else {
1260            CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1261        }
1262    })?;
1263
1264    exchange.input.set_header(
1265        HEADER_ACTION_RESULT,
1266        serde_json::Value::String("success".to_string()),
1267    );
1268    Ok(())
1269}
1270
1271async fn handle_network_list(
1272    docker: Docker,
1273    _config: ContainerConfig,
1274    exchange: &mut Exchange,
1275) -> Result<(), CamelError> {
1276    let networks = docker
1277        .list_networks(None::<ListNetworksOptions>)
1278        .await
1279        .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1280
1281    let json_value = serde_json::to_value(&networks)
1282        .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1283
1284    exchange.input.body = Body::Json(json_value);
1285    exchange.input.set_header(
1286        HEADER_ACTION_RESULT,
1287        serde_json::Value::String("success".to_string()),
1288    );
1289    Ok(())
1290}
1291
1292/// Producer for executing container operations.
1293///
1294/// This producer handles synchronous container operations like listing,
1295/// creating, starting, stopping, and removing containers.
1296#[derive(Clone)]
1297pub struct ContainerProducer {
1298    config: ContainerConfig,
1299    docker: Docker,
1300}
1301
1302impl Service<Exchange> for ContainerProducer {
1303    type Response = Exchange;
1304    type Error = CamelError;
1305    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1306
1307    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1308        Poll::Ready(Ok(()))
1309    }
1310
1311    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1312        let config = self.config.clone();
1313        let docker = self.docker.clone();
1314        Box::pin(async move {
1315            let operation_name = exchange
1316                .input
1317                .header(HEADER_ACTION)
1318                .and_then(|v| v.as_str().map(|s| s.to_string()))
1319                .unwrap_or_else(|| config.operation.clone());
1320
1321            let operation = parse_producer_operation(&operation_name)?;
1322
1323            match operation {
1324                ProducerOperation::List => {
1325                    handle_list(docker, config, &mut exchange).await?;
1326                }
1327                ProducerOperation::Run => {
1328                    handle_run(docker, config, &mut exchange).await?;
1329                }
1330                ProducerOperation::Start => {
1331                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1332                        .await?;
1333                }
1334                ProducerOperation::Stop => {
1335                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1336                        .await?;
1337                }
1338                ProducerOperation::Remove => {
1339                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1340                        .await?;
1341                }
1342                ProducerOperation::Exec => {
1343                    handle_exec(docker, config, &mut exchange).await?;
1344                }
1345                ProducerOperation::NetworkCreate => {
1346                    handle_network_create(docker, config, &mut exchange).await?;
1347                }
1348                ProducerOperation::NetworkConnect => {
1349                    handle_network_connect(docker, config, &mut exchange).await?;
1350                }
1351                ProducerOperation::NetworkDisconnect => {
1352                    handle_network_disconnect(docker, config, &mut exchange).await?;
1353                }
1354                ProducerOperation::NetworkRemove => {
1355                    handle_network_remove(docker, config, &mut exchange).await?;
1356                }
1357                ProducerOperation::NetworkList => {
1358                    handle_network_list(docker, config, &mut exchange).await?;
1359                }
1360            }
1361
1362            Ok(exchange)
1363        })
1364    }
1365}
1366
1367/// Consumer for receiving Docker container events or logs.
1368///
1369/// This consumer subscribes to Docker events or container logs and forwards them
1370/// to the route as exchanges. It implements automatic reconnection on connection failures.
1371pub struct ContainerConsumer {
1372    config: ContainerConfig,
1373    /// ADR-0012 observability handle: `rt.metrics().increment_errors(...)` and
1374    /// `rt.health().force_unhealthy_for_route(...)` calls.
1375    runtime: Arc<dyn RuntimeObservability>,
1376}
1377
1378impl ContainerConsumer {
1379    pub fn new(config: ContainerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
1380        Self { config, runtime }
1381    }
1382}
1383
1384#[async_trait]
1385impl Consumer for ContainerConsumer {
1386    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1387        match self.config.operation.as_str() {
1388            "events" => self.start_events_consumer(context).await,
1389            "logs" => self.start_logs_consumer(context).await,
1390            _ => Err(CamelError::EndpointCreationFailed(format!(
1391                "Consumer only supports 'events' or 'logs' operations, got '{}'",
1392                self.config.operation
1393            ))),
1394        }
1395    }
1396
1397    async fn stop(&mut self) -> Result<(), CamelError> {
1398        Ok(())
1399    }
1400
1401    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1402        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1403    }
1404}
1405
1406impl ContainerConsumer {
1407    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1408        use futures::StreamExt;
1409
1410        let cancel = context.cancel_token();
1411
1412        // Outer reconnect loop: when the inner event stream breaks, this loop
1413        // reconnects. The connect_docker() retry is now backed by
1414        // retry_async_cancelable (migrated from manual loop in rc-k9c).
1415        loop {
1416            let docker = match retry_async_cancelable(
1417                &self.config.reconnect,
1418                Some("container-events"),
1419                || async { self.config.connect_docker().await },
1420                |_| true,
1421                &cancel,
1422            )
1423            .await
1424            {
1425                Ok(d) => d,
1426                Err(_) if context.is_cancelled() => {
1427                    tracing::info!("Container events consumer shutting down");
1428                    return Ok(());
1429                }
1430                Err(e) => {
1431                    self.runtime
1432                        .metrics()
1433                        .increment_errors(context.route_id(), "e:container:events-connect");
1434                    // log-policy: outside-contract
1435                    tracing::error!(error = %e, "Container events consumer exhausted reconnect attempts");
1436                    return Err(e);
1437                }
1438            };
1439
1440            let mut event_stream = docker.events(None::<EventsOptions>);
1441
1442            loop {
1443                tokio::select! {
1444                    _ = context.cancelled() => {
1445                        tracing::info!("Container events consumer shutting down");
1446                        return Ok(());
1447                    }
1448
1449                    msg = event_stream.next() => {
1450                        match msg {
1451                            Some(Ok(event)) => {
1452                                let formatted = format_docker_event(&event);
1453                                let message = Message::new(Body::Text(formatted));
1454                                let exchange = Exchange::new(message);
1455
1456                                if let Err(e) = context.send(exchange).await {
1457                                    // log-policy: system-broken
1458                                    tracing::error!("Failed to send exchange: {:?}", e);
1459                                    break;
1460                                }
1461                            }
1462                            Some(Err(e)) => {
1463                                self.runtime.metrics().increment_errors(context.route_id(), "e:container:events-stream");
1464                                // log-policy: outside-contract
1465                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1466                                break;
1467                            }
1468                            None => {
1469                                tracing::info!("Docker event stream ended. Reconnecting...");
1470                                break;
1471                            }
1472                        }
1473                    }
1474                }
1475            }
1476
1477            tokio::select! {
1478                _ = context.cancelled() => {
1479                    tracing::info!("Container events consumer shutting down");
1480                    return Ok(());
1481                }
1482                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1483            }
1484        }
1485    }
1486
1487    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1488        use futures::StreamExt;
1489
1490        let container_id = self.config.container_id.clone().ok_or_else(|| {
1491            CamelError::EndpointCreationFailed(
1492                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1493                    .to_string(),
1494            )
1495        })?;
1496
1497        let cancel = context.cancel_token();
1498
1499        // Outer reconnect loop: when the inner log stream breaks, this loop
1500        // reconnects. The connect_docker() retry is now backed by
1501        // retry_async_cancelable (migrated from manual loop in rc-k9c).
1502        loop {
1503            let docker = match retry_async_cancelable(
1504                &self.config.reconnect,
1505                Some("container-logs"),
1506                || async { self.config.connect_docker().await },
1507                |_| true,
1508                &cancel,
1509            )
1510            .await
1511            {
1512                Ok(d) => d,
1513                Err(_) if context.is_cancelled() => {
1514                    tracing::info!("Container logs consumer shutting down");
1515                    return Ok(());
1516                }
1517                Err(e) => {
1518                    self.runtime
1519                        .metrics()
1520                        .increment_errors(context.route_id(), "e:container:logs-connect");
1521                    // log-policy: outside-contract
1522                    tracing::error!(error = %e, "Container logs consumer exhausted reconnect attempts");
1523                    return Err(e);
1524                }
1525            };
1526
1527            let tail = self
1528                .config
1529                .tail
1530                .clone()
1531                .unwrap_or_else(|| "all".to_string());
1532
1533            let options = LogsOptions {
1534                follow: self.config.follow,
1535                stdout: true,
1536                stderr: true,
1537                timestamps: self.config.timestamps,
1538                tail,
1539                ..Default::default()
1540            };
1541
1542            let mut log_stream = docker.logs(&container_id, Some(options));
1543            let container_id_header = container_id.clone();
1544
1545            loop {
1546                tokio::select! {
1547                    _ = context.cancelled() => {
1548                        tracing::info!("Container logs consumer shutting down");
1549                        return Ok(());
1550                    }
1551
1552                    msg = log_stream.next() => {
1553                        match msg {
1554                            Some(Ok(log_output)) => {
1555                                let (stream_type, content) = match log_output {
1556                                    bollard::container::LogOutput::StdOut { message } => {
1557                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
1558                                    }
1559                                    bollard::container::LogOutput::StdErr { message } => {
1560                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
1561                                    }
1562                                    bollard::container::LogOutput::Console { message } => {
1563                                        ("console", String::from_utf8_lossy(&message).into_owned())
1564                                    }
1565                                    bollard::container::LogOutput::StdIn { message } => {
1566                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
1567                                    }
1568                                };
1569
1570                                let content = content.trim_end();
1571                                if content.is_empty() {
1572                                    continue;
1573                                }
1574
1575                                let mut message = Message::new(Body::Text(content.to_string()));
1576                                message.set_header(
1577                                    HEADER_CONTAINER_ID,
1578                                    serde_json::Value::String(container_id_header.clone()),
1579                                );
1580                                message.set_header(
1581                                    HEADER_LOG_STREAM,
1582                                    serde_json::Value::String(stream_type.to_string()),
1583                                );
1584
1585                                if self.config.timestamps
1586                                    && let Some(ts) = extract_timestamp(content) {
1587                                        message.set_header(
1588                                            HEADER_LOG_TIMESTAMP,
1589                                            serde_json::Value::String(ts),
1590                                        );
1591                                    }
1592
1593                                let exchange = Exchange::new(message);
1594
1595                                if let Err(e) = context.send(exchange).await {
1596                                    // log-policy: system-broken
1597                                    tracing::error!("Failed to send log exchange: {:?}", e);
1598                                    break;
1599                                }
1600                            }
1601                            Some(Err(e)) => {
1602                                self.runtime.metrics().increment_errors(context.route_id(), "e:container:logs-stream");
1603                                // log-policy: outside-contract
1604                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1605                                break;
1606                            }
1607                            None => {
1608                                if self.config.follow {
1609                                    tracing::info!("Docker log stream ended. Reconnecting...");
1610                                    break;
1611                                } else {
1612                                    tracing::info!("Container logs consumer finished (follow=false)");
1613                                    return Ok(());
1614                                }
1615                            }
1616                        }
1617                    }
1618                }
1619            }
1620
1621            tokio::select! {
1622                _ = context.cancelled() => {
1623                    tracing::info!("Container logs consumer shutting down");
1624                    return Ok(());
1625                }
1626                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1627            }
1628        }
1629    }
1630}
1631
/// Returns the leading token of `log_line` when it looks like a timestamp.
///
/// The token is everything before the first space. It is returned only when
/// the line actually contains a space and the token contains the letter `T`;
/// no further validation of the timestamp format is performed.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _rest) = log_line.split_once(' ')?;
    candidate.contains('T').then(|| candidate.to_string())
}
1640
/// Component for creating container endpoints.
///
/// This component handles URIs with the "container" scheme and creates
/// appropriate producer and consumer endpoints for Docker operations.
///
/// Containers created via `run` operation are tracked globally and can be
/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
pub struct ContainerComponent {
    /// Optional component-wide defaults. When present, `create_endpoint` applies
    /// them to each endpoint config after the URI has been parsed.
    config: Option<ContainerGlobalConfig>,
}
1651
1652impl ContainerComponent {
1653    /// Creates a new container component instance without global config.
1654    pub fn new() -> Self {
1655        Self { config: None }
1656    }
1657
1658    /// Creates a container component with the given global config.
1659    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1660        Self {
1661            config: Some(config),
1662        }
1663    }
1664
1665    /// Creates a container component with optional global config.
1666    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1667        Self { config }
1668    }
1669}
1670
1671impl Default for ContainerComponent {
1672    fn default() -> Self {
1673        Self::new()
1674    }
1675}
1676
1677impl Component for ContainerComponent {
1678    fn scheme(&self) -> &str {
1679        "container"
1680    }
1681
1682    fn create_endpoint(
1683        &self,
1684        uri: &str,
1685        ctx: &dyn camel_component_api::ComponentContext,
1686    ) -> Result<Box<dyn Endpoint>, CamelError> {
1687        let mut config = ContainerConfig::from_uri(uri)?;
1688        // Apply global defaults if present and URI didn't set them
1689        if let Some(ref global) = self.config {
1690            config.apply_global_defaults(global);
1691        }
1692        let health_check = ContainerHealthCheck::new(&config);
1693        ctx.register_current_route_health_check(Arc::new(health_check));
1694        Ok(Box::new(ContainerEndpoint {
1695            uri: uri.to_string(),
1696            config,
1697        }))
1698    }
1699}
1700
/// Endpoint for container operations.
///
/// This endpoint creates producers for executing container operations
/// and consumers for receiving container events.
// TODO(CON-003): Forward container health status (inspect healthcheck / Health field) to
// Camel's health subsystem so the route can react to unhealthy containers.
pub struct ContainerEndpoint {
    /// The endpoint URI exactly as it was passed to `create_endpoint`.
    uri: String,
    /// Parsed endpoint configuration; cloned into every consumer and producer
    /// this endpoint creates.
    config: ContainerConfig,
}
1711
1712impl ContainerEndpoint {
1713    /// Returns the Docker host configured for this endpoint.
1714    /// Returns `None` if not set (for testing purposes).
1715    pub fn docker_host(&self) -> Option<&str> {
1716        self.config.host.as_deref()
1717    }
1718}
1719
1720impl Endpoint for ContainerEndpoint {
1721    fn uri(&self) -> &str {
1722        &self.uri
1723    }
1724
1725    fn create_consumer(
1726        &self,
1727        rt: Arc<dyn camel_component_api::RuntimeObservability>,
1728    ) -> Result<Box<dyn Consumer>, CamelError> {
1729        Ok(Box::new(ContainerConsumer::new(self.config.clone(), rt)))
1730    }
1731
1732    fn create_producer(
1733        &self,
1734        _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1735        _ctx: &ProducerContext,
1736    ) -> Result<BoxProcessor, CamelError> {
1737        let docker = self.config.connect_docker_client()?;
1738        Ok(BoxProcessor::new(ContainerProducer {
1739            config: self.config.clone(),
1740            docker,
1741        }))
1742    }
1743}
1744
1745#[cfg(test)]
1746mod tests {
1747    use camel_component_api::test_support::PanicRuntimeObservability;
1748    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1749        std::sync::Arc::new(PanicRuntimeObservability)
1750    }
1751    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1752        std::sync::Arc::new(PanicRuntimeObservability)
1753    }
1754
1755    use super::*;
1756    use camel_api::MetricsCollector;
1757    use camel_component_api::HealthCheckRegistry;
1758    use camel_component_api::NoOpComponentContext;
1759
1760    #[test]
1761    fn test_container_config() {
1762        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1763        assert_eq!(config.operation, "run");
1764        assert_eq!(config.image.as_deref(), Some("alpine"));
1765        // host is None by default; global config applies it later
1766        assert!(config.host.is_none());
1767    }
1768
1769    #[test]
1770    fn test_global_config_applied_to_endpoint() {
1771        // When global config is set and URI doesn't specify host,
1772        // apply_global_defaults should set host from global config.
1773        let global =
1774            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1775        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1776        assert!(
1777            config.host.is_none(),
1778            "URI without ?host= should leave host as None"
1779        );
1780        config.apply_global_defaults(&global);
1781        assert_eq!(
1782            config.host.as_deref(),
1783            Some("unix:///custom/docker.sock"),
1784            "global docker_host must be applied when URI did not set host"
1785        );
1786    }
1787
1788    #[test]
1789    fn test_uri_param_wins_over_global_config() {
1790        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1791        let global =
1792            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1793        let mut config =
1794            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1795                .unwrap();
1796        assert_eq!(
1797            config.host.as_deref(),
1798            Some("unix:///override.sock"),
1799            "URI-set host should be parsed correctly"
1800        );
1801        config.apply_global_defaults(&global);
1802        assert_eq!(
1803            config.host.as_deref(),
1804            Some("unix:///override.sock"),
1805            "global config must NOT override a host already set by URI"
1806        );
1807    }
1808
1809    #[test]
1810    fn test_container_config_parses_name() {
1811        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1812        assert_eq!(config.name.as_deref(), Some("my-container"));
1813    }
1814
1815    #[test]
1816    fn test_parse_producer_operation_known() {
1817        assert_eq!(
1818            parse_producer_operation("list").unwrap(),
1819            ProducerOperation::List
1820        );
1821        assert_eq!(
1822            parse_producer_operation("run").unwrap(),
1823            ProducerOperation::Run
1824        );
1825        assert_eq!(
1826            parse_producer_operation("start").unwrap(),
1827            ProducerOperation::Start
1828        );
1829        assert_eq!(
1830            parse_producer_operation("stop").unwrap(),
1831            ProducerOperation::Stop
1832        );
1833        assert_eq!(
1834            parse_producer_operation("remove").unwrap(),
1835            ProducerOperation::Remove
1836        );
1837    }
1838
1839    #[test]
1840    fn test_parse_producer_operation_unknown() {
1841        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1842        match err {
1843            CamelError::ProcessorError(msg) => {
1844                assert!(
1845                    msg.contains("Unknown container operation"),
1846                    "Unexpected error message: {}",
1847                    msg
1848                );
1849            }
1850            _ => panic!("Expected ProcessorError for unknown operation"),
1851        }
1852    }
1853
1854    #[test]
1855    fn test_parse_producer_operation_new_variants() {
1856        assert_eq!(
1857            parse_producer_operation("exec").unwrap(),
1858            ProducerOperation::Exec
1859        );
1860        assert_eq!(
1861            parse_producer_operation("network-create").unwrap(),
1862            ProducerOperation::NetworkCreate
1863        );
1864        assert_eq!(
1865            parse_producer_operation("network-connect").unwrap(),
1866            ProducerOperation::NetworkConnect
1867        );
1868        assert_eq!(
1869            parse_producer_operation("network-disconnect").unwrap(),
1870            ProducerOperation::NetworkDisconnect
1871        );
1872        assert_eq!(
1873            parse_producer_operation("network-remove").unwrap(),
1874            ProducerOperation::NetworkRemove
1875        );
1876        assert_eq!(
1877            parse_producer_operation("network-list").unwrap(),
1878            ProducerOperation::NetworkList
1879        );
1880    }
1881
1882    #[test]
1883    fn test_resolve_container_name_header_overrides_config() {
1884        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1885        let mut exchange = Exchange::new(Message::new(""));
1886        exchange.input.set_header(
1887            HEADER_CONTAINER_NAME,
1888            serde_json::Value::String("header-name".to_string()),
1889        );
1890
1891        let resolved = resolve_container_name(&exchange, &config);
1892        assert_eq!(resolved.as_deref(), Some("header-name"));
1893    }
1894
1895    #[test]
1896    fn test_container_config_rejects_tcp_host() {
1897        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1898        let err = config.connect_docker_client().unwrap_err();
1899        match err {
1900            CamelError::ProcessorError(msg) => {
1901                assert!(
1902                    msg.to_lowercase().contains("tcp"),
1903                    "Expected TCP scheme error, got: {}",
1904                    msg
1905                );
1906            }
1907            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1908        }
1909    }
1910
1911    #[tokio::test]
1912    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1913        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1914        let remove_called_clone = remove_called.clone();
1915
1916        let result = run_container_with_cleanup(
1917            || async { Ok("container-123".to_string()) },
1918            |_id| async move {
1919                Err(CamelError::ProcessorError(
1920                    "Failed to start container".to_string(),
1921                ))
1922            },
1923            move |_id| {
1924                let remove_called_inner = remove_called_clone.clone();
1925                async move {
1926                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1927                    Ok(())
1928                }
1929            },
1930        )
1931        .await;
1932
1933        assert!(result.is_err(), "Expected start failure to bubble up");
1934        assert!(
1935            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1936            "Expected cleanup to remove container"
1937        );
1938    }
1939
1940    #[test]
1941    fn test_container_component_creates_endpoint() {
1942        let component = ContainerComponent::new();
1943        assert_eq!(component.scheme(), "container");
1944        let ctx = NoOpComponentContext;
1945        let endpoint = component
1946            .create_endpoint("container:run?image=alpine", &ctx)
1947            .unwrap();
1948        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1949    }
1950
1951    #[test]
1952    fn test_container_config_parses_ports() {
1953        let config =
1954            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1955        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1956    }
1957
1958    #[test]
1959    fn test_container_config_parses_env() {
1960        let config =
1961            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1962        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1963    }
1964
1965    #[test]
1966    fn test_container_config_parses_logs_options() {
1967        let config = ContainerConfig::from_uri(
1968            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1969        )
1970        .unwrap();
1971        assert_eq!(config.operation, "logs");
1972        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1973        assert!(config.follow);
1974        assert!(config.timestamps);
1975        assert_eq!(config.tail.as_deref(), Some("100"));
1976    }
1977
1978    #[test]
1979    fn test_container_config_logs_defaults() {
1980        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1981        assert!(config.follow); // default: true
1982        assert!(!config.timestamps); // default: false
1983        assert!(config.tail.is_none()); // default: None (all)
1984    }
1985
1986    #[test]
1987    fn test_parse_ports_single() {
1988        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1989        let (exposed, bindings) = config.parse_ports().unwrap();
1990
1991        assert!(exposed.contains(&"80/tcp".to_string()));
1992        assert!(bindings.contains_key("80/tcp"));
1993
1994        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1995        assert_eq!(binding.len(), 1);
1996        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1997    }
1998
1999    #[test]
2000    fn test_parse_ports_multiple() {
2001        let config =
2002            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
2003        let (exposed, bindings) = config.parse_ports().unwrap();
2004
2005        assert!(exposed.contains(&"80/tcp".to_string()));
2006        assert!(exposed.contains(&"443/tcp".to_string()));
2007        assert_eq!(bindings.len(), 2);
2008    }
2009
2010    #[test]
2011    fn test_parse_ports_with_protocol() {
2012        let config =
2013            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
2014                .unwrap();
2015        let (exposed, _bindings) = config.parse_ports().unwrap();
2016
2017        assert!(exposed.contains(&"80/tcp".to_string()));
2018        assert!(exposed.contains(&"53/udp".to_string()));
2019    }
2020
2021    #[test]
2022    fn test_parse_ports_none() {
2023        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2024        let (exposed, bindings) = config.parse_ports().unwrap();
2025        assert!(exposed.is_empty());
2026        assert!(bindings.is_empty());
2027    }
2028
2029    #[test]
2030    fn test_parse_env_single() {
2031        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
2032        let env = config.parse_env().unwrap();
2033
2034        assert_eq!(env.len(), 1);
2035        assert_eq!(env[0], "FOO=bar");
2036    }
2037
2038    #[test]
2039    fn test_parse_env_multiple() {
2040        let config =
2041            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
2042                .unwrap();
2043        let env = config.parse_env().unwrap();
2044
2045        assert_eq!(env.len(), 3);
2046        assert!(env.contains(&"FOO=bar".to_string()));
2047        assert!(env.contains(&"BAZ=qux".to_string()));
2048        assert!(env.contains(&"NUM=123".to_string()));
2049    }
2050
2051    #[test]
2052    fn test_parse_env_none() {
2053        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2054        assert!(config.parse_env().is_none());
2055    }
2056
2057    use camel_component_api::Message;
2058    use std::sync::Arc;
2059
2060    #[tokio::test]
2061    async fn test_container_producer_resolves_operation_from_header() {
2062        // Try to connect to Docker - if fails, return early
2063        let docker = match Docker::connect_with_local_defaults() {
2064            Ok(d) => d,
2065            Err(_) => {
2066                eprintln!("Skipping test: Could not connect to Docker daemon");
2067                return;
2068            }
2069        };
2070
2071        if docker.ping().await.is_err() {
2072            eprintln!("Skipping test: Docker daemon not responding to ping");
2073            return;
2074        }
2075
2076        let component = ContainerComponent::new();
2077        let ctx = NoOpComponentContext;
2078        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2079
2080        let ctx = ProducerContext::new();
2081        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2082
2083        let mut exchange = Exchange::new(Message::new(""));
2084        exchange
2085            .input
2086            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2087
2088        use tower::ServiceExt;
2089        let result = producer
2090            .ready()
2091            .await
2092            .unwrap()
2093            .call(exchange)
2094            .await
2095            .unwrap();
2096
2097        // For list operation, the result header should be on the input message
2098        assert_eq!(
2099            result
2100                .input
2101                .header(HEADER_ACTION_RESULT)
2102                .map(|v| v.as_str().unwrap()),
2103            Some("success")
2104        );
2105    }
2106
2107    #[tokio::test]
2108    async fn test_container_producer_connection_error_on_invalid_host() {
2109        // Test that an invalid host (nonexistent socket) results in a connection error
2110        let component = ContainerComponent::new();
2111        let ctx = NoOpComponentContext;
2112        let endpoint = component
2113            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2114            .unwrap();
2115
2116        let ctx = ProducerContext::new();
2117        let result = endpoint.create_producer(rt(), &ctx);
2118
2119        // The producer should return an error because it cannot connect to the invalid socket
2120        assert!(
2121            result.is_err(),
2122            "Expected error when connecting to invalid host"
2123        );
2124        let err = result.unwrap_err();
2125        match &err {
2126            CamelError::ProcessorError(msg) => {
2127                assert!(
2128                    msg.to_lowercase().contains("connection")
2129                        || msg.to_lowercase().contains("connect")
2130                        || msg.to_lowercase().contains("socket")
2131                        || msg.contains("docker"),
2132                    "Error message should indicate connection failure, got: {}",
2133                    msg
2134                );
2135            }
2136            _ => panic!("Expected ProcessorError, got: {:?}", err),
2137        }
2138    }
2139
2140    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
2141    #[tokio::test]
2142    async fn test_container_producer_lifecycle_operations_missing_id() {
2143        // Try to connect to Docker - if fails, return early
2144        let docker = match Docker::connect_with_local_defaults() {
2145            Ok(d) => d,
2146            Err(_) => {
2147                eprintln!("Skipping test: Could not connect to Docker daemon");
2148                return;
2149            }
2150        };
2151
2152        if docker.ping().await.is_err() {
2153            eprintln!("Skipping test: Docker daemon not responding to ping");
2154            return;
2155        }
2156
2157        let component = ContainerComponent::new();
2158        let ctx = NoOpComponentContext;
2159        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2160        let ctx = ProducerContext::new();
2161        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2162
2163        // Test each lifecycle operation without CamelContainerId header
2164        for operation in ["start", "stop", "remove"] {
2165            let mut exchange = Exchange::new(Message::new(""));
2166            exchange.input.set_header(
2167                HEADER_ACTION,
2168                serde_json::Value::String(operation.to_string()),
2169            );
2170            // Deliberately NOT setting CamelContainerId header
2171
2172            use tower::ServiceExt;
2173            let result = producer.ready().await.unwrap().call(exchange).await;
2174
2175            assert!(
2176                result.is_err(),
2177                "Expected error for {} operation without CamelContainerId",
2178                operation
2179            );
2180            let err = result.unwrap_err();
2181            match &err {
2182                CamelError::ProcessorError(msg) => {
2183                    assert!(
2184                        msg.contains(HEADER_CONTAINER_ID),
2185                        "Error message should mention {}, got: {}",
2186                        HEADER_CONTAINER_ID,
2187                        msg
2188                    );
2189                }
2190                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2191            }
2192        }
2193    }
2194
2195    /// Test that stop operation returns an error for a nonexistent container.
2196    #[tokio::test]
2197    async fn test_container_producer_stop_nonexistent() {
2198        // Try to connect to Docker - if fails, return early
2199        let docker = match Docker::connect_with_local_defaults() {
2200            Ok(d) => d,
2201            Err(_) => {
2202                eprintln!("Skipping test: Could not connect to Docker daemon");
2203                return;
2204            }
2205        };
2206
2207        if docker.ping().await.is_err() {
2208            eprintln!("Skipping test: Docker daemon not responding to ping");
2209            return;
2210        }
2211
2212        let component = ContainerComponent::new();
2213        let ctx = NoOpComponentContext;
2214        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2215        let ctx = ProducerContext::new();
2216        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2217
2218        let mut exchange = Exchange::new(Message::new(""));
2219        exchange
2220            .input
2221            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2222        exchange.input.set_header(
2223            HEADER_CONTAINER_ID,
2224            serde_json::Value::String("nonexistent-container-123".into()),
2225        );
2226
2227        use tower::ServiceExt;
2228        let result = producer.ready().await.unwrap().call(exchange).await;
2229
2230        assert!(
2231            result.is_err(),
2232            "Expected error when stopping nonexistent container"
2233        );
2234        let err = result.unwrap_err();
2235        match &err {
2236            CamelError::ProcessorError(msg) => {
2237                // Docker API returns 404 with "No such container" message
2238                assert!(
2239                    msg.to_lowercase().contains("no such container")
2240                        || msg.to_lowercase().contains("not found")
2241                        || msg.contains("404"),
2242                    "Error message should indicate container not found, got: {}",
2243                    msg
2244                );
2245            }
2246            _ => panic!("Expected ProcessorError, got: {:?}", err),
2247        }
2248    }
2249
2250    /// Test that run operation returns an error when no image is provided.
2251    #[tokio::test]
2252    async fn test_container_producer_run_missing_image() {
2253        // Try to connect to Docker - if fails, return early
2254        let docker = match Docker::connect_with_local_defaults() {
2255            Ok(d) => d,
2256            Err(_) => {
2257                eprintln!("Skipping test: Could not connect to Docker daemon");
2258                return;
2259            }
2260        };
2261
2262        if docker.ping().await.is_err() {
2263            eprintln!("Skipping test: Docker daemon not responding to ping");
2264            return;
2265        }
2266
2267        // Create producer without an image in the URI
2268        let component = ContainerComponent::new();
2269        let ctx = NoOpComponentContext;
2270        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2271        let ctx = ProducerContext::new();
2272        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2273
2274        let mut exchange = Exchange::new(Message::new(""));
2275        exchange
2276            .input
2277            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2278        // Deliberately NOT setting CamelContainerImage header
2279
2280        use tower::ServiceExt;
2281        let result = producer.ready().await.unwrap().call(exchange).await;
2282
2283        assert!(
2284            result.is_err(),
2285            "Expected error for run operation without image"
2286        );
2287        let err = result.unwrap_err();
2288        match &err {
2289            CamelError::ProcessorError(msg) => {
2290                assert!(
2291                    msg.to_lowercase().contains("image"),
2292                    "Error message should mention 'image', got: {}",
2293                    msg
2294                );
2295            }
2296            _ => panic!("Expected ProcessorError, got: {:?}", err),
2297        }
2298    }
2299
2300    /// Test that run operation uses image from header.
2301    #[tokio::test]
2302    async fn test_container_producer_run_image_from_header() {
2303        // Try to connect to Docker - if fails, return early
2304        let docker = match Docker::connect_with_local_defaults() {
2305            Ok(d) => d,
2306            Err(_) => {
2307                eprintln!("Skipping test: Could not connect to Docker daemon");
2308                return;
2309            }
2310        };
2311
2312        if docker.ping().await.is_err() {
2313            eprintln!("Skipping test: Docker daemon not responding to ping");
2314            return;
2315        }
2316
2317        // Create producer without an image in the URI
2318        let component = ContainerComponent::new();
2319        let ctx = NoOpComponentContext;
2320        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2321        let ctx = ProducerContext::new();
2322        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2323
2324        let mut exchange = Exchange::new(Message::new(""));
2325        exchange
2326            .input
2327            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2328        // Set a non-existent image to test that the run operation attempts to use it
2329        exchange.input.set_header(
2330            HEADER_IMAGE,
2331            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2332        );
2333
2334        use tower::ServiceExt;
2335        let result = producer.ready().await.unwrap().call(exchange).await;
2336
2337        // The operation should fail because the image doesn't exist
2338        assert!(
2339            result.is_err(),
2340            "Expected error when running container with nonexistent image"
2341        );
2342        let err = result.unwrap_err();
2343        match &err {
2344            CamelError::ProcessorError(msg) => {
2345                // Docker API returns an error about the missing image
2346                assert!(
2347                    msg.to_lowercase().contains("no such image")
2348                        || msg.to_lowercase().contains("not found")
2349                        || msg.to_lowercase().contains("image")
2350                        || msg.to_lowercase().contains("pull")
2351                        || msg.contains("404"),
2352                    "Error message should indicate image issue, got: {}",
2353                    msg
2354                );
2355            }
2356            _ => panic!("Expected ProcessorError, got: {:?}", err),
2357        }
2358    }
2359
2360    /// Integration test: Actually run a container with alpine:latest.
2361    /// This test verifies the full flow: create → start → set headers.
2362    #[tokio::test]
2363    async fn test_container_producer_run_alpine_container() {
2364        let docker = match Docker::connect_with_local_defaults() {
2365            Ok(d) => d,
2366            Err(_) => {
2367                eprintln!("Skipping test: Could not connect to Docker daemon");
2368                return;
2369            }
2370        };
2371
2372        if docker.ping().await.is_err() {
2373            eprintln!("Skipping test: Docker daemon not responding to ping");
2374            return;
2375        }
2376
2377        // Pull alpine:latest if not present
2378        let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2379        let has_alpine = images
2380            .iter()
2381            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2382
2383        if !has_alpine {
2384            eprintln!("Pulling alpine:latest image...");
2385            let mut stream = docker.create_image(
2386                Some(CreateImageOptions {
2387                    from_image: Some("alpine:latest".to_string()),
2388                    ..Default::default()
2389                }),
2390                None,
2391                None,
2392            );
2393
2394            use futures::StreamExt;
2395            while let Some(_item) = stream.next().await {
2396                // Wait for pull to complete
2397            }
2398            eprintln!("Image pulled successfully");
2399        }
2400
2401        // Create producer
2402        let component = ContainerComponent::new();
2403        let ctx = NoOpComponentContext;
2404        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2405        let ctx = ProducerContext::new();
2406        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2407
2408        // Run container with unique name
2409        let timestamp = std::time::SystemTime::now()
2410            .duration_since(std::time::UNIX_EPOCH)
2411            .unwrap()
2412            .as_millis();
2413        let container_name = format!("test-rust-camel-{}", timestamp);
2414        let mut exchange = Exchange::new(Message::new(""));
2415        exchange.input.set_header(
2416            HEADER_IMAGE,
2417            serde_json::Value::String("alpine:latest".into()),
2418        );
2419        exchange.input.set_header(
2420            HEADER_CONTAINER_NAME,
2421            serde_json::Value::String(container_name.clone()),
2422        );
2423
2424        use tower::ServiceExt;
2425        let result = producer
2426            .ready()
2427            .await
2428            .unwrap()
2429            .call(exchange)
2430            .await
2431            .expect("Container run should succeed");
2432
2433        // Verify container ID was set
2434        let container_id = result
2435            .input
2436            .header(HEADER_CONTAINER_ID)
2437            .and_then(|v| v.as_str().map(|s| s.to_string()))
2438            .expect("Expected container ID header");
2439        assert!(!container_id.is_empty(), "Container ID should not be empty");
2440
2441        // Verify success header
2442        assert_eq!(
2443            result
2444                .input
2445                .header(HEADER_ACTION_RESULT)
2446                .and_then(|v| v.as_str()),
2447            Some("success")
2448        );
2449
2450        // Verify container exists in Docker
2451        let inspect = docker
2452            .inspect_container(&container_id, None)
2453            .await
2454            .expect("Container should exist");
2455        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2456
2457        // Cleanup: remove container
2458        docker
2459            .remove_container(
2460                &container_id,
2461                Some(RemoveContainerOptions {
2462                    force: true,
2463                    ..Default::default()
2464                }),
2465            )
2466            .await
2467            .ok();
2468
2469        eprintln!("✅ Container {} created and cleaned up", container_id);
2470    }
2471
2472    /// Test that consumer returns an error for unsupported operations.
2473    #[tokio::test]
2474    async fn test_container_consumer_unsupported_operation() {
2475        use tokio::sync::mpsc;
2476
2477        let component = ContainerComponent::new();
2478        let ctx = NoOpComponentContext;
2479        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2480        let mut consumer = endpoint.create_consumer(rt()).unwrap();
2481
2482        // Create a minimal ConsumerContext
2483        let (tx, _rx) = mpsc::channel(16);
2484        let cancel_token = tokio_util::sync::CancellationToken::new();
2485        let context = ConsumerContext::new(tx, cancel_token, "container-test-route".to_string());
2486
2487        let result = consumer.start(context).await;
2488
2489        // Should return error because "run" is not a supported consumer operation
2490        assert!(
2491            result.is_err(),
2492            "Expected error for unsupported consumer operation"
2493        );
2494        let err = result.unwrap_err();
2495        match &err {
2496            CamelError::EndpointCreationFailed(msg) => {
2497                assert!(
2498                    msg.contains("Consumer only supports 'events' or 'logs'"),
2499                    "Error message should mention events or logs support, got: {}",
2500                    msg
2501                );
2502            }
2503            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2504        }
2505    }
2506
2507    #[test]
2508    fn test_container_consumer_concurrency_model_is_concurrent() {
2509        let consumer = ContainerConsumer {
2510            config: ContainerConfig::from_uri("container:events").unwrap(),
2511            runtime: test_rt(),
2512        };
2513
2514        assert_eq!(
2515            consumer.concurrency_model(),
2516            camel_component_api::ConcurrencyModel::Concurrent { max: None }
2517        );
2518    }
2519
2520    /// Test that consumer gracefully shuts down when cancellation is requested.
2521    /// This test requires a running Docker daemon. If Docker is not available, the test
2522    /// will be ignored.
2523    #[tokio::test]
2524    async fn test_container_consumer_cancellation() {
2525        use std::sync::atomic::{AtomicBool, Ordering};
2526        use tokio::sync::mpsc;
2527
2528        // Try to connect to Docker - if fails, return early
2529        let docker = match Docker::connect_with_local_defaults() {
2530            Ok(d) => d,
2531            Err(_) => {
2532                eprintln!("Skipping test: Could not connect to Docker daemon");
2533                return;
2534            }
2535        };
2536
2537        if docker.ping().await.is_err() {
2538            eprintln!("Skipping test: Docker daemon not responding to ping");
2539            return;
2540        }
2541
2542        let component = ContainerComponent::new();
2543        let ctx = NoOpComponentContext;
2544        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2545        let mut consumer = endpoint.create_consumer(rt()).unwrap();
2546
2547        // Create a ConsumerContext
2548        let (tx, _rx) = mpsc::channel(16);
2549        let cancel_token = tokio_util::sync::CancellationToken::new();
2550        let context =
2551            ConsumerContext::new(tx, cancel_token.clone(), "container-test-route".to_string());
2552
2553        // Track if the consumer task has completed
2554        let completed = Arc::new(AtomicBool::new(false));
2555        let completed_clone = completed.clone();
2556
2557        // Spawn consumer in background
2558        let handle = tokio::spawn(async move {
2559            let result = consumer.start(context).await;
2560            // Mark as completed when done
2561            completed_clone.store(true, Ordering::SeqCst);
2562            result
2563        });
2564
2565        // Wait a bit for the consumer to start
2566        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2567
2568        // Consumer should still be running (not completed)
2569        assert!(
2570            !completed.load(Ordering::SeqCst),
2571            "Consumer should still be running before cancellation"
2572        );
2573
2574        // Request cancellation
2575        cancel_token.cancel();
2576
2577        // Wait for the task to complete (with timeout)
2578        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2579
2580        // Task should have completed (not timed out)
2581        assert!(
2582            result.is_ok(),
2583            "Consumer should gracefully shut down after cancellation"
2584        );
2585
2586        // Verify the consumer completed
2587        assert!(
2588            completed.load(Ordering::SeqCst),
2589            "Consumer should have completed after cancellation"
2590        );
2591    }
2592
2593    /// Integration test for listing containers.
2594    /// This test requires a running Docker daemon. If Docker is not available, the test
2595    /// will return early and be effectively ignored.
2596    #[tokio::test]
2597    async fn test_container_producer_list_containers() {
2598        // Try to connect to Docker using default config
2599        // If ping fails, return early (effectively ignoring this test)
2600        let docker = match Docker::connect_with_local_defaults() {
2601            Ok(d) => d,
2602            Err(_) => {
2603                eprintln!("Skipping test: Could not connect to Docker daemon");
2604                return;
2605            }
2606        };
2607
2608        if docker.ping().await.is_err() {
2609            eprintln!("Skipping test: Docker daemon not responding to ping");
2610            return;
2611        }
2612
2613        // Create producer with list operation
2614        let component = ContainerComponent::new();
2615        let ctx = NoOpComponentContext;
2616        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2617
2618        let ctx = ProducerContext::new();
2619        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2620
2621        // Create exchange with list operation in header
2622        let mut exchange = Exchange::new(Message::new(""));
2623        exchange
2624            .input
2625            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2626
2627        // Call the producer
2628        use tower::ServiceExt;
2629        let result = producer
2630            .ready()
2631            .await
2632            .unwrap()
2633            .call(exchange)
2634            .await
2635            .expect("Producer should succeed when Docker is available");
2636
2637        // Assert that the input exchange body is a JSON array
2638        // (because list_containers should put the JSON result in the body)
2639        match &result.input.body {
2640            camel_component_api::Body::Json(json_value) => {
2641                assert!(
2642                    json_value.is_array(),
2643                    "Expected input body to be a JSON array, got: {:?}",
2644                    json_value
2645                );
2646            }
2647            other => panic!("Expected Body::Json with array, got: {:?}", other),
2648        }
2649    }
2650
2651    #[test]
2652    fn test_container_config_parses_volumes() {
2653        let config = ContainerConfig::from_uri(
2654            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2655        )
2656        .unwrap();
2657        assert_eq!(
2658            config.volumes.as_deref(),
2659            Some("./html:/usr/share/nginx/html:ro")
2660        );
2661    }
2662
2663    #[test]
2664    fn test_container_config_parses_exec_params() {
2665        let config = ContainerConfig::from_uri(
2666            "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2667        )
2668        .unwrap();
2669        assert_eq!(config.operation, "exec");
2670        assert_eq!(config.container_id.as_deref(), Some("my-app"));
2671        assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2672        assert_eq!(config.user.as_deref(), Some("root"));
2673        assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2674        assert!(config.detach);
2675    }
2676
2677    #[test]
2678    fn test_container_config_parses_network_create_params() {
2679        let config =
2680            ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2681                .unwrap();
2682        assert_eq!(config.operation, "network-create");
2683        assert_eq!(config.name.as_deref(), Some("my-net"));
2684        assert_eq!(config.driver.as_deref(), Some("bridge"));
2685    }
2686
2687    #[test]
2688    fn test_container_config_defaults_new_fields() {
2689        let config = ContainerConfig::from_uri("container:list").unwrap();
2690        assert!(config.volumes.is_none());
2691        assert!(config.user.is_none());
2692        assert!(config.workdir.is_none());
2693        assert!(!config.detach);
2694        assert!(config.driver.is_none());
2695        assert!(!config.force);
2696    }
2697
2698    #[test]
2699    fn test_parse_volumes_bind_mount() {
2700        let config = ContainerConfig::from_uri(
2701            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2702        )
2703        .unwrap();
2704        let (binds, anon) = config.parse_volumes().unwrap();
2705        assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2706        assert!(anon.is_empty());
2707    }
2708
2709    #[test]
2710    fn test_parse_volumes_named_volume() {
2711        let config =
2712            ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2713                .unwrap();
2714        let (binds, anon) = config.parse_volumes().unwrap();
2715        assert_eq!(binds, vec!["data:/var/lib/data"]);
2716        assert!(anon.is_empty());
2717    }
2718
2719    #[test]
2720    fn test_parse_volumes_anonymous() {
2721        let config =
2722            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2723        let (binds, anon) = config.parse_volumes().unwrap();
2724        assert!(binds.is_empty());
2725        assert!(anon.contains(&"/tmp/app-data".to_string()));
2726    }
2727
2728    #[test]
2729    fn test_parse_volumes_anonymous_with_mode() {
2730        let config =
2731            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2732                .unwrap();
2733        let (binds, anon) = config.parse_volumes().unwrap();
2734        assert!(binds.is_empty());
2735        assert!(anon.contains(&"/tmp/app-data".to_string()));
2736    }
2737
2738    #[test]
2739    fn test_parse_volumes_multiple() {
2740        let config = ContainerConfig::from_uri(
2741            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2742        )
2743        .unwrap();
2744        let (binds, anon) = config.parse_volumes().unwrap();
2745        assert_eq!(binds.len(), 2);
2746        assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2747        assert!(binds.contains(&"data:/var/log/app".to_string()));
2748        assert!(anon.is_empty());
2749    }
2750
2751    #[test]
2752    fn test_parse_volumes_mixed() {
2753        let config = ContainerConfig::from_uri(
2754            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2755        )
2756        .unwrap();
2757        let (binds, anon) = config.parse_volumes().unwrap();
2758        assert_eq!(binds.len(), 1);
2759        assert!(anon.contains(&"/tmp/cache".to_string()));
2760    }
2761
2762    #[test]
2763    fn test_parse_volumes_none() {
2764        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2765        assert!(config.parse_volumes().is_none());
2766    }
2767
2768    #[test]
2769    fn test_parse_volumes_empty_entry_skipped() {
2770        let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2771        assert!(config.parse_volumes().is_none());
2772    }
2773
2774    #[test]
2775    fn test_parse_volumes_rw_mode() {
2776        let config =
2777            ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2778                .unwrap();
2779        let (binds, _) = config.parse_volumes().unwrap();
2780        assert_eq!(binds, vec!["./data:/app/data:rw"]);
2781    }
2782
2783    #[test]
2784    fn test_container_config_from_uri_parses_false_flags() {
2785        let config = ContainerConfig::from_uri(
2786            "container:logs?containerId=a&follow=false&timestamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
2787        )
2788        .unwrap();
2789        assert!(!config.follow);
2790        assert!(!config.timestamps);
2791        assert!(!config.auto_pull);
2792        assert!(!config.auto_remove);
2793        assert!(config.detach);
2794        assert!(config.force);
2795    }
2796
2797    #[test]
2798    fn test_docker_socket_path_validation() {
2799        let unix_cfg =
2800            ContainerConfig::from_uri("container:list?host=unix:///tmp/docker.sock").unwrap();
2801        assert_eq!(
2802            unix_cfg.docker_socket_path().unwrap(),
2803            "unix:///tmp/docker.sock"
2804        );
2805
2806        let npipe_cfg =
2807            ContainerConfig::from_uri("container:list?host=npipe:////./pipe/docker_engine")
2808                .unwrap();
2809        assert_eq!(
2810            npipe_cfg.docker_socket_path().unwrap(),
2811            "npipe:////./pipe/docker_engine"
2812        );
2813
2814        let plain_cfg =
2815            ContainerConfig::from_uri("container:list?host=/var/run/docker.sock").unwrap();
2816        assert_eq!(
2817            plain_cfg.docker_socket_path().unwrap(),
2818            "/var/run/docker.sock"
2819        );
2820
2821        let bad_cfg =
2822            ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
2823        assert!(bad_cfg.docker_socket_path().is_err());
2824    }
2825
2826    #[test]
2827    fn test_parse_ports_invalid_and_whitespace_entries() {
2828        // "8080" without colon is malformed — must fail-fast
2829        let cfg = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
2830        let err = cfg.parse_ports().unwrap_err();
2831        assert!(
2832            err.to_string().contains("malformed port mapping"),
2833            "expected malformed port error, got: {}",
2834            err
2835        );
2836
2837        let cfg =
2838            ContainerConfig::from_uri("container:run?ports= 8080:80 ,  5353:53/udp ").unwrap();
2839        let (exposed, bindings) = cfg.parse_ports().unwrap();
2840        assert!(exposed.contains(&"80/tcp".to_string()));
2841        assert!(exposed.contains(&"53/udp".to_string()));
2842        assert_eq!(bindings.len(), 2);
2843    }
2844
2845    #[test]
2846    fn test_parse_ports_fail_fast_on_malformed() {
2847        // First entry valid, second malformed — must fail on the malformed one
2848        let cfg = ContainerConfig::from_uri("container:run?ports=8080:80,badentry").unwrap();
2849        let err = cfg.parse_ports().unwrap_err();
2850        assert!(
2851            err.to_string()
2852                .contains("malformed port mapping 'badentry'"),
2853            "expected fail-fast on 'badentry', got: {}",
2854            err
2855        );
2856    }
2857
2858    #[test]
2859    fn test_parse_env_trims_and_filters_empty_items() {
2860        let cfg = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
2861        let env = cfg.parse_env().unwrap();
2862        assert_eq!(env, vec!["FOO=bar".to_string(), "BAZ=qux".to_string()]);
2863
2864        let cfg = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
2865        assert!(cfg.parse_env().is_none());
2866    }
2867
2868    #[test]
2869    fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
2870        assert!(parse_volume_str("/host:/ctr:badmode").is_none());
2871        let (binds, anon) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
2872        assert!(binds.contains(&"a:/b:rw".to_string()));
2873        assert!(anon.contains(&"/tmp/cache".to_string()));
2874        assert!(anon.contains(&"/tmp/logs".to_string()));
2875    }
2876
2877    #[test]
2878    fn test_format_docker_event_variants_and_timestamp_extraction() {
2879        let mut attrs = std::collections::HashMap::new();
2880        attrs.insert("name".to_string(), "demo".to_string());
2881        attrs.insert("image".to_string(), "alpine:latest".to_string());
2882        attrs.insert("exitCode".to_string(), "137".to_string());
2883        let actor = bollard::models::EventActor {
2884            id: None,
2885            attributes: Some(attrs),
2886        };
2887
2888        let create_event = bollard::models::EventMessage {
2889            action: Some("create".to_string()),
2890            actor: Some(actor.clone()),
2891            ..Default::default()
2892        };
2893        assert_eq!(
2894            format_docker_event(&create_event),
2895            "[CREATE] Container demo (alpine:latest)"
2896        );
2897
2898        let die_event = bollard::models::EventMessage {
2899            action: Some("die".to_string()),
2900            actor: Some(actor),
2901            ..Default::default()
2902        };
2903        assert_eq!(
2904            format_docker_event(&die_event),
2905            "[DIE]    Container demo (exit: 137)"
2906        );
2907
2908        let other_event = bollard::models::EventMessage {
2909            action: Some("oom".to_string()),
2910            actor: None,
2911            ..Default::default()
2912        };
2913        assert_eq!(format_docker_event(&other_event), "[OOM] Container unknown");
2914
2915        assert_eq!(
2916            extract_timestamp("2024-01-01T00:00:00Z hello"),
2917            Some("2024-01-01T00:00:00Z".to_string())
2918        );
2919        assert_eq!(extract_timestamp("hello world"), None);
2920    }
2921
2922    #[tokio::test]
2923    async fn test_run_container_with_cleanup_error_paths() {
2924        let create_fail = run_container_with_cleanup(
2925            || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2926            |_id| async move { Ok(()) },
2927            |_id| async move { Ok(()) },
2928        )
2929        .await;
2930        assert!(
2931            matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2932        );
2933
2934        let cleanup_fail = run_container_with_cleanup(
2935            || async { Ok("cid-1".to_string()) },
2936            |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2937            |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2938        )
2939        .await;
2940        match cleanup_fail {
2941            Err(CamelError::ProcessorError(msg)) => {
2942                assert!(msg.contains("Failed to start container"));
2943                assert!(msg.contains("Cleanup failed"));
2944            }
2945            other => panic!("unexpected result: {:?}", other),
2946        }
2947    }
2948
2949    #[tokio::test]
2950    async fn test_container_producer_network_lifecycle() {
2951        let docker = match Docker::connect_with_local_defaults() {
2952            Ok(d) => d,
2953            Err(_) => {
2954                eprintln!("Skipping test: Could not connect to Docker daemon");
2955                return;
2956            }
2957        };
2958        if docker.ping().await.is_err() {
2959            eprintln!("Skipping test: Docker daemon not responding to ping");
2960            return;
2961        }
2962
2963        let network_name = format!("camel-test-{}", std::process::id());
2964
2965        let component = ContainerComponent::new();
2966        let component_ctx = NoOpComponentContext;
2967
2968        // Create network
2969        let endpoint = component
2970            .create_endpoint(
2971                &format!("container:network-create?name={}", network_name),
2972                &component_ctx,
2973            )
2974            .unwrap();
2975        let ctx = ProducerContext::new();
2976        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2977
2978        let mut exchange = Exchange::new(Message::new(""));
2979        exchange.input.set_header(
2980            HEADER_ACTION,
2981            serde_json::Value::String("network-create".into()),
2982        );
2983
2984        use tower::ServiceExt;
2985        let result = producer
2986            .ready()
2987            .await
2988            .unwrap()
2989            .call(exchange)
2990            .await
2991            .expect("Network create should succeed");
2992
2993        let network_id = result
2994            .input
2995            .header(HEADER_NETWORK)
2996            .and_then(|v| v.as_str().map(|s| s.to_string()))
2997            .expect("Should have network ID");
2998
2999        assert!(!network_id.is_empty());
3000
3001        // List networks
3002        let endpoint = component
3003            .create_endpoint("container:network-list", &component_ctx)
3004            .unwrap();
3005        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
3006
3007        let mut exchange = Exchange::new(Message::new(""));
3008        exchange.input.set_header(
3009            HEADER_ACTION,
3010            serde_json::Value::String("network-list".into()),
3011        );
3012
3013        let list_result = producer
3014            .ready()
3015            .await
3016            .unwrap()
3017            .call(exchange)
3018            .await
3019            .expect("Network list should succeed");
3020
3021        match &list_result.input.body {
3022            Body::Json(json_value) => {
3023                assert!(json_value.is_array(), "Expected JSON array");
3024            }
3025            other => panic!("Expected Body::Json, got: {:?}", other),
3026        }
3027
3028        // Remove network
3029        let endpoint = component
3030            .create_endpoint(
3031                &format!("container:network-remove?network={}", network_name),
3032                &component_ctx,
3033            )
3034            .unwrap();
3035        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
3036
3037        let mut exchange = Exchange::new(Message::new(""));
3038        exchange.input.set_header(
3039            HEADER_ACTION,
3040            serde_json::Value::String("network-remove".into()),
3041        );
3042
3043        let remove_result = producer
3044            .ready()
3045            .await
3046            .unwrap()
3047            .call(exchange)
3048            .await
3049            .expect("Network remove should succeed");
3050
3051        let action_result = remove_result
3052            .input
3053            .header(HEADER_ACTION_RESULT)
3054            .and_then(|v| v.as_str());
3055        assert_eq!(action_result, Some("success"));
3056    }
3057
3058    #[tokio::test]
3059    async fn test_logs_consumer_requires_container_id() {
3060        use tokio::sync::mpsc;
3061
3062        let mut consumer = ContainerConsumer {
3063            config: ContainerConfig::from_uri("container:logs").unwrap(),
3064            runtime: test_rt(),
3065        };
3066        let (tx, _rx) = mpsc::channel(4);
3067        let context = ConsumerContext::new(
3068            tx,
3069            tokio_util::sync::CancellationToken::new(),
3070            "container-test-route".to_string(),
3071        );
3072
3073        let err = consumer.start(context).await.unwrap_err();
3074        match err {
3075            CamelError::EndpointCreationFailed(msg) => {
3076                assert!(msg.contains("containerId is required for logs consumer"));
3077            }
3078            other => panic!("unexpected error: {:?}", other),
3079        }
3080    }
3081
3082    #[tokio::test]
3083    async fn test_events_consumer_stops_immediately_when_cancelled() {
3084        use tokio::sync::mpsc;
3085
3086        let mut consumer = ContainerConsumer {
3087            config: ContainerConfig::from_uri("container:events").unwrap(),
3088            runtime: test_rt(),
3089        };
3090
3091        let (tx, _rx) = mpsc::channel(4);
3092        let cancel = tokio_util::sync::CancellationToken::new();
3093        cancel.cancel();
3094        let context = ConsumerContext::new(tx, cancel, "container-test-route".to_string());
3095
3096        let result = consumer.start(context).await;
3097        assert!(result.is_ok());
3098    }
3099
3100    #[test]
3101    fn test_global_config_constructors_and_endpoint_docker_host() {
3102        let global = ContainerGlobalConfig::new().with_docker_host("unix:///tmp/docker.sock");
3103        let mut cfg = ContainerConfig::from_uri("container:list").unwrap();
3104        cfg.apply_global_defaults(&global);
3105
3106        let endpoint = ContainerEndpoint {
3107            uri: "container:list".to_string(),
3108            config: cfg,
3109        };
3110
3111        assert_eq!(endpoint.docker_host(), Some("unix:///tmp/docker.sock"));
3112
3113        let component = ContainerComponent::with_config(global);
3114        assert_eq!(component.scheme(), "container");
3115    }
3116
3117    #[test]
3118    fn container_global_config_has_reconnect_policy() {
3119        let cfg = ContainerGlobalConfig::default();
3120        assert_eq!(cfg.reconnect.max_attempts, 0); // unlimited
3121        assert!(cfg.reconnect.enabled);
3122    }
3123
3124    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
3125    /// Replicates the exact retry loop from ContainerConsumer::{start_events_consumer,start_logs_consumer}
3126    /// (lib.rs:~1408-1431, ~1503-1525):
3127    ///   attempt starts at 0, incremented on error, !should_retry(attempt), delay_for(attempt-1)
3128    #[tokio::test]
3129    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
3130        use std::sync::Arc;
3131        use std::sync::atomic::{AtomicU32, Ordering};
3132        use std::time::Duration;
3133
3134        let policy = NetworkRetryPolicy {
3135            max_attempts: 3,
3136            initial_delay: Duration::from_millis(1),
3137            max_delay: Duration::from_millis(1),
3138            multiplier: 1.0,
3139            ..NetworkRetryPolicy::default()
3140        };
3141
3142        let calls = Arc::new(AtomicU32::new(0));
3143        let calls_clone = Arc::clone(&calls);
3144        let mut attempt: u32 = 0;
3145
3146        loop {
3147            calls_clone.fetch_add(1, Ordering::SeqCst);
3148            let result: Result<(), ()> = Err(());
3149            match result {
3150                Ok(_) => {
3151                    break;
3152                }
3153                Err(_) => {
3154                    attempt += 1;
3155                    if !policy.should_retry(attempt) {
3156                        break;
3157                    }
3158                    let delay = policy.delay_for(attempt - 1);
3159                    tokio::time::sleep(delay).await;
3160                    continue;
3161                }
3162            }
3163        }
3164
3165        assert_eq!(
3166            calls.load(Ordering::SeqCst),
3167            3,
3168            "max_attempts=3 must yield exactly 3 invocations"
3169        );
3170    }
3171
3172    // -----------------------------------------------------------------------
3173    // ADR-0012 (e) metric wiring regression tests
3174    // -----------------------------------------------------------------------
3175
3176    /// Regression: events-connect error path calls increment_errors with
3177    /// correct route_id and label. Uses an unsupported tcp:// host to trigger
3178    /// the error path WITHOUT needing a real Docker daemon (docker_socket_path
3179    /// returns Err on non-unix/npipe schemes).
3180    #[tokio::test]
3181    async fn events_connect_error_increments_metrics() {
3182        use std::sync::Mutex;
3183        use std::time::Duration;
3184
3185        struct RecordingMetrics(Mutex<Vec<(String, String)>>);
3186
3187        impl MetricsCollector for RecordingMetrics {
3188            fn record_exchange_duration(&self, _: &str, _: Duration) {}
3189            fn increment_errors(&self, route_id: &str, error_type: &str) {
3190                self.0
3191                    .lock()
3192                    .unwrap()
3193                    .push((route_id.to_string(), error_type.to_string()));
3194            }
3195            fn increment_exchanges(&self, _: &str) {}
3196            fn set_queue_depth(&self, _: &str, _: usize) {}
3197            fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
3198        }
3199
3200        struct RecordingRuntime {
3201            metrics: Arc<RecordingMetrics>,
3202        }
3203
3204        impl HealthCheckRegistry for RecordingRuntime {
3205            fn force_unhealthy_for_route(&self, _: &str, _: &str, _: &str) {}
3206        }
3207
3208        impl RuntimeObservability for RecordingRuntime {
3209            fn metrics(&self) -> Arc<dyn MetricsCollector> {
3210                self.metrics.clone()
3211            }
3212            fn health(&self) -> Arc<dyn HealthCheckRegistry> {
3213                Arc::new(camel_component_api::NoOpHealthCheckRegistry)
3214            }
3215        }
3216
3217        let recording = Arc::new(RecordingMetrics(Mutex::new(Vec::new())));
3218        let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
3219            metrics: recording.clone(),
3220        });
3221
3222        // Use unsupported tcp:// host so docker_socket_path() fails fast
3223        // without any real I/O. Disable retry so the first failure propagates
3224        // immediately to the Err(e) arm.
3225        let mut config = ContainerConfig::from_uri("container:events").unwrap();
3226        config.host = Some("tcp://192.0.2.1:2375".to_string());
3227        config.reconnect = NetworkRetryPolicy::disabled();
3228
3229        let mut consumer = ContainerConsumer {
3230            config,
3231            runtime: rt,
3232        };
3233
3234        let (tx, _rx) = tokio::sync::mpsc::channel(4);
3235        let cancel = tokio_util::sync::CancellationToken::new();
3236        let context = ConsumerContext::new(tx, cancel, "events-test-route".to_string());
3237
3238        let result = consumer.start(context).await;
3239        assert!(result.is_err(), "expected Docker connection error");
3240
3241        let errors = recording.0.lock().unwrap();
3242        assert_eq!(
3243            errors.len(),
3244            1,
3245            "expected exactly one increment_errors call"
3246        );
3247        assert_eq!(errors[0].0, "events-test-route");
3248        assert_eq!(errors[0].1, "e:container:events-connect");
3249    }
3250}