Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7pub mod health;
8
9pub use bundle::ContainerBundle;
10pub use health::ContainerHealthCheck;
11
12use std::collections::{HashMap, HashSet};
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17
18use async_trait::async_trait;
19use bollard::Docker;
20use bollard::models::{
21    ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
22};
23use bollard::query_parameters::{
24    CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
25    ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
26    StartContainerOptions,
27};
28use bollard::service::{HostConfig, PortBinding};
29use camel_component_api::NetworkRetryPolicy;
30use camel_component_api::parse_uri;
31use camel_component_api::retry_async_cancelable;
32use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
33use camel_component_api::{
34    Component, Consumer, ConsumerContext, Endpoint, ProducerContext, RuntimeObservability,
35};
36use tower::Service;
37
38/// Global tracker for containers created by this component.
39/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
40static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
41    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
42
43/// Registers a container ID for tracking (will be cleaned up on shutdown).
44fn track_container(id: String) {
45    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
46        tracker.insert(id);
47    }
48}
49
50/// Removes a container ID from tracking (when it's been removed naturally).
51fn untrack_container(id: &str) {
52    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
53        tracker.remove(id);
54    }
55}
56
57/// Cleans up all tracked containers. Call this on application shutdown.
58pub async fn cleanup_tracked_containers() {
59    let ids: Vec<String> = {
60        match CONTAINER_TRACKER.lock() {
61            Ok(tracker) => tracker.iter().cloned().collect(),
62            Err(_) => return,
63        }
64    };
65
66    if ids.is_empty() {
67        return;
68    }
69
70    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
71
72    let docker = match Docker::connect_with_local_defaults() {
73        Ok(d) => d,
74        Err(e) => {
75            // log-policy: system-broken
76            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
77            return;
78        }
79    };
80
81    for id in ids {
82        match docker
83            .remove_container(
84                &id,
85                Some(RemoveContainerOptions {
86                    force: true,
87                    ..Default::default()
88                }),
89            )
90            .await
91        {
92            Ok(_) => {
93                tracing::debug!("Cleaned up container {}", id);
94                untrack_container(&id);
95            }
96            Err(e) => {
97                tracing::warn!("Failed to cleanup container {}: {}", id, e);
98            }
99        }
100    }
101}
102
/// How long to wait, in seconds, when connecting to the Docker daemon.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

// Header constants for container operations

/// Header naming the container action to perform (e.g., "list", "run", "start", "stop", "remove").
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header carrying the container image for "run" operations.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header used both to pass in and to report back a container ID.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header identifying the log stream type (stdout or stderr).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header carrying the log timestamp.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header carrying the container name for "run" operations.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header reporting the outcome of a container operation (e.g., "success").
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Header carrying the command to execute in a container.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Header carrying the network name for network operations.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Header reporting the exit code of an exec operation.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Header carrying volume mount specifications.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Header carrying the exec instance ID.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
143
144// ---------------------------------------------------------------------------
145// ContainerGlobalConfig
146// ---------------------------------------------------------------------------
147
148/// Per-component reconnect default: unlimited retries (max_attempts=0)
149/// with a fixed 5s delay, preserving the old infinite-reconnect behavior.
150/// Operators can opt into bounded retry via TOML `[reconnect]`.
151fn container_reconnect_default() -> NetworkRetryPolicy {
152    NetworkRetryPolicy {
153        enabled: true,
154        max_attempts: 0, // unlimited
155        initial_delay: std::time::Duration::from_secs(5),
156        multiplier: 1.0, // fixed delay (old behavior was no backoff)
157        max_delay: std::time::Duration::from_secs(5),
158        jitter_factor: 0.0,
159    }
160}
161
162/// Global configuration for Container component.
163/// Supports serde deserialization with defaults and builder methods.
164/// These are the fallback defaults when URI params are not set.
165#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
166#[serde(default)]
167pub struct ContainerGlobalConfig {
168    /// The Docker host URL (default: "unix:///var/run/docker.sock").
169    pub docker_host: String,
170    /// Reconnection policy for events/logs consumers (default: unlimited, 5s fixed delay).
171    #[serde(default = "container_reconnect_default")]
172    pub reconnect: NetworkRetryPolicy,
173}
174
175impl Default for ContainerGlobalConfig {
176    fn default() -> Self {
177        Self {
178            docker_host: "unix:///var/run/docker.sock".to_string(),
179            reconnect: container_reconnect_default(),
180        }
181    }
182}
183
184impl ContainerGlobalConfig {
185    pub fn new() -> Self {
186        Self::default()
187    }
188
189    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
190        self.docker_host = v.into();
191        self
192    }
193}
194
195// ---------------------------------------------------------------------------
196// ContainerConfig (endpoint configuration)
197// ---------------------------------------------------------------------------
198
199/// Configuration for the container component endpoint.
200///
201/// This struct holds the parsed URI configuration including the operation type,
202/// optional container image, and Docker host connection details.
203#[derive(Debug, Clone)]
204pub struct ContainerConfig {
205    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
206    pub operation: String,
207    /// The container image to use for "run" operations (can be overridden via header).
208    pub image: Option<String>,
209    /// The container name to use for "run" operations (can be overridden via header).
210    pub name: Option<String>,
211    /// The Docker host URL (defaults to "unix:///var/run/docker.sock").
212    pub host: Option<String>,
213    /// Command to run in the container (e.g., "sleep 30").
214    pub cmd: Option<String>,
215    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
216    pub ports: Option<String>,
217    /// Environment variables in format "KEY=value,KEY2=value2".
218    pub env: Option<String>,
219    /// Network mode (e.g., "bridge", "host", "none"). Default: "bridge".
220    pub network: Option<String>,
221    /// Container ID or name for logs consumer.
222    pub container_id: Option<String>,
223    /// Follow log output (default: true for consumer).
224    pub follow: bool,
225    /// Include timestamps in logs (default: false).
226    pub timestamps: bool,
227    /// Number of lines to show from the end of logs (default: all).
228    pub tail: Option<String>,
229    /// Automatically pull the image if not present (default: true).
230    pub auto_pull: bool,
231    /// Automatically remove the container when it exits (default: true).
232    pub auto_remove: bool,
233    /// Volume mounts in format "host:container:ro" (e.g., "./html:/usr/share/nginx/html:ro").
234    pub volumes: Option<String>,
235    /// User to run the container or exec command as (e.g., "root").
236    pub user: Option<String>,
237    /// Working directory inside the container.
238    pub workdir: Option<String>,
239    /// Whether to detach from the exec process (default: false).
240    pub detach: bool,
241    /// Network driver for network-create (e.g., "bridge", "overlay").
242    pub driver: Option<String>,
243    /// Whether to force the operation (default: false).
244    pub force: bool,
245    /// Reconnection policy for events/logs consumers (applied from global config).
246    pub reconnect: NetworkRetryPolicy,
247}
248
249impl ContainerConfig {
250    /// Parses a container URI into a `ContainerConfig`.
251    ///
252    /// # Arguments
253    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
254    ///
255    /// # Errors
256    /// Returns an error if the URI scheme is not "container".
257    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
258        let parts = parse_uri(uri)?;
259        if parts.scheme != "container" {
260            return Err(CamelError::InvalidUri(format!(
261                "expected scheme 'container', got '{}'",
262                parts.scheme
263            )));
264        }
265
266        let image = parts.params.get("image").cloned();
267        let name = parts.params.get("name").cloned();
268        let cmd = parts.params.get("cmd").cloned();
269        let ports = parts.params.get("ports").cloned();
270        let env = parts.params.get("env").cloned();
271        let network = parts.params.get("network").cloned();
272        let container_id = parts.params.get("containerId").cloned();
273        let follow = parts
274            .params
275            .get("follow")
276            .map(|v| v.eq_ignore_ascii_case("true"))
277            .unwrap_or(true);
278        let timestamps = parts
279            .params
280            .get("timestamps")
281            .map(|v| v.eq_ignore_ascii_case("true"))
282            .unwrap_or(false);
283        let tail = parts.params.get("tail").cloned();
284        let auto_pull = parts
285            .params
286            .get("autoPull")
287            .map(|v| v.eq_ignore_ascii_case("true"))
288            .unwrap_or(true);
289        let auto_remove = parts
290            .params
291            .get("autoRemove")
292            .map(|v| v.eq_ignore_ascii_case("true"))
293            .unwrap_or(true);
294        // host is only set from URI param; global config defaults are applied later
295        let host = parts.params.get("host").cloned();
296        let volumes = parts.params.get("volumes").cloned();
297        let user = parts.params.get("user").cloned();
298        let workdir = parts.params.get("workdir").cloned();
299        let detach = parts
300            .params
301            .get("detach")
302            .map(|v| v.eq_ignore_ascii_case("true"))
303            .unwrap_or(false);
304        let driver = parts.params.get("driver").cloned();
305        let force = parts
306            .params
307            .get("force")
308            .map(|v| v.eq_ignore_ascii_case("true"))
309            .unwrap_or(false);
310
311        Ok(Self {
312            operation: parts.path,
313            image,
314            name,
315            host,
316            cmd,
317            ports,
318            env,
319            network,
320            container_id,
321            follow,
322            timestamps,
323            tail,
324            auto_pull,
325            auto_remove,
326            volumes,
327            user,
328            workdir,
329            detach,
330            driver,
331            force,
332            reconnect: NetworkRetryPolicy::default(),
333        })
334    }
335
336    /// Apply global config defaults to this endpoint config.
337    /// Only sets values that are currently `None`.
338    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
339        if self.host.is_none() {
340            self.host = Some(global.docker_host.clone());
341        }
342        self.reconnect = global.reconnect.clone();
343    }
344
345    fn docker_socket_path(&self) -> Result<&str, CamelError> {
346        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
347            "npipe:////./pipe/docker_engine"
348        } else {
349            "unix:///var/run/docker.sock"
350        });
351
352        if host.starts_with("unix://") || host.starts_with("npipe://") {
353            return Ok(host);
354        }
355
356        if host.contains("://") {
357            return Err(CamelError::ProcessorError(format!(
358                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
359                host
360            )));
361        }
362
363        Ok(host)
364    }
365
366    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
367        let socket_path = self.docker_socket_path()?;
368        Docker::connect_with_socket(
369            socket_path,
370            DOCKER_CONNECT_TIMEOUT_SECS,
371            bollard::API_DEFAULT_VERSION,
372        )
373        .map_err(|e| {
374            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
375        })
376    }
377
378    /// Connects to the Docker daemon using the configured host.
379    ///
380    /// This method establishes a Unix socket connection to Docker and verifies
381    /// the connection by sending a ping request.
382    ///
383    /// # Errors
384    /// Returns an error if the connection fails or the ping request fails.
385    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
386        let docker = self.connect_docker_client()?;
387        docker
388            .ping()
389            .await
390            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
391        Ok(docker)
392    }
393
394    #[allow(clippy::type_complexity)]
395    fn parse_ports(
396        &self,
397    ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
398        let ports_str = match self.ports.as_ref() {
399            Some(s) => s,
400            None => return Ok((Vec::new(), HashMap::new())),
401        };
402
403        let mut exposed_ports: Vec<String> = Vec::new();
404        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
405
406        for mapping in ports_str.split(',') {
407            let mapping = mapping.trim();
408            if mapping.is_empty() {
409                continue;
410            }
411
412            let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
413                CamelError::ProcessorError(format!(
414                    "malformed port mapping '{}': expected hostPort:containerPort",
415                    mapping
416                ))
417            })?;
418
419            let (container_port, protocol) = if container_spec.contains('/') {
420                let parts: Vec<&str> = container_spec.split('/').collect();
421                (parts[0], parts[1])
422            } else {
423                (container_spec, "tcp")
424            };
425
426            let container_key = format!("{}/{}", container_port, protocol);
427
428            exposed_ports.push(container_key.clone());
429
430            port_bindings.insert(
431                container_key,
432                Some(vec![PortBinding {
433                    host_ip: None,
434                    host_port: Some(host_port.to_string()),
435                }]),
436            );
437        }
438
439        Ok((exposed_ports, port_bindings))
440    }
441
442    fn parse_env(&self) -> Option<Vec<String>> {
443        let env_str = self.env.as_ref()?;
444
445        let env_vars: Vec<String> = env_str
446            .split(',')
447            .map(|s| s.trim().to_string())
448            .filter(|s| !s.is_empty())
449            .collect();
450
451        if env_vars.is_empty() {
452            None
453        } else {
454            Some(env_vars)
455        }
456    }
457
458    #[cfg(test)]
459    #[allow(clippy::type_complexity)]
460    fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
461        self.volumes.as_deref().and_then(parse_volume_str)
462    }
463}
464
/// Splits a comma-separated volume specification into `(binds, anonymous_volumes)`.
///
/// Each entry is trimmed and classified by its number of `:`-separated parts:
/// - `source:target:ro|rw` becomes a bind (entries with any other mode are dropped),
/// - `path:ro|rw` becomes an anonymous volume (the mode is not kept),
/// - `a:b` becomes a bind,
/// - `path` becomes an anonymous volume,
/// - entries with more than three parts are dropped.
///
/// Returns `None` when no entry produced a bind or an anonymous volume.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
    /// True for the two recognised access modes.
    fn is_mode(segment: &str) -> bool {
        matches!(segment, "ro" | "rw")
    }

    let mut binds: Vec<String> = Vec::new();
    let mut anonymous_volumes: Vec<String> = Vec::new();

    let entries = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        let segments: Vec<&str> = entry.split(':').collect();

        match segments.as_slice() {
            [path] => anonymous_volumes.push(path.to_string()),
            [path, mode] if is_mode(mode) => anonymous_volumes.push(path.to_string()),
            // For two- and three-part binds, re-joining the parts with ':'
            // reproduces the trimmed entry exactly, so the entry is stored as-is.
            [_, _] => binds.push(entry.to_string()),
            [_, _, mode] if is_mode(mode) => binds.push(entry.to_string()),
            _ => {}
        }
    }

    if !binds.is_empty() || !anonymous_volumes.is_empty() {
        Some((binds, anonymous_volumes))
    } else {
        None
    }
}
514
/// Operations a producer endpoint can perform, one per recognised operation string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// Selected by the `list` operation string.
    List,
    /// Selected by the `run` operation string.
    Run,
    /// Selected by the `start` operation string.
    Start,
    /// Selected by the `stop` operation string.
    Stop,
    /// Selected by the `remove` operation string.
    Remove,
    /// Selected by the `exec` operation string.
    Exec,
    /// Selected by the `network-create` operation string.
    NetworkCreate,
    /// Selected by the `network-connect` operation string.
    NetworkConnect,
    /// Selected by the `network-disconnect` operation string.
    NetworkDisconnect,
    /// Selected by the `network-remove` operation string.
    NetworkRemove,
    /// Selected by the `network-list` operation string.
    NetworkList,
}
529
530fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
531    match operation {
532        "list" => Ok(ProducerOperation::List),
533        "run" => Ok(ProducerOperation::Run),
534        "start" => Ok(ProducerOperation::Start),
535        "stop" => Ok(ProducerOperation::Stop),
536        "remove" => Ok(ProducerOperation::Remove),
537        "exec" => Ok(ProducerOperation::Exec),
538        "network-create" => Ok(ProducerOperation::NetworkCreate),
539        "network-connect" => Ok(ProducerOperation::NetworkConnect),
540        "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
541        "network-remove" => Ok(ProducerOperation::NetworkRemove),
542        "network-list" => Ok(ProducerOperation::NetworkList),
543        _ => Err(CamelError::ProcessorError(format!(
544            "Unknown container operation: {}",
545            operation
546        ))),
547    }
548}
549
550fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
551    exchange
552        .input
553        .header(HEADER_CONTAINER_NAME)
554        .and_then(|v| v.as_str().map(|s| s.to_string()))
555        .or_else(|| config.name.clone())
556}
557
558async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
559    let images = docker
560        .list_images(None::<ListImagesOptions>)
561        .await
562        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
563
564    Ok(images.iter().any(|img| {
565        img.repo_tags
566            .iter()
567            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
568    }))
569}
570
571async fn pull_image_with_progress(
572    docker: &Docker,
573    image: &str,
574    timeout_secs: u64,
575) -> Result<(), CamelError> {
576    use futures::StreamExt;
577
578    tracing::info!("Pulling image: {}", image);
579
580    let mut stream = docker.create_image(
581        Some(CreateImageOptions {
582            from_image: Some(image.to_string()),
583            ..Default::default()
584        }),
585        None,
586        None,
587    );
588
589    let start = std::time::Instant::now();
590    let mut last_progress = std::time::Instant::now();
591
592    while let Some(item) = stream.next().await {
593        if start.elapsed().as_secs() > timeout_secs {
594            return Err(CamelError::ProcessorError(format!(
595                "Image pull timeout after {}s. Try manually: docker pull {}",
596                timeout_secs, image
597            )));
598        }
599
600        match item {
601            Ok(update) => {
602                // Log progress every 2 seconds
603                if last_progress.elapsed().as_secs() >= 2 {
604                    if let Some(status) = update.status {
605                        tracing::debug!("Pull progress: {}", status);
606                    }
607                    last_progress = std::time::Instant::now();
608                }
609            }
610            Err(e) => {
611                let err_str = e.to_string().to_lowercase();
612                if err_str.contains("unauthorized") || err_str.contains("401") {
613                    return Err(CamelError::ProcessorError(format!(
614                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
615                        image
616                    )));
617                }
618                if err_str.contains("not found") || err_str.contains("404") {
619                    return Err(CamelError::ProcessorError(format!(
620                        "Image '{}' not found in registry. Check the image name and tag",
621                        image
622                    )));
623                }
624                return Err(CamelError::ProcessorError(format!(
625                    "Failed to pull image '{}': {}",
626                    image, e
627                )));
628            }
629        }
630    }
631
632    tracing::info!("Successfully pulled image: {}", image);
633    Ok(())
634}
635
636async fn ensure_image_available(
637    docker: &Docker,
638    image: &str,
639    auto_pull: bool,
640    timeout_secs: u64,
641) -> Result<(), CamelError> {
642    if image_exists_locally(docker, image).await? {
643        tracing::debug!("Image '{}' already available locally", image);
644        return Ok(());
645    }
646
647    if !auto_pull {
648        return Err(CamelError::ProcessorError(format!(
649            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
650            image, image
651        )));
652    }
653
654    pull_image_with_progress(docker, image, timeout_secs).await
655}
656
657fn format_docker_event(event: &bollard::models::EventMessage) -> String {
658    let action = event.action.as_deref().unwrap_or("unknown");
659    let actor = event.actor.as_ref();
660
661    let container_name = actor
662        .and_then(|a| a.attributes.as_ref())
663        .and_then(|attrs| attrs.get("name"))
664        .map(|s| s.as_str())
665        .unwrap_or("unknown");
666
667    let image = actor
668        .and_then(|a| a.attributes.as_ref())
669        .and_then(|attrs| attrs.get("image"))
670        .map(|s| s.as_str())
671        .unwrap_or("");
672
673    let exit_code = actor
674        .and_then(|a| a.attributes.as_ref())
675        .and_then(|attrs| attrs.get("exitCode"))
676        .map(|s| s.as_str());
677
678    match action {
679        "create" => {
680            if image.is_empty() {
681                format!("[CREATE] Container {}", container_name)
682            } else {
683                format!("[CREATE] Container {} ({})", container_name, image)
684            }
685        }
686        "start" => format!("[START]  Container {}", container_name),
687        "die" => {
688            if let Some(code) = exit_code {
689                format!("[DIE]    Container {} (exit: {})", container_name, code)
690            } else {
691                format!("[DIE]    Container {}", container_name)
692            }
693        }
694        "destroy" => format!("[DESTROY] Container {}", container_name),
695        "stop" => format!("[STOP]   Container {}", container_name),
696        "pause" => format!("[PAUSE]  Container {}", container_name),
697        "unpause" => format!("[UNPAUSE] Container {}", container_name),
698        "restart" => format!("[RESTART] Container {}", container_name),
699        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
700    }
701}
702
703async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
704    create: CreateFn,
705    start: StartFn,
706    remove: RemoveFn,
707) -> Result<String, CamelError>
708where
709    CreateFn: FnOnce() -> CreateFut,
710    CreateFut: Future<Output = Result<String, CamelError>>,
711    StartFn: FnOnce(String) -> StartFut,
712    StartFut: Future<Output = Result<(), CamelError>>,
713    RemoveFn: FnOnce(String) -> RemoveFut,
714    RemoveFut: Future<Output = Result<(), CamelError>>,
715{
716    let container_id = create().await?;
717    if let Err(start_err) = start(container_id.clone()).await {
718        if let Err(remove_err) = remove(container_id.clone()).await {
719            return Err(CamelError::ProcessorError(format!(
720                "Failed to start container: {}. Cleanup failed: {}",
721                start_err, remove_err
722            )));
723        }
724        return Err(start_err);
725    }
726
727    Ok(container_id)
728}
729
730async fn handle_list(
731    docker: Docker,
732    _config: ContainerConfig,
733    exchange: &mut Exchange,
734) -> Result<(), CamelError> {
735    let containers = docker
736        .list_containers(None::<ListContainersOptions>)
737        .await
738        .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
739
740    let json_value = serde_json::to_value(&containers).map_err(|e| {
741        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
742    })?;
743
744    exchange.input.body = Body::Json(json_value);
745    exchange.input.set_header(
746        HEADER_ACTION_RESULT,
747        serde_json::Value::String("success".to_string()),
748    );
749    Ok(())
750}
751
752async fn handle_run(
753    docker: Docker,
754    config: ContainerConfig,
755    exchange: &mut Exchange,
756) -> Result<(), CamelError> {
757    let image = exchange
758        .input
759        .header(HEADER_IMAGE)
760        .and_then(|v| v.as_str().map(|s| s.to_string()))
761        .or(config.image.clone())
762        .ok_or_else(|| {
763            CamelError::ProcessorError(
764                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
765            )
766        })?;
767
768    let image = if !image.contains(':') && !image.contains('@') {
769        format!("{}:latest", image)
770    } else {
771        image
772    };
773
774    let pull_timeout = 300;
775    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
776        .await
777        .map_err(|e| {
778            CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
779        })?;
780
781    let container_name = resolve_container_name(exchange, &config);
782    let container_name_ref = container_name.as_deref().unwrap_or("");
783    let cmd_parts: Option<Vec<String>> = config
784        .cmd
785        .as_ref()
786        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
787    let auto_remove = config.auto_remove;
788    let (exposed_ports, port_bindings) = config.parse_ports()?;
789    let env_vars = config.parse_env();
790    let network_mode = config.network.clone();
791
792    let volumes_str = exchange
793        .input
794        .header(HEADER_VOLUMES)
795        .and_then(|v| v.as_str().map(|s| s.to_string()))
796        .or(config.volumes.clone());
797    let (binds, anon_volumes) = volumes_str
798        .as_deref()
799        .and_then(parse_volume_str)
800        .unwrap_or_default();
801
802    let docker_create = docker.clone();
803    let docker_start = docker.clone();
804    let docker_remove = docker.clone();
805
806    let container_id = run_container_with_cleanup(
807        move || async move {
808            let create_options = CreateContainerOptions {
809                name: Some(container_name_ref.to_string()),
810                ..Default::default()
811            };
812            let container_config = ContainerCreateBody {
813                image: Some(image.clone()),
814                cmd: cmd_parts,
815                env: env_vars,
816                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
817                volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
818                host_config: Some(HostConfig {
819                    auto_remove: Some(auto_remove),
820                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
821                    network_mode,
822                    binds: if binds.is_empty() { None } else { Some(binds) },
823                    ..Default::default()
824                }),
825                ..Default::default()
826            };
827
828            let create_response = docker_create
829                .create_container(Some(create_options), container_config)
830                .await
831                .map_err(|e| {
832                    let err_str = e.to_string().to_lowercase();
833                    if err_str.contains("409") || err_str.contains("conflict") {
834                        CamelError::ProcessorError(format!(
835                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
836                            container_name_ref
837                        ))
838                    } else {
839                        CamelError::ProcessorError(format!(
840                            "Failed to create container: {}",
841                            e
842                        ))
843                    }
844                })?;
845
846            Ok(create_response.id)
847        },
848        move |container_id| async move {
849            docker_start
850                .start_container(&container_id, None::<StartContainerOptions>)
851                .await
852                .map_err(|e| {
853                    CamelError::ProcessorError(format!(
854                        "Failed to start container: {}",
855                        e
856                    ))
857                })
858        },
859        move |container_id| async move {
860            docker_remove
861                .remove_container(&container_id, None)
862                .await
863                .map_err(|e| {
864                    CamelError::ProcessorError(format!(
865                        "Failed to remove container after start failure: {}",
866                        e
867                    ))
868                })
869        },
870    )
871    .await?;
872
873    track_container(container_id.clone());
874
875    exchange
876        .input
877        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
878    exchange.input.set_header(
879        HEADER_ACTION_RESULT,
880        serde_json::Value::String("success".to_string()),
881    );
882    Ok(())
883}
884
885async fn handle_lifecycle(
886    docker: Docker,
887    _config: ContainerConfig,
888    exchange: &mut Exchange,
889    operation: ProducerOperation,
890    operation_name: &str,
891) -> Result<(), CamelError> {
892    let container_id = exchange
893        .input
894        .header(HEADER_CONTAINER_ID)
895        .and_then(|v| v.as_str().map(|s| s.to_string()))
896        .ok_or_else(|| {
897            CamelError::ProcessorError(format!(
898                "{} header is required for {} operation",
899                HEADER_CONTAINER_ID, operation_name
900            ))
901        })?;
902
903    match operation {
904        ProducerOperation::Start => {
905            docker
906                .start_container(&container_id, None::<StartContainerOptions>)
907                .await
908                .map_err(|e| {
909                    CamelError::ProcessorError(format!("Failed to start container: {}", e))
910                })?;
911        }
912        ProducerOperation::Stop => {
913            docker
914                .stop_container(&container_id, None)
915                .await
916                .map_err(|e| {
917                    CamelError::ProcessorError(format!("Failed to stop container: {}", e))
918                })?;
919        }
920        ProducerOperation::Remove => {
921            docker
922                .remove_container(&container_id, None)
923                .await
924                .map_err(|e| {
925                    CamelError::ProcessorError(format!("Failed to remove container: {}", e))
926                })?;
927            untrack_container(&container_id);
928        }
929        _ => {}
930    }
931
932    exchange.input.set_header(
933        HEADER_ACTION_RESULT,
934        serde_json::Value::String("success".to_string()),
935    );
936    Ok(())
937}
938
939async fn handle_exec(
940    docker: Docker,
941    config: ContainerConfig,
942    exchange: &mut Exchange,
943) -> Result<(), CamelError> {
944    let container_id = exchange
945        .input
946        .header(HEADER_CONTAINER_ID)
947        .and_then(|v| v.as_str().map(|s| s.to_string()))
948        .or(config.container_id.clone())
949        .ok_or_else(|| {
950            CamelError::ProcessorError(format!(
951                "{} header or containerId param is required for exec operation",
952                HEADER_CONTAINER_ID
953            ))
954        })?;
955
956    let cmd = exchange
957        .input
958        .header(HEADER_CMD)
959        .and_then(|v| v.as_str().map(|s| s.to_string()))
960        .or(config.cmd.clone())
961        .ok_or_else(|| {
962            CamelError::ProcessorError(
963                "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
964            )
965        })?;
966
967    let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
968    let env_vars = config.parse_env();
969
970    let exec_config = bollard::exec::CreateExecOptions {
971        cmd: Some(cmd_parts),
972        env: env_vars,
973        user: config.user.clone(),
974        working_dir: config.workdir.clone(),
975        attach_stdout: Some(true),
976        attach_stderr: Some(true),
977        ..Default::default()
978    };
979
980    let create_result = docker
981        .create_exec(&container_id, exec_config)
982        .await
983        .map_err(|e| {
984            let err_str = e.to_string().to_lowercase();
985            if err_str.contains("404") || err_str.contains("no such") {
986                CamelError::ProcessorError(format!(
987                    "Container '{}' not found for exec",
988                    container_id
989                ))
990            } else {
991                CamelError::ProcessorError(format!("Failed to create exec: {}", e))
992            }
993        })?;
994
995    let exec_id = create_result.id;
996
997    if config.detach {
998        docker
999            .start_exec(
1000                &exec_id,
1001                Some(bollard::exec::StartExecOptions {
1002                    detach: true,
1003                    ..Default::default()
1004                }),
1005            )
1006            .await
1007            .map_err(|e| {
1008                CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
1009            })?;
1010
1011        exchange
1012            .input
1013            .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
1014        exchange
1015            .input
1016            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1017    } else {
1018        let start_result = docker
1019            .start_exec(&exec_id, None)
1020            .await
1021            .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
1022
1023        let mut output = String::new();
1024
1025        match start_result {
1026            bollard::exec::StartExecResults::Attached {
1027                output: mut stream, ..
1028            } => {
1029                use futures::StreamExt;
1030                while let Some(msg) = stream.next().await {
1031                    match msg {
1032                        Ok(bollard::container::LogOutput::StdOut { message }) => {
1033                            output.push_str(&String::from_utf8_lossy(&message));
1034                        }
1035                        Ok(bollard::container::LogOutput::StdErr { message }) => {
1036                            output.push_str(&String::from_utf8_lossy(&message));
1037                        }
1038                        Ok(_) => {}
1039                        Err(e) => {
1040                            output.push_str(&format!("[error reading stream: {}]", e));
1041                        }
1042                    }
1043                }
1044            }
1045            bollard::exec::StartExecResults::Detached => {}
1046        }
1047
1048        let inspect = docker
1049            .inspect_exec(&exec_id)
1050            .await
1051            .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1052
1053        let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1054            CamelError::ProcessorError("container exec returned no exit code".into())
1055        })?;
1056
1057        let output = output.trim_end().to_string();
1058        exchange.input.body = Body::Text(output);
1059        exchange.input.set_header(
1060            HEADER_EXIT_CODE,
1061            serde_json::Value::Number(exit_code.into()),
1062        );
1063        exchange
1064            .input
1065            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1066    }
1067
1068    exchange.input.set_header(
1069        HEADER_ACTION_RESULT,
1070        serde_json::Value::String("success".to_string()),
1071    );
1072    Ok(())
1073}
1074
1075async fn handle_network_create(
1076    docker: Docker,
1077    config: ContainerConfig,
1078    exchange: &mut Exchange,
1079) -> Result<(), CamelError> {
1080    let network_name = exchange
1081        .input
1082        .header(HEADER_CONTAINER_NAME)
1083        .and_then(|v| v.as_str().map(|s| s.to_string()))
1084        .or(config.name.clone())
1085        .ok_or_else(|| {
1086            CamelError::ProcessorError(
1087                "CamelContainerName header or name param is required for network-create"
1088                    .to_string(),
1089            )
1090        })?;
1091
1092    let driver = config.driver.as_deref().unwrap_or("bridge");
1093
1094    let options = NetworkCreateRequest {
1095        name: network_name.clone(),
1096        driver: Some(driver.to_string()),
1097        ..Default::default()
1098    };
1099
1100    let result = docker.create_network(options).await.map_err(|e| {
1101        let err_str = e.to_string().to_lowercase();
1102        if err_str.contains("409") || err_str.contains("already exists") {
1103            CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1104        } else {
1105            CamelError::ProcessorError(format!("Failed to create network: {}", e))
1106        }
1107    })?;
1108
1109    let network_id = result.id.clone();
1110    let json_value = serde_json::to_value(&result).map_err(|e| {
1111        CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1112    })?;
1113
1114    exchange.input.body = Body::Json(json_value);
1115    exchange
1116        .input
1117        .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1118    exchange.input.set_header(
1119        HEADER_ACTION_RESULT,
1120        serde_json::Value::String("success".to_string()),
1121    );
1122    Ok(())
1123}
1124
1125async fn handle_network_connect(
1126    docker: Docker,
1127    config: ContainerConfig,
1128    exchange: &mut Exchange,
1129) -> Result<(), CamelError> {
1130    let network = exchange
1131        .input
1132        .header(HEADER_NETWORK)
1133        .and_then(|v| v.as_str().map(|s| s.to_string()))
1134        .or(config.network.clone())
1135        .ok_or_else(|| {
1136            CamelError::ProcessorError(
1137                "CamelContainerNetwork header or network param is required for network-connect"
1138                    .to_string(),
1139            )
1140        })?;
1141
1142    let container = exchange
1143        .input
1144        .header(HEADER_CONTAINER_ID)
1145        .and_then(|v| v.as_str().map(|s| s.to_string()))
1146        .or(config.container_id.clone())
1147        .ok_or_else(|| {
1148            CamelError::ProcessorError(
1149                "CamelContainerId header or container param is required for network-connect"
1150                    .to_string(),
1151            )
1152        })?;
1153
1154    docker
1155        .connect_network(
1156            &network,
1157            NetworkConnectRequest {
1158                container,
1159                ..Default::default()
1160            },
1161        )
1162        .await
1163        .map_err(|e| {
1164            let err_str = e.to_string().to_lowercase();
1165            if err_str.contains("404") || err_str.contains("not found") {
1166                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1167            } else {
1168                CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1169            }
1170        })?;
1171
1172    exchange.input.set_header(
1173        HEADER_ACTION_RESULT,
1174        serde_json::Value::String("success".to_string()),
1175    );
1176    Ok(())
1177}
1178
1179async fn handle_network_disconnect(
1180    docker: Docker,
1181    config: ContainerConfig,
1182    exchange: &mut Exchange,
1183) -> Result<(), CamelError> {
1184    let network = exchange
1185        .input
1186        .header(HEADER_NETWORK)
1187        .and_then(|v| v.as_str().map(|s| s.to_string()))
1188        .or(config.network.clone())
1189        .ok_or_else(|| {
1190            CamelError::ProcessorError(
1191                "CamelContainerNetwork header or network param is required for network-disconnect"
1192                    .to_string(),
1193            )
1194        })?;
1195
1196    let container = exchange
1197        .input
1198        .header(HEADER_CONTAINER_ID)
1199        .and_then(|v| v.as_str().map(|s| s.to_string()))
1200        .or(config.container_id.clone())
1201        .ok_or_else(|| {
1202            CamelError::ProcessorError(
1203                "CamelContainerId header or container param is required for network-disconnect"
1204                    .to_string(),
1205            )
1206        })?;
1207
1208    docker
1209        .disconnect_network(
1210            &network,
1211            NetworkDisconnectRequest {
1212                container,
1213                force: Some(config.force),
1214            },
1215        )
1216        .await
1217        .map_err(|e| {
1218            let err_str = e.to_string().to_lowercase();
1219            if err_str.contains("404") || err_str.contains("not found") {
1220                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1221            } else {
1222                CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1223            }
1224        })?;
1225
1226    exchange.input.set_header(
1227        HEADER_ACTION_RESULT,
1228        serde_json::Value::String("success".to_string()),
1229    );
1230    Ok(())
1231}
1232
1233async fn handle_network_remove(
1234    docker: Docker,
1235    config: ContainerConfig,
1236    exchange: &mut Exchange,
1237) -> Result<(), CamelError> {
1238    let network = exchange
1239        .input
1240        .header(HEADER_NETWORK)
1241        .and_then(|v| v.as_str().map(|s| s.to_string()))
1242        .or(config.network.clone())
1243        .ok_or_else(|| {
1244            CamelError::ProcessorError(
1245                "CamelContainerNetwork header or network param is required for network-remove"
1246                    .to_string(),
1247            )
1248        })?;
1249
1250    docker.remove_network(&network).await.map_err(|e| {
1251        let err_str = e.to_string().to_lowercase();
1252        if err_str.contains("404") || err_str.contains("not found") {
1253            CamelError::ProcessorError(format!("Network '{}' not found", network))
1254        } else if err_str.contains("409") || err_str.contains("in use") {
1255            CamelError::ProcessorError(format!(
1256                "Network '{}' is in use and cannot be removed",
1257                network
1258            ))
1259        } else {
1260            CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1261        }
1262    })?;
1263
1264    exchange.input.set_header(
1265        HEADER_ACTION_RESULT,
1266        serde_json::Value::String("success".to_string()),
1267    );
1268    Ok(())
1269}
1270
1271async fn handle_network_list(
1272    docker: Docker,
1273    _config: ContainerConfig,
1274    exchange: &mut Exchange,
1275) -> Result<(), CamelError> {
1276    let networks = docker
1277        .list_networks(None::<ListNetworksOptions>)
1278        .await
1279        .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1280
1281    let json_value = serde_json::to_value(&networks)
1282        .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1283
1284    exchange.input.body = Body::Json(json_value);
1285    exchange.input.set_header(
1286        HEADER_ACTION_RESULT,
1287        serde_json::Value::String("success".to_string()),
1288    );
1289    Ok(())
1290}
1291
1292/// Producer for executing container operations.
1293///
1294/// This producer handles synchronous container operations like listing,
1295/// creating, starting, stopping, and removing containers.
1296#[derive(Clone)]
1297pub struct ContainerProducer {
1298    config: ContainerConfig,
1299    docker: Docker,
1300}
1301
1302impl Service<Exchange> for ContainerProducer {
1303    type Response = Exchange;
1304    type Error = CamelError;
1305    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1306
1307    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1308        Poll::Ready(Ok(()))
1309    }
1310
1311    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1312        let config = self.config.clone();
1313        let docker = self.docker.clone();
1314        Box::pin(async move {
1315            let operation_name = exchange
1316                .input
1317                .header(HEADER_ACTION)
1318                .and_then(|v| v.as_str().map(|s| s.to_string()))
1319                .unwrap_or_else(|| config.operation.clone());
1320
1321            let operation = parse_producer_operation(&operation_name)?;
1322
1323            match operation {
1324                ProducerOperation::List => {
1325                    handle_list(docker, config, &mut exchange).await?;
1326                }
1327                ProducerOperation::Run => {
1328                    handle_run(docker, config, &mut exchange).await?;
1329                }
1330                ProducerOperation::Start => {
1331                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1332                        .await?;
1333                }
1334                ProducerOperation::Stop => {
1335                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1336                        .await?;
1337                }
1338                ProducerOperation::Remove => {
1339                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1340                        .await?;
1341                }
1342                ProducerOperation::Exec => {
1343                    handle_exec(docker, config, &mut exchange).await?;
1344                }
1345                ProducerOperation::NetworkCreate => {
1346                    handle_network_create(docker, config, &mut exchange).await?;
1347                }
1348                ProducerOperation::NetworkConnect => {
1349                    handle_network_connect(docker, config, &mut exchange).await?;
1350                }
1351                ProducerOperation::NetworkDisconnect => {
1352                    handle_network_disconnect(docker, config, &mut exchange).await?;
1353                }
1354                ProducerOperation::NetworkRemove => {
1355                    handle_network_remove(docker, config, &mut exchange).await?;
1356                }
1357                ProducerOperation::NetworkList => {
1358                    handle_network_list(docker, config, &mut exchange).await?;
1359                }
1360            }
1361
1362            Ok(exchange)
1363        })
1364    }
1365}
1366
1367/// Consumer for receiving Docker container events or logs.
1368///
1369/// This consumer subscribes to Docker events or container logs and forwards them
1370/// to the route as exchanges. It implements automatic reconnection on connection failures.
1371pub struct ContainerConsumer {
1372    config: ContainerConfig,
1373    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
1374    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
1375    #[allow(dead_code)]
1376    runtime: Arc<dyn RuntimeObservability>,
1377}
1378
1379impl ContainerConsumer {
1380    pub fn new(config: ContainerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
1381        Self { config, runtime }
1382    }
1383}
1384
1385#[async_trait]
1386impl Consumer for ContainerConsumer {
1387    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1388        match self.config.operation.as_str() {
1389            "events" => self.start_events_consumer(context).await,
1390            "logs" => self.start_logs_consumer(context).await,
1391            _ => Err(CamelError::EndpointCreationFailed(format!(
1392                "Consumer only supports 'events' or 'logs' operations, got '{}'",
1393                self.config.operation
1394            ))),
1395        }
1396    }
1397
1398    async fn stop(&mut self) -> Result<(), CamelError> {
1399        Ok(())
1400    }
1401
1402    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1403        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1404    }
1405}
1406
1407impl ContainerConsumer {
1408    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1409        use futures::StreamExt;
1410
1411        let cancel = context.cancel_token();
1412
1413        // Outer reconnect loop: when the inner event stream breaks, this loop
1414        // reconnects. The connect_docker() retry is now backed by
1415        // retry_async_cancelable (migrated from manual loop in rc-k9c).
1416        loop {
1417            let docker = match retry_async_cancelable(
1418                &self.config.reconnect,
1419                Some("container-events"),
1420                || async { self.config.connect_docker().await },
1421                |_| true,
1422                &cancel,
1423            )
1424            .await
1425            {
1426                Ok(d) => d,
1427                Err(_) if context.is_cancelled() => {
1428                    tracing::info!("Container events consumer shutting down");
1429                    return Ok(());
1430                }
1431                Err(e) => {
1432                    // TODO(ADR-0012-e-metrics): wire increment_errors("e:container:events-connect") via bd rc-mf3
1433                    tracing::error!(error = %e, "Container events consumer exhausted reconnect attempts"); // allow-log-levels
1434                    return Err(e);
1435                }
1436            };
1437
1438            let mut event_stream = docker.events(None::<EventsOptions>);
1439
1440            loop {
1441                tokio::select! {
1442                    _ = context.cancelled() => {
1443                        tracing::info!("Container events consumer shutting down");
1444                        return Ok(());
1445                    }
1446
1447                    msg = event_stream.next() => {
1448                        match msg {
1449                            Some(Ok(event)) => {
1450                                let formatted = format_docker_event(&event);
1451                                let message = Message::new(Body::Text(formatted));
1452                                let exchange = Exchange::new(message);
1453
1454                                if let Err(e) = context.send(exchange).await {
1455                                    // log-policy: system-broken
1456                                    tracing::error!("Failed to send exchange: {:?}", e);
1457                                    break;
1458                                }
1459                            }
1460                            Some(Err(e)) => {
1461                                // TODO(ADR-0012-e-metrics): wire increment_errors("e:container:events-stream") via bd rc-mf3
1462                                tracing::error!("Docker event stream error: {}. Reconnecting...", e); // allow-log-levels
1463                                break;
1464                            }
1465                            None => {
1466                                tracing::info!("Docker event stream ended. Reconnecting...");
1467                                break;
1468                            }
1469                        }
1470                    }
1471                }
1472            }
1473
1474            tokio::select! {
1475                _ = context.cancelled() => {
1476                    tracing::info!("Container events consumer shutting down");
1477                    return Ok(());
1478                }
1479                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1480            }
1481        }
1482    }
1483
1484    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1485        use futures::StreamExt;
1486
1487        let container_id = self.config.container_id.clone().ok_or_else(|| {
1488            CamelError::EndpointCreationFailed(
1489                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1490                    .to_string(),
1491            )
1492        })?;
1493
1494        let cancel = context.cancel_token();
1495
1496        // Outer reconnect loop: when the inner log stream breaks, this loop
1497        // reconnects. The connect_docker() retry is now backed by
1498        // retry_async_cancelable (migrated from manual loop in rc-k9c).
1499        loop {
1500            let docker = match retry_async_cancelable(
1501                &self.config.reconnect,
1502                Some("container-logs"),
1503                || async { self.config.connect_docker().await },
1504                |_| true,
1505                &cancel,
1506            )
1507            .await
1508            {
1509                Ok(d) => d,
1510                Err(_) if context.is_cancelled() => {
1511                    tracing::info!("Container logs consumer shutting down");
1512                    return Ok(());
1513                }
1514                Err(e) => {
1515                    // TODO(ADR-0012-e-metrics): wire increment_errors("e:container:logs-connect") via bd rc-mf3
1516                    tracing::error!(error = %e, "Container logs consumer exhausted reconnect attempts"); // allow-log-levels
1517                    return Err(e);
1518                }
1519            };
1520
1521            let tail = self
1522                .config
1523                .tail
1524                .clone()
1525                .unwrap_or_else(|| "all".to_string());
1526
1527            let options = LogsOptions {
1528                follow: self.config.follow,
1529                stdout: true,
1530                stderr: true,
1531                timestamps: self.config.timestamps,
1532                tail,
1533                ..Default::default()
1534            };
1535
1536            let mut log_stream = docker.logs(&container_id, Some(options));
1537            let container_id_header = container_id.clone();
1538
1539            loop {
1540                tokio::select! {
1541                    _ = context.cancelled() => {
1542                        tracing::info!("Container logs consumer shutting down");
1543                        return Ok(());
1544                    }
1545
1546                    msg = log_stream.next() => {
1547                        match msg {
1548                            Some(Ok(log_output)) => {
1549                                let (stream_type, content) = match log_output {
1550                                    bollard::container::LogOutput::StdOut { message } => {
1551                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
1552                                    }
1553                                    bollard::container::LogOutput::StdErr { message } => {
1554                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
1555                                    }
1556                                    bollard::container::LogOutput::Console { message } => {
1557                                        ("console", String::from_utf8_lossy(&message).into_owned())
1558                                    }
1559                                    bollard::container::LogOutput::StdIn { message } => {
1560                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
1561                                    }
1562                                };
1563
1564                                let content = content.trim_end();
1565                                if content.is_empty() {
1566                                    continue;
1567                                }
1568
1569                                let mut message = Message::new(Body::Text(content.to_string()));
1570                                message.set_header(
1571                                    HEADER_CONTAINER_ID,
1572                                    serde_json::Value::String(container_id_header.clone()),
1573                                );
1574                                message.set_header(
1575                                    HEADER_LOG_STREAM,
1576                                    serde_json::Value::String(stream_type.to_string()),
1577                                );
1578
1579                                if self.config.timestamps
1580                                    && let Some(ts) = extract_timestamp(content) {
1581                                        message.set_header(
1582                                            HEADER_LOG_TIMESTAMP,
1583                                            serde_json::Value::String(ts),
1584                                        );
1585                                    }
1586
1587                                let exchange = Exchange::new(message);
1588
1589                                if let Err(e) = context.send(exchange).await {
1590                                    // log-policy: system-broken
1591                                    tracing::error!("Failed to send log exchange: {:?}", e);
1592                                    break;
1593                                }
1594                            }
1595                            Some(Err(e)) => {
1596                                // TODO(ADR-0012-e-metrics): wire increment_errors("e:container:logs-stream") via bd rc-mf3
1597                                tracing::error!("Docker log stream error: {}. Reconnecting...", e); // allow-log-levels
1598                                break;
1599                            }
1600                            None => {
1601                                if self.config.follow {
1602                                    tracing::info!("Docker log stream ended. Reconnecting...");
1603                                    break;
1604                                } else {
1605                                    tracing::info!("Container logs consumer finished (follow=false)");
1606                                    return Ok(());
1607                                }
1608                            }
1609                        }
1610                    }
1611                }
1612            }
1613
1614            tokio::select! {
1615                _ = context.cancelled() => {
1616                    tracing::info!("Container logs consumer shutting down");
1617                    return Ok(());
1618                }
1619                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1620            }
1621        }
1622    }
1623}
1624
/// Extracts the leading timestamp token from a log line, if there is one.
///
/// The token before the first space is returned when it starts with an ASCII
/// digit and contains a `T` (the date/time separator), e.g.
/// `2024-01-15T10:30:45.123Z`. Requiring the leading digit keeps ordinary
/// words that merely contain a capital `T` ("Traceback", "This") from being
/// reported as timestamps.
///
/// Returns `None` when the line has no space, or when the first token does not
/// look like a timestamp.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _payload) = log_line.split_once(' ')?;

    let looks_like_timestamp =
        candidate.starts_with(|c: char| c.is_ascii_digit()) && candidate.contains('T');

    looks_like_timestamp.then(|| candidate.to_string())
}
1633
1634/// Component for creating container endpoints.
1635///
1636/// This component handles URIs with the "container" scheme and creates
1637/// appropriate producer and consumer endpoints for Docker operations.
1638///
1639/// Containers created via `run` operation are tracked globally and can be
1640/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
1641pub struct ContainerComponent {
1642    config: Option<ContainerGlobalConfig>,
1643}
1644
1645impl ContainerComponent {
1646    /// Creates a new container component instance without global config.
1647    pub fn new() -> Self {
1648        Self { config: None }
1649    }
1650
1651    /// Creates a container component with the given global config.
1652    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1653        Self {
1654            config: Some(config),
1655        }
1656    }
1657
1658    /// Creates a container component with optional global config.
1659    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1660        Self { config }
1661    }
1662}
1663
1664impl Default for ContainerComponent {
1665    fn default() -> Self {
1666        Self::new()
1667    }
1668}
1669
1670impl Component for ContainerComponent {
1671    fn scheme(&self) -> &str {
1672        "container"
1673    }
1674
1675    fn create_endpoint(
1676        &self,
1677        uri: &str,
1678        ctx: &dyn camel_component_api::ComponentContext,
1679    ) -> Result<Box<dyn Endpoint>, CamelError> {
1680        let mut config = ContainerConfig::from_uri(uri)?;
1681        // Apply global defaults if present and URI didn't set them
1682        if let Some(ref global) = self.config {
1683            config.apply_global_defaults(global);
1684        }
1685        let health_check = ContainerHealthCheck::new(&config);
1686        ctx.register_current_route_health_check(Arc::new(health_check));
1687        Ok(Box::new(ContainerEndpoint {
1688            uri: uri.to_string(),
1689            config,
1690        }))
1691    }
1692}
1693
/// Endpoint for container operations.
///
/// This endpoint creates producers for executing container operations
/// and consumers for receiving container events.
// TODO(CON-003): Forward container health status (inspect healthcheck / Health field) to
// Camel's health subsystem so the route can react to unhealthy containers.
pub struct ContainerEndpoint {
    /// The URI string this endpoint was created from; returned verbatim by `uri()`.
    uri: String,
    /// Endpoint configuration; a clone is handed to every consumer and producer
    /// created from this endpoint.
    config: ContainerConfig,
}
1704
impl ContainerEndpoint {
    /// Returns the Docker host configured for this endpoint.
    ///
    /// The value is borrowed from the endpoint's configuration; `None` means no
    /// host has been set on it. The accessor exists for testing purposes.
    pub fn docker_host(&self) -> Option<&str> {
        self.config.host.as_deref()
    }
}
1712
1713impl Endpoint for ContainerEndpoint {
1714    fn uri(&self) -> &str {
1715        &self.uri
1716    }
1717
1718    fn create_consumer(
1719        &self,
1720        rt: Arc<dyn camel_component_api::RuntimeObservability>,
1721    ) -> Result<Box<dyn Consumer>, CamelError> {
1722        Ok(Box::new(ContainerConsumer::new(self.config.clone(), rt)))
1723    }
1724
1725    fn create_producer(
1726        &self,
1727        _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1728        _ctx: &ProducerContext,
1729    ) -> Result<BoxProcessor, CamelError> {
1730        let docker = self.config.connect_docker_client()?;
1731        Ok(BoxProcessor::new(ContainerProducer {
1732            config: self.config.clone(),
1733            docker,
1734        }))
1735    }
1736}
1737
1738#[cfg(test)]
1739mod tests {
1740    use camel_component_api::test_support::PanicRuntimeObservability;
    /// Returns a `PanicRuntimeObservability` wrapped in an `Arc` and typed as the
    /// `RuntimeObservability` trait object, for tests that need a runtime handle.
    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
        std::sync::Arc::new(PanicRuntimeObservability)
    }
1744    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1745        std::sync::Arc::new(PanicRuntimeObservability)
1746    }
1747
1748    use super::*;
1749    use camel_component_api::NoOpComponentContext;
1750
1751    #[test]
1752    fn test_container_config() {
1753        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1754        assert_eq!(config.operation, "run");
1755        assert_eq!(config.image.as_deref(), Some("alpine"));
1756        // host is None by default; global config applies it later
1757        assert!(config.host.is_none());
1758    }
1759
1760    #[test]
1761    fn test_global_config_applied_to_endpoint() {
1762        // When global config is set and URI doesn't specify host,
1763        // apply_global_defaults should set host from global config.
1764        let global =
1765            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1766        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1767        assert!(
1768            config.host.is_none(),
1769            "URI without ?host= should leave host as None"
1770        );
1771        config.apply_global_defaults(&global);
1772        assert_eq!(
1773            config.host.as_deref(),
1774            Some("unix:///custom/docker.sock"),
1775            "global docker_host must be applied when URI did not set host"
1776        );
1777    }
1778
1779    #[test]
1780    fn test_uri_param_wins_over_global_config() {
1781        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1782        let global =
1783            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1784        let mut config =
1785            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1786                .unwrap();
1787        assert_eq!(
1788            config.host.as_deref(),
1789            Some("unix:///override.sock"),
1790            "URI-set host should be parsed correctly"
1791        );
1792        config.apply_global_defaults(&global);
1793        assert_eq!(
1794            config.host.as_deref(),
1795            Some("unix:///override.sock"),
1796            "global config must NOT override a host already set by URI"
1797        );
1798    }
1799
1800    #[test]
1801    fn test_container_config_parses_name() {
1802        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1803        assert_eq!(config.name.as_deref(), Some("my-container"));
1804    }
1805
1806    #[test]
1807    fn test_parse_producer_operation_known() {
1808        assert_eq!(
1809            parse_producer_operation("list").unwrap(),
1810            ProducerOperation::List
1811        );
1812        assert_eq!(
1813            parse_producer_operation("run").unwrap(),
1814            ProducerOperation::Run
1815        );
1816        assert_eq!(
1817            parse_producer_operation("start").unwrap(),
1818            ProducerOperation::Start
1819        );
1820        assert_eq!(
1821            parse_producer_operation("stop").unwrap(),
1822            ProducerOperation::Stop
1823        );
1824        assert_eq!(
1825            parse_producer_operation("remove").unwrap(),
1826            ProducerOperation::Remove
1827        );
1828    }
1829
1830    #[test]
1831    fn test_parse_producer_operation_unknown() {
1832        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1833        match err {
1834            CamelError::ProcessorError(msg) => {
1835                assert!(
1836                    msg.contains("Unknown container operation"),
1837                    "Unexpected error message: {}",
1838                    msg
1839                );
1840            }
1841            _ => panic!("Expected ProcessorError for unknown operation"),
1842        }
1843    }
1844
1845    #[test]
1846    fn test_parse_producer_operation_new_variants() {
1847        assert_eq!(
1848            parse_producer_operation("exec").unwrap(),
1849            ProducerOperation::Exec
1850        );
1851        assert_eq!(
1852            parse_producer_operation("network-create").unwrap(),
1853            ProducerOperation::NetworkCreate
1854        );
1855        assert_eq!(
1856            parse_producer_operation("network-connect").unwrap(),
1857            ProducerOperation::NetworkConnect
1858        );
1859        assert_eq!(
1860            parse_producer_operation("network-disconnect").unwrap(),
1861            ProducerOperation::NetworkDisconnect
1862        );
1863        assert_eq!(
1864            parse_producer_operation("network-remove").unwrap(),
1865            ProducerOperation::NetworkRemove
1866        );
1867        assert_eq!(
1868            parse_producer_operation("network-list").unwrap(),
1869            ProducerOperation::NetworkList
1870        );
1871    }
1872
1873    #[test]
1874    fn test_resolve_container_name_header_overrides_config() {
1875        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1876        let mut exchange = Exchange::new(Message::new(""));
1877        exchange.input.set_header(
1878            HEADER_CONTAINER_NAME,
1879            serde_json::Value::String("header-name".to_string()),
1880        );
1881
1882        let resolved = resolve_container_name(&exchange, &config);
1883        assert_eq!(resolved.as_deref(), Some("header-name"));
1884    }
1885
1886    #[test]
1887    fn test_container_config_rejects_tcp_host() {
1888        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1889        let err = config.connect_docker_client().unwrap_err();
1890        match err {
1891            CamelError::ProcessorError(msg) => {
1892                assert!(
1893                    msg.to_lowercase().contains("tcp"),
1894                    "Expected TCP scheme error, got: {}",
1895                    msg
1896                );
1897            }
1898            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1899        }
1900    }
1901
1902    #[tokio::test]
1903    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1904        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1905        let remove_called_clone = remove_called.clone();
1906
1907        let result = run_container_with_cleanup(
1908            || async { Ok("container-123".to_string()) },
1909            |_id| async move {
1910                Err(CamelError::ProcessorError(
1911                    "Failed to start container".to_string(),
1912                ))
1913            },
1914            move |_id| {
1915                let remove_called_inner = remove_called_clone.clone();
1916                async move {
1917                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1918                    Ok(())
1919                }
1920            },
1921        )
1922        .await;
1923
1924        assert!(result.is_err(), "Expected start failure to bubble up");
1925        assert!(
1926            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1927            "Expected cleanup to remove container"
1928        );
1929    }
1930
1931    #[test]
1932    fn test_container_component_creates_endpoint() {
1933        let component = ContainerComponent::new();
1934        assert_eq!(component.scheme(), "container");
1935        let ctx = NoOpComponentContext;
1936        let endpoint = component
1937            .create_endpoint("container:run?image=alpine", &ctx)
1938            .unwrap();
1939        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1940    }
1941
1942    #[test]
1943    fn test_container_config_parses_ports() {
1944        let config =
1945            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1946        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1947    }
1948
1949    #[test]
1950    fn test_container_config_parses_env() {
1951        let config =
1952            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1953        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1954    }
1955
1956    #[test]
1957    fn test_container_config_parses_logs_options() {
1958        let config = ContainerConfig::from_uri(
1959            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1960        )
1961        .unwrap();
1962        assert_eq!(config.operation, "logs");
1963        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1964        assert!(config.follow);
1965        assert!(config.timestamps);
1966        assert_eq!(config.tail.as_deref(), Some("100"));
1967    }
1968
1969    #[test]
1970    fn test_container_config_logs_defaults() {
1971        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1972        assert!(config.follow); // default: true
1973        assert!(!config.timestamps); // default: false
1974        assert!(config.tail.is_none()); // default: None (all)
1975    }
1976
1977    #[test]
1978    fn test_parse_ports_single() {
1979        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1980        let (exposed, bindings) = config.parse_ports().unwrap();
1981
1982        assert!(exposed.contains(&"80/tcp".to_string()));
1983        assert!(bindings.contains_key("80/tcp"));
1984
1985        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1986        assert_eq!(binding.len(), 1);
1987        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1988    }
1989
1990    #[test]
1991    fn test_parse_ports_multiple() {
1992        let config =
1993            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1994        let (exposed, bindings) = config.parse_ports().unwrap();
1995
1996        assert!(exposed.contains(&"80/tcp".to_string()));
1997        assert!(exposed.contains(&"443/tcp".to_string()));
1998        assert_eq!(bindings.len(), 2);
1999    }
2000
2001    #[test]
2002    fn test_parse_ports_with_protocol() {
2003        let config =
2004            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
2005                .unwrap();
2006        let (exposed, _bindings) = config.parse_ports().unwrap();
2007
2008        assert!(exposed.contains(&"80/tcp".to_string()));
2009        assert!(exposed.contains(&"53/udp".to_string()));
2010    }
2011
2012    #[test]
2013    fn test_parse_ports_none() {
2014        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2015        let (exposed, bindings) = config.parse_ports().unwrap();
2016        assert!(exposed.is_empty());
2017        assert!(bindings.is_empty());
2018    }
2019
2020    #[test]
2021    fn test_parse_env_single() {
2022        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
2023        let env = config.parse_env().unwrap();
2024
2025        assert_eq!(env.len(), 1);
2026        assert_eq!(env[0], "FOO=bar");
2027    }
2028
2029    #[test]
2030    fn test_parse_env_multiple() {
2031        let config =
2032            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
2033                .unwrap();
2034        let env = config.parse_env().unwrap();
2035
2036        assert_eq!(env.len(), 3);
2037        assert!(env.contains(&"FOO=bar".to_string()));
2038        assert!(env.contains(&"BAZ=qux".to_string()));
2039        assert!(env.contains(&"NUM=123".to_string()));
2040    }
2041
2042    #[test]
2043    fn test_parse_env_none() {
2044        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2045        assert!(config.parse_env().is_none());
2046    }
2047
2048    use camel_component_api::Message;
2049    use std::sync::Arc;
2050
2051    #[tokio::test]
2052    async fn test_container_producer_resolves_operation_from_header() {
2053        // Try to connect to Docker - if fails, return early
2054        let docker = match Docker::connect_with_local_defaults() {
2055            Ok(d) => d,
2056            Err(_) => {
2057                eprintln!("Skipping test: Could not connect to Docker daemon");
2058                return;
2059            }
2060        };
2061
2062        if docker.ping().await.is_err() {
2063            eprintln!("Skipping test: Docker daemon not responding to ping");
2064            return;
2065        }
2066
2067        let component = ContainerComponent::new();
2068        let ctx = NoOpComponentContext;
2069        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2070
2071        let ctx = ProducerContext::new();
2072        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2073
2074        let mut exchange = Exchange::new(Message::new(""));
2075        exchange
2076            .input
2077            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2078
2079        use tower::ServiceExt;
2080        let result = producer
2081            .ready()
2082            .await
2083            .unwrap()
2084            .call(exchange)
2085            .await
2086            .unwrap();
2087
2088        // For list operation, the result header should be on the input message
2089        assert_eq!(
2090            result
2091                .input
2092                .header(HEADER_ACTION_RESULT)
2093                .map(|v| v.as_str().unwrap()),
2094            Some("success")
2095        );
2096    }
2097
2098    #[tokio::test]
2099    async fn test_container_producer_connection_error_on_invalid_host() {
2100        // Test that an invalid host (nonexistent socket) results in a connection error
2101        let component = ContainerComponent::new();
2102        let ctx = NoOpComponentContext;
2103        let endpoint = component
2104            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2105            .unwrap();
2106
2107        let ctx = ProducerContext::new();
2108        let result = endpoint.create_producer(rt(), &ctx);
2109
2110        // The producer should return an error because it cannot connect to the invalid socket
2111        assert!(
2112            result.is_err(),
2113            "Expected error when connecting to invalid host"
2114        );
2115        let err = result.unwrap_err();
2116        match &err {
2117            CamelError::ProcessorError(msg) => {
2118                assert!(
2119                    msg.to_lowercase().contains("connection")
2120                        || msg.to_lowercase().contains("connect")
2121                        || msg.to_lowercase().contains("socket")
2122                        || msg.contains("docker"),
2123                    "Error message should indicate connection failure, got: {}",
2124                    msg
2125                );
2126            }
2127            _ => panic!("Expected ProcessorError, got: {:?}", err),
2128        }
2129    }
2130
2131    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
2132    #[tokio::test]
2133    async fn test_container_producer_lifecycle_operations_missing_id() {
2134        // Try to connect to Docker - if fails, return early
2135        let docker = match Docker::connect_with_local_defaults() {
2136            Ok(d) => d,
2137            Err(_) => {
2138                eprintln!("Skipping test: Could not connect to Docker daemon");
2139                return;
2140            }
2141        };
2142
2143        if docker.ping().await.is_err() {
2144            eprintln!("Skipping test: Docker daemon not responding to ping");
2145            return;
2146        }
2147
2148        let component = ContainerComponent::new();
2149        let ctx = NoOpComponentContext;
2150        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2151        let ctx = ProducerContext::new();
2152        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2153
2154        // Test each lifecycle operation without CamelContainerId header
2155        for operation in ["start", "stop", "remove"] {
2156            let mut exchange = Exchange::new(Message::new(""));
2157            exchange.input.set_header(
2158                HEADER_ACTION,
2159                serde_json::Value::String(operation.to_string()),
2160            );
2161            // Deliberately NOT setting CamelContainerId header
2162
2163            use tower::ServiceExt;
2164            let result = producer.ready().await.unwrap().call(exchange).await;
2165
2166            assert!(
2167                result.is_err(),
2168                "Expected error for {} operation without CamelContainerId",
2169                operation
2170            );
2171            let err = result.unwrap_err();
2172            match &err {
2173                CamelError::ProcessorError(msg) => {
2174                    assert!(
2175                        msg.contains(HEADER_CONTAINER_ID),
2176                        "Error message should mention {}, got: {}",
2177                        HEADER_CONTAINER_ID,
2178                        msg
2179                    );
2180                }
2181                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2182            }
2183        }
2184    }
2185
2186    /// Test that stop operation returns an error for a nonexistent container.
2187    #[tokio::test]
2188    async fn test_container_producer_stop_nonexistent() {
2189        // Try to connect to Docker - if fails, return early
2190        let docker = match Docker::connect_with_local_defaults() {
2191            Ok(d) => d,
2192            Err(_) => {
2193                eprintln!("Skipping test: Could not connect to Docker daemon");
2194                return;
2195            }
2196        };
2197
2198        if docker.ping().await.is_err() {
2199            eprintln!("Skipping test: Docker daemon not responding to ping");
2200            return;
2201        }
2202
2203        let component = ContainerComponent::new();
2204        let ctx = NoOpComponentContext;
2205        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2206        let ctx = ProducerContext::new();
2207        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2208
2209        let mut exchange = Exchange::new(Message::new(""));
2210        exchange
2211            .input
2212            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2213        exchange.input.set_header(
2214            HEADER_CONTAINER_ID,
2215            serde_json::Value::String("nonexistent-container-123".into()),
2216        );
2217
2218        use tower::ServiceExt;
2219        let result = producer.ready().await.unwrap().call(exchange).await;
2220
2221        assert!(
2222            result.is_err(),
2223            "Expected error when stopping nonexistent container"
2224        );
2225        let err = result.unwrap_err();
2226        match &err {
2227            CamelError::ProcessorError(msg) => {
2228                // Docker API returns 404 with "No such container" message
2229                assert!(
2230                    msg.to_lowercase().contains("no such container")
2231                        || msg.to_lowercase().contains("not found")
2232                        || msg.contains("404"),
2233                    "Error message should indicate container not found, got: {}",
2234                    msg
2235                );
2236            }
2237            _ => panic!("Expected ProcessorError, got: {:?}", err),
2238        }
2239    }
2240
2241    /// Test that run operation returns an error when no image is provided.
2242    #[tokio::test]
2243    async fn test_container_producer_run_missing_image() {
2244        // Try to connect to Docker - if fails, return early
2245        let docker = match Docker::connect_with_local_defaults() {
2246            Ok(d) => d,
2247            Err(_) => {
2248                eprintln!("Skipping test: Could not connect to Docker daemon");
2249                return;
2250            }
2251        };
2252
2253        if docker.ping().await.is_err() {
2254            eprintln!("Skipping test: Docker daemon not responding to ping");
2255            return;
2256        }
2257
2258        // Create producer without an image in the URI
2259        let component = ContainerComponent::new();
2260        let ctx = NoOpComponentContext;
2261        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2262        let ctx = ProducerContext::new();
2263        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2264
2265        let mut exchange = Exchange::new(Message::new(""));
2266        exchange
2267            .input
2268            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2269        // Deliberately NOT setting CamelContainerImage header
2270
2271        use tower::ServiceExt;
2272        let result = producer.ready().await.unwrap().call(exchange).await;
2273
2274        assert!(
2275            result.is_err(),
2276            "Expected error for run operation without image"
2277        );
2278        let err = result.unwrap_err();
2279        match &err {
2280            CamelError::ProcessorError(msg) => {
2281                assert!(
2282                    msg.to_lowercase().contains("image"),
2283                    "Error message should mention 'image', got: {}",
2284                    msg
2285                );
2286            }
2287            _ => panic!("Expected ProcessorError, got: {:?}", err),
2288        }
2289    }
2290
2291    /// Test that run operation uses image from header.
2292    #[tokio::test]
2293    async fn test_container_producer_run_image_from_header() {
2294        // Try to connect to Docker - if fails, return early
2295        let docker = match Docker::connect_with_local_defaults() {
2296            Ok(d) => d,
2297            Err(_) => {
2298                eprintln!("Skipping test: Could not connect to Docker daemon");
2299                return;
2300            }
2301        };
2302
2303        if docker.ping().await.is_err() {
2304            eprintln!("Skipping test: Docker daemon not responding to ping");
2305            return;
2306        }
2307
2308        // Create producer without an image in the URI
2309        let component = ContainerComponent::new();
2310        let ctx = NoOpComponentContext;
2311        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2312        let ctx = ProducerContext::new();
2313        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2314
2315        let mut exchange = Exchange::new(Message::new(""));
2316        exchange
2317            .input
2318            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2319        // Set a non-existent image to test that the run operation attempts to use it
2320        exchange.input.set_header(
2321            HEADER_IMAGE,
2322            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2323        );
2324
2325        use tower::ServiceExt;
2326        let result = producer.ready().await.unwrap().call(exchange).await;
2327
2328        // The operation should fail because the image doesn't exist
2329        assert!(
2330            result.is_err(),
2331            "Expected error when running container with nonexistent image"
2332        );
2333        let err = result.unwrap_err();
2334        match &err {
2335            CamelError::ProcessorError(msg) => {
2336                // Docker API returns an error about the missing image
2337                assert!(
2338                    msg.to_lowercase().contains("no such image")
2339                        || msg.to_lowercase().contains("not found")
2340                        || msg.to_lowercase().contains("image")
2341                        || msg.to_lowercase().contains("pull")
2342                        || msg.contains("404"),
2343                    "Error message should indicate image issue, got: {}",
2344                    msg
2345                );
2346            }
2347            _ => panic!("Expected ProcessorError, got: {:?}", err),
2348        }
2349    }
2350
2351    /// Integration test: Actually run a container with alpine:latest.
2352    /// This test verifies the full flow: create → start → set headers.
2353    #[tokio::test]
2354    async fn test_container_producer_run_alpine_container() {
2355        let docker = match Docker::connect_with_local_defaults() {
2356            Ok(d) => d,
2357            Err(_) => {
2358                eprintln!("Skipping test: Could not connect to Docker daemon");
2359                return;
2360            }
2361        };
2362
2363        if docker.ping().await.is_err() {
2364            eprintln!("Skipping test: Docker daemon not responding to ping");
2365            return;
2366        }
2367
2368        // Pull alpine:latest if not present
2369        let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2370        let has_alpine = images
2371            .iter()
2372            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2373
2374        if !has_alpine {
2375            eprintln!("Pulling alpine:latest image...");
2376            let mut stream = docker.create_image(
2377                Some(CreateImageOptions {
2378                    from_image: Some("alpine:latest".to_string()),
2379                    ..Default::default()
2380                }),
2381                None,
2382                None,
2383            );
2384
2385            use futures::StreamExt;
2386            while let Some(_item) = stream.next().await {
2387                // Wait for pull to complete
2388            }
2389            eprintln!("Image pulled successfully");
2390        }
2391
2392        // Create producer
2393        let component = ContainerComponent::new();
2394        let ctx = NoOpComponentContext;
2395        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2396        let ctx = ProducerContext::new();
2397        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2398
2399        // Run container with unique name
2400        let timestamp = std::time::SystemTime::now()
2401            .duration_since(std::time::UNIX_EPOCH)
2402            .unwrap()
2403            .as_millis();
2404        let container_name = format!("test-rust-camel-{}", timestamp);
2405        let mut exchange = Exchange::new(Message::new(""));
2406        exchange.input.set_header(
2407            HEADER_IMAGE,
2408            serde_json::Value::String("alpine:latest".into()),
2409        );
2410        exchange.input.set_header(
2411            HEADER_CONTAINER_NAME,
2412            serde_json::Value::String(container_name.clone()),
2413        );
2414
2415        use tower::ServiceExt;
2416        let result = producer
2417            .ready()
2418            .await
2419            .unwrap()
2420            .call(exchange)
2421            .await
2422            .expect("Container run should succeed");
2423
2424        // Verify container ID was set
2425        let container_id = result
2426            .input
2427            .header(HEADER_CONTAINER_ID)
2428            .and_then(|v| v.as_str().map(|s| s.to_string()))
2429            .expect("Expected container ID header");
2430        assert!(!container_id.is_empty(), "Container ID should not be empty");
2431
2432        // Verify success header
2433        assert_eq!(
2434            result
2435                .input
2436                .header(HEADER_ACTION_RESULT)
2437                .and_then(|v| v.as_str()),
2438            Some("success")
2439        );
2440
2441        // Verify container exists in Docker
2442        let inspect = docker
2443            .inspect_container(&container_id, None)
2444            .await
2445            .expect("Container should exist");
2446        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2447
2448        // Cleanup: remove container
2449        docker
2450            .remove_container(
2451                &container_id,
2452                Some(RemoveContainerOptions {
2453                    force: true,
2454                    ..Default::default()
2455                }),
2456            )
2457            .await
2458            .ok();
2459
2460        eprintln!("✅ Container {} created and cleaned up", container_id);
2461    }
2462
2463    /// Test that consumer returns an error for unsupported operations.
2464    #[tokio::test]
2465    async fn test_container_consumer_unsupported_operation() {
2466        use tokio::sync::mpsc;
2467
2468        let component = ContainerComponent::new();
2469        let ctx = NoOpComponentContext;
2470        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2471        let mut consumer = endpoint.create_consumer(rt()).unwrap();
2472
2473        // Create a minimal ConsumerContext
2474        let (tx, _rx) = mpsc::channel(16);
2475        let cancel_token = tokio_util::sync::CancellationToken::new();
2476        let context = ConsumerContext::new(tx, cancel_token);
2477
2478        let result = consumer.start(context).await;
2479
2480        // Should return error because "run" is not a supported consumer operation
2481        assert!(
2482            result.is_err(),
2483            "Expected error for unsupported consumer operation"
2484        );
2485        let err = result.unwrap_err();
2486        match &err {
2487            CamelError::EndpointCreationFailed(msg) => {
2488                assert!(
2489                    msg.contains("Consumer only supports 'events' or 'logs'"),
2490                    "Error message should mention events or logs support, got: {}",
2491                    msg
2492                );
2493            }
2494            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2495        }
2496    }
2497
2498    #[test]
2499    fn test_container_consumer_concurrency_model_is_concurrent() {
2500        let consumer = ContainerConsumer {
2501            config: ContainerConfig::from_uri("container:events").unwrap(),
2502            runtime: test_rt(),
2503        };
2504
2505        assert_eq!(
2506            consumer.concurrency_model(),
2507            camel_component_api::ConcurrencyModel::Concurrent { max: None }
2508        );
2509    }
2510
2511    /// Test that consumer gracefully shuts down when cancellation is requested.
2512    /// This test requires a running Docker daemon. If Docker is not available, the test
2513    /// will be ignored.
2514    #[tokio::test]
2515    async fn test_container_consumer_cancellation() {
2516        use std::sync::atomic::{AtomicBool, Ordering};
2517        use tokio::sync::mpsc;
2518
2519        // Try to connect to Docker - if fails, return early
2520        let docker = match Docker::connect_with_local_defaults() {
2521            Ok(d) => d,
2522            Err(_) => {
2523                eprintln!("Skipping test: Could not connect to Docker daemon");
2524                return;
2525            }
2526        };
2527
2528        if docker.ping().await.is_err() {
2529            eprintln!("Skipping test: Docker daemon not responding to ping");
2530            return;
2531        }
2532
2533        let component = ContainerComponent::new();
2534        let ctx = NoOpComponentContext;
2535        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2536        let mut consumer = endpoint.create_consumer(rt()).unwrap();
2537
2538        // Create a ConsumerContext
2539        let (tx, _rx) = mpsc::channel(16);
2540        let cancel_token = tokio_util::sync::CancellationToken::new();
2541        let context = ConsumerContext::new(tx, cancel_token.clone());
2542
2543        // Track if the consumer task has completed
2544        let completed = Arc::new(AtomicBool::new(false));
2545        let completed_clone = completed.clone();
2546
2547        // Spawn consumer in background
2548        let handle = tokio::spawn(async move {
2549            let result = consumer.start(context).await;
2550            // Mark as completed when done
2551            completed_clone.store(true, Ordering::SeqCst);
2552            result
2553        });
2554
2555        // Wait a bit for the consumer to start
2556        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2557
2558        // Consumer should still be running (not completed)
2559        assert!(
2560            !completed.load(Ordering::SeqCst),
2561            "Consumer should still be running before cancellation"
2562        );
2563
2564        // Request cancellation
2565        cancel_token.cancel();
2566
2567        // Wait for the task to complete (with timeout)
2568        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2569
2570        // Task should have completed (not timed out)
2571        assert!(
2572            result.is_ok(),
2573            "Consumer should gracefully shut down after cancellation"
2574        );
2575
2576        // Verify the consumer completed
2577        assert!(
2578            completed.load(Ordering::SeqCst),
2579            "Consumer should have completed after cancellation"
2580        );
2581    }
2582
2583    /// Integration test for listing containers.
2584    /// This test requires a running Docker daemon. If Docker is not available, the test
2585    /// will return early and be effectively ignored.
2586    #[tokio::test]
2587    async fn test_container_producer_list_containers() {
2588        // Try to connect to Docker using default config
2589        // If ping fails, return early (effectively ignoring this test)
2590        let docker = match Docker::connect_with_local_defaults() {
2591            Ok(d) => d,
2592            Err(_) => {
2593                eprintln!("Skipping test: Could not connect to Docker daemon");
2594                return;
2595            }
2596        };
2597
2598        if docker.ping().await.is_err() {
2599            eprintln!("Skipping test: Docker daemon not responding to ping");
2600            return;
2601        }
2602
2603        // Create producer with list operation
2604        let component = ContainerComponent::new();
2605        let ctx = NoOpComponentContext;
2606        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2607
2608        let ctx = ProducerContext::new();
2609        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2610
2611        // Create exchange with list operation in header
2612        let mut exchange = Exchange::new(Message::new(""));
2613        exchange
2614            .input
2615            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2616
2617        // Call the producer
2618        use tower::ServiceExt;
2619        let result = producer
2620            .ready()
2621            .await
2622            .unwrap()
2623            .call(exchange)
2624            .await
2625            .expect("Producer should succeed when Docker is available");
2626
2627        // Assert that the input exchange body is a JSON array
2628        // (because list_containers should put the JSON result in the body)
2629        match &result.input.body {
2630            camel_component_api::Body::Json(json_value) => {
2631                assert!(
2632                    json_value.is_array(),
2633                    "Expected input body to be a JSON array, got: {:?}",
2634                    json_value
2635                );
2636            }
2637            other => panic!("Expected Body::Json with array, got: {:?}", other),
2638        }
2639    }
2640
2641    #[test]
2642    fn test_container_config_parses_volumes() {
2643        let config = ContainerConfig::from_uri(
2644            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2645        )
2646        .unwrap();
2647        assert_eq!(
2648            config.volumes.as_deref(),
2649            Some("./html:/usr/share/nginx/html:ro")
2650        );
2651    }
2652
2653    #[test]
2654    fn test_container_config_parses_exec_params() {
2655        let config = ContainerConfig::from_uri(
2656            "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2657        )
2658        .unwrap();
2659        assert_eq!(config.operation, "exec");
2660        assert_eq!(config.container_id.as_deref(), Some("my-app"));
2661        assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2662        assert_eq!(config.user.as_deref(), Some("root"));
2663        assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2664        assert!(config.detach);
2665    }
2666
2667    #[test]
2668    fn test_container_config_parses_network_create_params() {
2669        let config =
2670            ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2671                .unwrap();
2672        assert_eq!(config.operation, "network-create");
2673        assert_eq!(config.name.as_deref(), Some("my-net"));
2674        assert_eq!(config.driver.as_deref(), Some("bridge"));
2675    }
2676
2677    #[test]
2678    fn test_container_config_defaults_new_fields() {
2679        let config = ContainerConfig::from_uri("container:list").unwrap();
2680        assert!(config.volumes.is_none());
2681        assert!(config.user.is_none());
2682        assert!(config.workdir.is_none());
2683        assert!(!config.detach);
2684        assert!(config.driver.is_none());
2685        assert!(!config.force);
2686    }
2687
2688    #[test]
2689    fn test_parse_volumes_bind_mount() {
2690        let config = ContainerConfig::from_uri(
2691            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2692        )
2693        .unwrap();
2694        let (binds, anon) = config.parse_volumes().unwrap();
2695        assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2696        assert!(anon.is_empty());
2697    }
2698
2699    #[test]
2700    fn test_parse_volumes_named_volume() {
2701        let config =
2702            ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2703                .unwrap();
2704        let (binds, anon) = config.parse_volumes().unwrap();
2705        assert_eq!(binds, vec!["data:/var/lib/data"]);
2706        assert!(anon.is_empty());
2707    }
2708
2709    #[test]
2710    fn test_parse_volumes_anonymous() {
2711        let config =
2712            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2713        let (binds, anon) = config.parse_volumes().unwrap();
2714        assert!(binds.is_empty());
2715        assert!(anon.contains(&"/tmp/app-data".to_string()));
2716    }
2717
2718    #[test]
2719    fn test_parse_volumes_anonymous_with_mode() {
2720        let config =
2721            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2722                .unwrap();
2723        let (binds, anon) = config.parse_volumes().unwrap();
2724        assert!(binds.is_empty());
2725        assert!(anon.contains(&"/tmp/app-data".to_string()));
2726    }
2727
2728    #[test]
2729    fn test_parse_volumes_multiple() {
2730        let config = ContainerConfig::from_uri(
2731            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2732        )
2733        .unwrap();
2734        let (binds, anon) = config.parse_volumes().unwrap();
2735        assert_eq!(binds.len(), 2);
2736        assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2737        assert!(binds.contains(&"data:/var/log/app".to_string()));
2738        assert!(anon.is_empty());
2739    }
2740
2741    #[test]
2742    fn test_parse_volumes_mixed() {
2743        let config = ContainerConfig::from_uri(
2744            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2745        )
2746        .unwrap();
2747        let (binds, anon) = config.parse_volumes().unwrap();
2748        assert_eq!(binds.len(), 1);
2749        assert!(anon.contains(&"/tmp/cache".to_string()));
2750    }
2751
2752    #[test]
2753    fn test_parse_volumes_none() {
2754        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2755        assert!(config.parse_volumes().is_none());
2756    }
2757
2758    #[test]
2759    fn test_parse_volumes_empty_entry_skipped() {
2760        let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2761        assert!(config.parse_volumes().is_none());
2762    }
2763
2764    #[test]
2765    fn test_parse_volumes_rw_mode() {
2766        let config =
2767            ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2768                .unwrap();
2769        let (binds, _) = config.parse_volumes().unwrap();
2770        assert_eq!(binds, vec!["./data:/app/data:rw"]);
2771    }
2772
2773    #[test]
2774    fn test_container_config_from_uri_parses_false_flags() {
2775        let config = ContainerConfig::from_uri(
2776            "container:logs?containerId=a&follow=false&timestamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
2777        )
2778        .unwrap();
2779        assert!(!config.follow);
2780        assert!(!config.timestamps);
2781        assert!(!config.auto_pull);
2782        assert!(!config.auto_remove);
2783        assert!(config.detach);
2784        assert!(config.force);
2785    }
2786
2787    #[test]
2788    fn test_docker_socket_path_validation() {
2789        let unix_cfg =
2790            ContainerConfig::from_uri("container:list?host=unix:///tmp/docker.sock").unwrap();
2791        assert_eq!(
2792            unix_cfg.docker_socket_path().unwrap(),
2793            "unix:///tmp/docker.sock"
2794        );
2795
2796        let npipe_cfg =
2797            ContainerConfig::from_uri("container:list?host=npipe:////./pipe/docker_engine")
2798                .unwrap();
2799        assert_eq!(
2800            npipe_cfg.docker_socket_path().unwrap(),
2801            "npipe:////./pipe/docker_engine"
2802        );
2803
2804        let plain_cfg =
2805            ContainerConfig::from_uri("container:list?host=/var/run/docker.sock").unwrap();
2806        assert_eq!(
2807            plain_cfg.docker_socket_path().unwrap(),
2808            "/var/run/docker.sock"
2809        );
2810
2811        let bad_cfg =
2812            ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
2813        assert!(bad_cfg.docker_socket_path().is_err());
2814    }
2815
2816    #[test]
2817    fn test_parse_ports_invalid_and_whitespace_entries() {
2818        // "8080" without colon is malformed — must fail-fast
2819        let cfg = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
2820        let err = cfg.parse_ports().unwrap_err();
2821        assert!(
2822            err.to_string().contains("malformed port mapping"),
2823            "expected malformed port error, got: {}",
2824            err
2825        );
2826
2827        let cfg =
2828            ContainerConfig::from_uri("container:run?ports= 8080:80 ,  5353:53/udp ").unwrap();
2829        let (exposed, bindings) = cfg.parse_ports().unwrap();
2830        assert!(exposed.contains(&"80/tcp".to_string()));
2831        assert!(exposed.contains(&"53/udp".to_string()));
2832        assert_eq!(bindings.len(), 2);
2833    }
2834
2835    #[test]
2836    fn test_parse_ports_fail_fast_on_malformed() {
2837        // First entry valid, second malformed — must fail on the malformed one
2838        let cfg = ContainerConfig::from_uri("container:run?ports=8080:80,badentry").unwrap();
2839        let err = cfg.parse_ports().unwrap_err();
2840        assert!(
2841            err.to_string()
2842                .contains("malformed port mapping 'badentry'"),
2843            "expected fail-fast on 'badentry', got: {}",
2844            err
2845        );
2846    }
2847
2848    #[test]
2849    fn test_parse_env_trims_and_filters_empty_items() {
2850        let cfg = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
2851        let env = cfg.parse_env().unwrap();
2852        assert_eq!(env, vec!["FOO=bar".to_string(), "BAZ=qux".to_string()]);
2853
2854        let cfg = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
2855        assert!(cfg.parse_env().is_none());
2856    }
2857
2858    #[test]
2859    fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
2860        assert!(parse_volume_str("/host:/ctr:badmode").is_none());
2861        let (binds, anon) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
2862        assert!(binds.contains(&"a:/b:rw".to_string()));
2863        assert!(anon.contains(&"/tmp/cache".to_string()));
2864        assert!(anon.contains(&"/tmp/logs".to_string()));
2865    }
2866
2867    #[test]
2868    fn test_format_docker_event_variants_and_timestamp_extraction() {
2869        let mut attrs = std::collections::HashMap::new();
2870        attrs.insert("name".to_string(), "demo".to_string());
2871        attrs.insert("image".to_string(), "alpine:latest".to_string());
2872        attrs.insert("exitCode".to_string(), "137".to_string());
2873        let actor = bollard::models::EventActor {
2874            id: None,
2875            attributes: Some(attrs),
2876        };
2877
2878        let create_event = bollard::models::EventMessage {
2879            action: Some("create".to_string()),
2880            actor: Some(actor.clone()),
2881            ..Default::default()
2882        };
2883        assert_eq!(
2884            format_docker_event(&create_event),
2885            "[CREATE] Container demo (alpine:latest)"
2886        );
2887
2888        let die_event = bollard::models::EventMessage {
2889            action: Some("die".to_string()),
2890            actor: Some(actor),
2891            ..Default::default()
2892        };
2893        assert_eq!(
2894            format_docker_event(&die_event),
2895            "[DIE]    Container demo (exit: 137)"
2896        );
2897
2898        let other_event = bollard::models::EventMessage {
2899            action: Some("oom".to_string()),
2900            actor: None,
2901            ..Default::default()
2902        };
2903        assert_eq!(format_docker_event(&other_event), "[OOM] Container unknown");
2904
2905        assert_eq!(
2906            extract_timestamp("2024-01-01T00:00:00Z hello"),
2907            Some("2024-01-01T00:00:00Z".to_string())
2908        );
2909        assert_eq!(extract_timestamp("hello world"), None);
2910    }
2911
2912    #[tokio::test]
2913    async fn test_run_container_with_cleanup_error_paths() {
2914        let create_fail = run_container_with_cleanup(
2915            || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2916            |_id| async move { Ok(()) },
2917            |_id| async move { Ok(()) },
2918        )
2919        .await;
2920        assert!(
2921            matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2922        );
2923
2924        let cleanup_fail = run_container_with_cleanup(
2925            || async { Ok("cid-1".to_string()) },
2926            |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2927            |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2928        )
2929        .await;
2930        match cleanup_fail {
2931            Err(CamelError::ProcessorError(msg)) => {
2932                assert!(msg.contains("Failed to start container"));
2933                assert!(msg.contains("Cleanup failed"));
2934            }
2935            other => panic!("unexpected result: {:?}", other),
2936        }
2937    }
2938
2939    #[tokio::test]
2940    async fn test_container_producer_network_lifecycle() {
2941        let docker = match Docker::connect_with_local_defaults() {
2942            Ok(d) => d,
2943            Err(_) => {
2944                eprintln!("Skipping test: Could not connect to Docker daemon");
2945                return;
2946            }
2947        };
2948        if docker.ping().await.is_err() {
2949            eprintln!("Skipping test: Docker daemon not responding to ping");
2950            return;
2951        }
2952
2953        let network_name = format!("camel-test-{}", std::process::id());
2954
2955        let component = ContainerComponent::new();
2956        let component_ctx = NoOpComponentContext;
2957
2958        // Create network
2959        let endpoint = component
2960            .create_endpoint(
2961                &format!("container:network-create?name={}", network_name),
2962                &component_ctx,
2963            )
2964            .unwrap();
2965        let ctx = ProducerContext::new();
2966        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2967
2968        let mut exchange = Exchange::new(Message::new(""));
2969        exchange.input.set_header(
2970            HEADER_ACTION,
2971            serde_json::Value::String("network-create".into()),
2972        );
2973
2974        use tower::ServiceExt;
2975        let result = producer
2976            .ready()
2977            .await
2978            .unwrap()
2979            .call(exchange)
2980            .await
2981            .expect("Network create should succeed");
2982
2983        let network_id = result
2984            .input
2985            .header(HEADER_NETWORK)
2986            .and_then(|v| v.as_str().map(|s| s.to_string()))
2987            .expect("Should have network ID");
2988
2989        assert!(!network_id.is_empty());
2990
2991        // List networks
2992        let endpoint = component
2993            .create_endpoint("container:network-list", &component_ctx)
2994            .unwrap();
2995        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2996
2997        let mut exchange = Exchange::new(Message::new(""));
2998        exchange.input.set_header(
2999            HEADER_ACTION,
3000            serde_json::Value::String("network-list".into()),
3001        );
3002
3003        let list_result = producer
3004            .ready()
3005            .await
3006            .unwrap()
3007            .call(exchange)
3008            .await
3009            .expect("Network list should succeed");
3010
3011        match &list_result.input.body {
3012            Body::Json(json_value) => {
3013                assert!(json_value.is_array(), "Expected JSON array");
3014            }
3015            other => panic!("Expected Body::Json, got: {:?}", other),
3016        }
3017
3018        // Remove network
3019        let endpoint = component
3020            .create_endpoint(
3021                &format!("container:network-remove?network={}", network_name),
3022                &component_ctx,
3023            )
3024            .unwrap();
3025        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
3026
3027        let mut exchange = Exchange::new(Message::new(""));
3028        exchange.input.set_header(
3029            HEADER_ACTION,
3030            serde_json::Value::String("network-remove".into()),
3031        );
3032
3033        let remove_result = producer
3034            .ready()
3035            .await
3036            .unwrap()
3037            .call(exchange)
3038            .await
3039            .expect("Network remove should succeed");
3040
3041        let action_result = remove_result
3042            .input
3043            .header(HEADER_ACTION_RESULT)
3044            .and_then(|v| v.as_str());
3045        assert_eq!(action_result, Some("success"));
3046    }
3047
3048    #[tokio::test]
3049    async fn test_logs_consumer_requires_container_id() {
3050        use tokio::sync::mpsc;
3051
3052        let mut consumer = ContainerConsumer {
3053            config: ContainerConfig::from_uri("container:logs").unwrap(),
3054            runtime: test_rt(),
3055        };
3056        let (tx, _rx) = mpsc::channel(4);
3057        let context = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
3058
3059        let err = consumer.start(context).await.unwrap_err();
3060        match err {
3061            CamelError::EndpointCreationFailed(msg) => {
3062                assert!(msg.contains("containerId is required for logs consumer"));
3063            }
3064            other => panic!("unexpected error: {:?}", other),
3065        }
3066    }
3067
3068    #[tokio::test]
3069    async fn test_events_consumer_stops_immediately_when_cancelled() {
3070        use tokio::sync::mpsc;
3071
3072        let mut consumer = ContainerConsumer {
3073            config: ContainerConfig::from_uri("container:events").unwrap(),
3074            runtime: test_rt(),
3075        };
3076
3077        let (tx, _rx) = mpsc::channel(4);
3078        let cancel = tokio_util::sync::CancellationToken::new();
3079        cancel.cancel();
3080        let context = ConsumerContext::new(tx, cancel);
3081
3082        let result = consumer.start(context).await;
3083        assert!(result.is_ok());
3084    }
3085
3086    #[test]
3087    fn test_global_config_constructors_and_endpoint_docker_host() {
3088        let global = ContainerGlobalConfig::new().with_docker_host("unix:///tmp/docker.sock");
3089        let mut cfg = ContainerConfig::from_uri("container:list").unwrap();
3090        cfg.apply_global_defaults(&global);
3091
3092        let endpoint = ContainerEndpoint {
3093            uri: "container:list".to_string(),
3094            config: cfg,
3095        };
3096
3097        assert_eq!(endpoint.docker_host(), Some("unix:///tmp/docker.sock"));
3098
3099        let component = ContainerComponent::with_config(global);
3100        assert_eq!(component.scheme(), "container");
3101    }
3102
3103    #[test]
3104    fn container_global_config_has_reconnect_policy() {
3105        let cfg = ContainerGlobalConfig::default();
3106        assert_eq!(cfg.reconnect.max_attempts, 0); // unlimited
3107        assert!(cfg.reconnect.enabled);
3108    }
3109
3110    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
3111    /// Replicates the exact retry loop from ContainerConsumer::{start_events_consumer,start_logs_consumer}
3112    /// (lib.rs:~1408-1431, ~1503-1525):
3113    ///   attempt starts at 0, incremented on error, !should_retry(attempt), delay_for(attempt-1)
3114    #[tokio::test]
3115    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
3116        use std::sync::Arc;
3117        use std::sync::atomic::{AtomicU32, Ordering};
3118        use std::time::Duration;
3119
3120        let policy = NetworkRetryPolicy {
3121            max_attempts: 3,
3122            initial_delay: Duration::from_millis(1),
3123            max_delay: Duration::from_millis(1),
3124            multiplier: 1.0,
3125            ..NetworkRetryPolicy::default()
3126        };
3127
3128        let calls = Arc::new(AtomicU32::new(0));
3129        let calls_clone = Arc::clone(&calls);
3130        let mut attempt: u32 = 0;
3131
3132        loop {
3133            calls_clone.fetch_add(1, Ordering::SeqCst);
3134            let result: Result<(), ()> = Err(());
3135            match result {
3136                Ok(_) => {
3137                    break;
3138                }
3139                Err(_) => {
3140                    attempt += 1;
3141                    if !policy.should_retry(attempt) {
3142                        break;
3143                    }
3144                    let delay = policy.delay_for(attempt - 1);
3145                    tokio::time::sleep(delay).await;
3146                    continue;
3147                }
3148            }
3149        }
3150
3151        assert_eq!(
3152            calls.load(Ordering::SeqCst),
3153            3,
3154            "max_attempts=3 must yield exactly 3 invocations"
3155        );
3156    }
3157}