Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::models::{
19    ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
20};
21use bollard::query_parameters::{
22    CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
23    ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
24    StartContainerOptions,
25};
26use bollard::service::{HostConfig, PortBinding};
27use camel_component_api::parse_uri;
28use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
29use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
30use tower::Service;
31
32/// Global tracker for containers created by this component.
33/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
34static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
35    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
36
37/// Registers a container ID for tracking (will be cleaned up on shutdown).
38fn track_container(id: String) {
39    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
40        tracker.insert(id);
41    }
42}
43
44/// Removes a container ID from tracking (when it's been removed naturally).
45fn untrack_container(id: &str) {
46    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
47        tracker.remove(id);
48    }
49}
50
51/// Cleans up all tracked containers. Call this on application shutdown.
52pub async fn cleanup_tracked_containers() {
53    let ids: Vec<String> = {
54        match CONTAINER_TRACKER.lock() {
55            Ok(tracker) => tracker.iter().cloned().collect(),
56            Err(_) => return,
57        }
58    };
59
60    if ids.is_empty() {
61        return;
62    }
63
64    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
65
66    let docker = match Docker::connect_with_local_defaults() {
67        Ok(d) => d,
68        Err(e) => {
69            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
70            return;
71        }
72    };
73
74    for id in ids {
75        match docker
76            .remove_container(
77                &id,
78                Some(RemoveContainerOptions {
79                    force: true,
80                    ..Default::default()
81                }),
82            )
83            .await
84        {
85            Ok(_) => {
86                tracing::debug!("Cleaned up container {}", id);
87                untrack_container(&id);
88            }
89            Err(e) => {
90                tracing::warn!("Failed to cleanup container {}: {}", id, e);
91            }
92        }
93    }
94}
95
// Connection settings

/// How long to wait, in seconds, when connecting to the Docker daemon.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

// Exchange header names used by the container component

/// Header naming the container action to perform (e.g., "list", "run", "start", "stop", "remove").
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header naming the container image used by "run" operations.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header that carries the container ID, either as input or as a result.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header carrying the log stream type (stdout or stderr).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header carrying the timestamp of a log entry.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header naming the container to create in "run" operations.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header carrying the outcome of a container operation (e.g., "success").
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Header carrying the command to execute in a container.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Header naming the network that network operations act on.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Header carrying the exit code of an exec operation.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Header carrying volume mount specifications.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Header carrying the ID of an exec instance.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
136
137// ---------------------------------------------------------------------------
138// ContainerGlobalConfig
139// ---------------------------------------------------------------------------
140
141/// Global configuration for Container component.
142/// Supports serde deserialization with defaults and builder methods.
143/// These are the fallback defaults when URI params are not set.
144#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
145#[serde(default)]
146pub struct ContainerGlobalConfig {
147    /// The Docker host URL (default: "unix:///var/run/docker.sock").
148    pub docker_host: String,
149}
150
151impl Default for ContainerGlobalConfig {
152    fn default() -> Self {
153        Self {
154            docker_host: "unix:///var/run/docker.sock".to_string(),
155        }
156    }
157}
158
159impl ContainerGlobalConfig {
160    pub fn new() -> Self {
161        Self::default()
162    }
163
164    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
165        self.docker_host = v.into();
166        self
167    }
168}
169
170// ---------------------------------------------------------------------------
171// ContainerConfig (endpoint configuration)
172// ---------------------------------------------------------------------------
173
/// Endpoint-level configuration of the container component.
///
/// Produced by parsing an endpoint URI: the URI path selects the operation and the
/// query parameters fill in the remaining options.
#[derive(Clone, Debug)]
pub struct ContainerConfig {
    /// Operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
    pub operation: String,
    /// Image for "run" operations; a header value takes precedence over this one.
    pub image: Option<String>,
    /// Container name for "run" operations; a header value takes precedence over this one.
    pub name: Option<String>,
    /// Docker host URL. `None` until set from the URI or filled from the global config.
    pub host: Option<String>,
    /// Command to run in the container (e.g., "sleep 30").
    pub cmd: Option<String>,
    /// Comma-separated port mappings, "hostPort:containerPort" (e.g., "8080:80,8443:443").
    pub ports: Option<String>,
    /// Comma-separated environment variables, "KEY=value,KEY2=value2".
    pub env: Option<String>,
    /// Network mode (e.g., "bridge", "host", "none"). Passed through unchanged when set.
    pub network: Option<String>,
    /// ID or name of the container, used by the logs consumer.
    pub container_id: Option<String>,
    /// Whether to keep following log output (URI default: true).
    pub follow: bool,
    /// Whether log lines include timestamps (URI default: false).
    pub timestamps: bool,
    /// How many lines from the end of the logs to show (all when unset).
    pub tail: Option<String>,
    /// Whether a missing image is pulled automatically (URI default: true).
    pub auto_pull: bool,
    /// Whether the container is removed automatically when it exits (URI default: true).
    pub auto_remove: bool,
    /// Volume mounts, "host:container:ro" (e.g., "./html:/usr/share/nginx/html:ro").
    pub volumes: Option<String>,
    /// User the container or exec command runs as (e.g., "root").
    pub user: Option<String>,
    /// Working directory inside the container.
    pub workdir: Option<String>,
    /// Whether to detach from the exec process (URI default: false).
    pub detach: bool,
    /// Network driver for network-create (e.g., "bridge", "overlay").
    pub driver: Option<String>,
    /// Whether to force the operation (URI default: false).
    pub force: bool,
}
221
222impl ContainerConfig {
223    /// Parses a container URI into a `ContainerConfig`.
224    ///
225    /// # Arguments
226    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
227    ///
228    /// # Errors
229    /// Returns an error if the URI scheme is not "container".
230    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
231        let parts = parse_uri(uri)?;
232        if parts.scheme != "container" {
233            return Err(CamelError::InvalidUri(format!(
234                "expected scheme 'container', got '{}'",
235                parts.scheme
236            )));
237        }
238
239        let image = parts.params.get("image").cloned();
240        let name = parts.params.get("name").cloned();
241        let cmd = parts.params.get("cmd").cloned();
242        let ports = parts.params.get("ports").cloned();
243        let env = parts.params.get("env").cloned();
244        let network = parts.params.get("network").cloned();
245        let container_id = parts.params.get("containerId").cloned();
246        let follow = parts
247            .params
248            .get("follow")
249            .map(|v| v.eq_ignore_ascii_case("true"))
250            .unwrap_or(true);
251        let timestamps = parts
252            .params
253            .get("timestamps")
254            .map(|v| v.eq_ignore_ascii_case("true"))
255            .unwrap_or(false);
256        let tail = parts.params.get("tail").cloned();
257        let auto_pull = parts
258            .params
259            .get("autoPull")
260            .map(|v| v.eq_ignore_ascii_case("true"))
261            .unwrap_or(true);
262        let auto_remove = parts
263            .params
264            .get("autoRemove")
265            .map(|v| v.eq_ignore_ascii_case("true"))
266            .unwrap_or(true);
267        // host is only set from URI param; global config defaults are applied later
268        let host = parts.params.get("host").cloned();
269        let volumes = parts.params.get("volumes").cloned();
270        let user = parts.params.get("user").cloned();
271        let workdir = parts.params.get("workdir").cloned();
272        let detach = parts
273            .params
274            .get("detach")
275            .map(|v| v.eq_ignore_ascii_case("true"))
276            .unwrap_or(false);
277        let driver = parts.params.get("driver").cloned();
278        let force = parts
279            .params
280            .get("force")
281            .map(|v| v.eq_ignore_ascii_case("true"))
282            .unwrap_or(false);
283
284        Ok(Self {
285            operation: parts.path,
286            image,
287            name,
288            host,
289            cmd,
290            ports,
291            env,
292            network,
293            container_id,
294            follow,
295            timestamps,
296            tail,
297            auto_pull,
298            auto_remove,
299            volumes,
300            user,
301            workdir,
302            detach,
303            driver,
304            force,
305        })
306    }
307
308    /// Apply global config defaults to this endpoint config.
309    /// Only sets values that are currently `None`.
310    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
311        if self.host.is_none() {
312            self.host = Some(global.docker_host.clone());
313        }
314    }
315
316    fn docker_socket_path(&self) -> Result<&str, CamelError> {
317        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
318            "npipe:////./pipe/docker_engine"
319        } else {
320            "unix:///var/run/docker.sock"
321        });
322
323        if host.starts_with("unix://") || host.starts_with("npipe://") {
324            return Ok(host);
325        }
326
327        if host.contains("://") {
328            return Err(CamelError::ProcessorError(format!(
329                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
330                host
331            )));
332        }
333
334        Ok(host)
335    }
336
337    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
338        let socket_path = self.docker_socket_path()?;
339        Docker::connect_with_socket(
340            socket_path,
341            DOCKER_CONNECT_TIMEOUT_SECS,
342            bollard::API_DEFAULT_VERSION,
343        )
344        .map_err(|e| {
345            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
346        })
347    }
348
349    /// Connects to the Docker daemon using the configured host.
350    ///
351    /// This method establishes a Unix socket connection to Docker and verifies
352    /// the connection by sending a ping request.
353    ///
354    /// # Errors
355    /// Returns an error if the connection fails or the ping request fails.
356    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
357        let docker = self.connect_docker_client()?;
358        docker
359            .ping()
360            .await
361            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
362        Ok(docker)
363    }
364
365    #[allow(clippy::type_complexity)]
366    fn parse_ports(
367        &self,
368    ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
369        let ports_str = match self.ports.as_ref() {
370            Some(s) => s,
371            None => return Ok((Vec::new(), HashMap::new())),
372        };
373
374        let mut exposed_ports: Vec<String> = Vec::new();
375        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
376
377        for mapping in ports_str.split(',') {
378            let mapping = mapping.trim();
379            if mapping.is_empty() {
380                continue;
381            }
382
383            let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
384                CamelError::ProcessorError(format!(
385                    "malformed port mapping '{}': expected hostPort:containerPort",
386                    mapping
387                ))
388            })?;
389
390            let (container_port, protocol) = if container_spec.contains('/') {
391                let parts: Vec<&str> = container_spec.split('/').collect();
392                (parts[0], parts[1])
393            } else {
394                (container_spec, "tcp")
395            };
396
397            let container_key = format!("{}/{}", container_port, protocol);
398
399            exposed_ports.push(container_key.clone());
400
401            port_bindings.insert(
402                container_key,
403                Some(vec![PortBinding {
404                    host_ip: None,
405                    host_port: Some(host_port.to_string()),
406                }]),
407            );
408        }
409
410        Ok((exposed_ports, port_bindings))
411    }
412
413    fn parse_env(&self) -> Option<Vec<String>> {
414        let env_str = self.env.as_ref()?;
415
416        let env_vars: Vec<String> = env_str
417            .split(',')
418            .map(|s| s.trim().to_string())
419            .filter(|s| !s.is_empty())
420            .collect();
421
422        if env_vars.is_empty() {
423            None
424        } else {
425            Some(env_vars)
426        }
427    }
428
429    #[cfg(test)]
430    #[allow(clippy::type_complexity)]
431    fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
432        self.volumes.as_deref().and_then(parse_volume_str)
433    }
434}
435
/// Splits a comma-separated volume specification into bind mounts and anonymous volumes.
///
/// Each entry is classified by its `:`-separated segments:
/// * `source:target:ro|rw` becomes a bind mount, kept verbatim;
/// * `path` and `path:ro|rw` become an anonymous volume `path` (the mode is dropped);
/// * `name:target` becomes a bind mount `name:target`.
///
/// Blank entries, three-segment entries with any other mode, and entries with more
/// than three segments are skipped. Returns `(binds, anonymous_volumes)`, or `None`
/// when nothing usable was found.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
    let mut binds: Vec<String> = Vec::new();
    let mut anonymous_volumes: Vec<String> = Vec::new();

    let entries = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        let segments: Vec<&str> = entry.split(':').collect();

        match segments.as_slice() {
            [source, target, mode @ ("ro" | "rw")] => {
                binds.push(format!("{}:{}:{}", source, target, mode));
            }
            [path, "ro" | "rw"] | [path] => {
                anonymous_volumes.push((*path).to_string());
            }
            [name, target] => {
                binds.push(format!("{}:{}", name, target));
            }
            _ => {}
        }
    }

    if binds.is_empty() && anonymous_volumes.is_empty() {
        return None;
    }
    Some((binds, anonymous_volumes))
}
485
/// Operations a producer endpoint can perform, as selected by the endpoint's operation name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ProducerOperation {
    /// "list"
    List,
    /// "run"
    Run,
    /// "start"
    Start,
    /// "stop"
    Stop,
    /// "remove"
    Remove,
    /// "exec"
    Exec,
    /// "network-create"
    NetworkCreate,
    /// "network-connect"
    NetworkConnect,
    /// "network-disconnect"
    NetworkDisconnect,
    /// "network-remove"
    NetworkRemove,
    /// "network-list"
    NetworkList,
}
500
501fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
502    match operation {
503        "list" => Ok(ProducerOperation::List),
504        "run" => Ok(ProducerOperation::Run),
505        "start" => Ok(ProducerOperation::Start),
506        "stop" => Ok(ProducerOperation::Stop),
507        "remove" => Ok(ProducerOperation::Remove),
508        "exec" => Ok(ProducerOperation::Exec),
509        "network-create" => Ok(ProducerOperation::NetworkCreate),
510        "network-connect" => Ok(ProducerOperation::NetworkConnect),
511        "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
512        "network-remove" => Ok(ProducerOperation::NetworkRemove),
513        "network-list" => Ok(ProducerOperation::NetworkList),
514        _ => Err(CamelError::ProcessorError(format!(
515            "Unknown container operation: {}",
516            operation
517        ))),
518    }
519}
520
521fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
522    exchange
523        .input
524        .header(HEADER_CONTAINER_NAME)
525        .and_then(|v| v.as_str().map(|s| s.to_string()))
526        .or_else(|| config.name.clone())
527}
528
529async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
530    let images = docker
531        .list_images(None::<ListImagesOptions>)
532        .await
533        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
534
535    Ok(images.iter().any(|img| {
536        img.repo_tags
537            .iter()
538            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
539    }))
540}
541
542async fn pull_image_with_progress(
543    docker: &Docker,
544    image: &str,
545    timeout_secs: u64,
546) -> Result<(), CamelError> {
547    use futures::StreamExt;
548
549    tracing::info!("Pulling image: {}", image);
550
551    let mut stream = docker.create_image(
552        Some(CreateImageOptions {
553            from_image: Some(image.to_string()),
554            ..Default::default()
555        }),
556        None,
557        None,
558    );
559
560    let start = std::time::Instant::now();
561    let mut last_progress = std::time::Instant::now();
562
563    while let Some(item) = stream.next().await {
564        if start.elapsed().as_secs() > timeout_secs {
565            return Err(CamelError::ProcessorError(format!(
566                "Image pull timeout after {}s. Try manually: docker pull {}",
567                timeout_secs, image
568            )));
569        }
570
571        match item {
572            Ok(update) => {
573                // Log progress every 2 seconds
574                if last_progress.elapsed().as_secs() >= 2 {
575                    if let Some(status) = update.status {
576                        tracing::debug!("Pull progress: {}", status);
577                    }
578                    last_progress = std::time::Instant::now();
579                }
580            }
581            Err(e) => {
582                let err_str = e.to_string().to_lowercase();
583                if err_str.contains("unauthorized") || err_str.contains("401") {
584                    return Err(CamelError::ProcessorError(format!(
585                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
586                        image
587                    )));
588                }
589                if err_str.contains("not found") || err_str.contains("404") {
590                    return Err(CamelError::ProcessorError(format!(
591                        "Image '{}' not found in registry. Check the image name and tag",
592                        image
593                    )));
594                }
595                return Err(CamelError::ProcessorError(format!(
596                    "Failed to pull image '{}': {}",
597                    image, e
598                )));
599            }
600        }
601    }
602
603    tracing::info!("Successfully pulled image: {}", image);
604    Ok(())
605}
606
607async fn ensure_image_available(
608    docker: &Docker,
609    image: &str,
610    auto_pull: bool,
611    timeout_secs: u64,
612) -> Result<(), CamelError> {
613    if image_exists_locally(docker, image).await? {
614        tracing::debug!("Image '{}' already available locally", image);
615        return Ok(());
616    }
617
618    if !auto_pull {
619        return Err(CamelError::ProcessorError(format!(
620            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
621            image, image
622        )));
623    }
624
625    pull_image_with_progress(docker, image, timeout_secs).await
626}
627
628fn format_docker_event(event: &bollard::models::EventMessage) -> String {
629    let action = event.action.as_deref().unwrap_or("unknown");
630    let actor = event.actor.as_ref();
631
632    let container_name = actor
633        .and_then(|a| a.attributes.as_ref())
634        .and_then(|attrs| attrs.get("name"))
635        .map(|s| s.as_str())
636        .unwrap_or("unknown");
637
638    let image = actor
639        .and_then(|a| a.attributes.as_ref())
640        .and_then(|attrs| attrs.get("image"))
641        .map(|s| s.as_str())
642        .unwrap_or("");
643
644    let exit_code = actor
645        .and_then(|a| a.attributes.as_ref())
646        .and_then(|attrs| attrs.get("exitCode"))
647        .map(|s| s.as_str());
648
649    match action {
650        "create" => {
651            if image.is_empty() {
652                format!("[CREATE] Container {}", container_name)
653            } else {
654                format!("[CREATE] Container {} ({})", container_name, image)
655            }
656        }
657        "start" => format!("[START]  Container {}", container_name),
658        "die" => {
659            if let Some(code) = exit_code {
660                format!("[DIE]    Container {} (exit: {})", container_name, code)
661            } else {
662                format!("[DIE]    Container {}", container_name)
663            }
664        }
665        "destroy" => format!("[DESTROY] Container {}", container_name),
666        "stop" => format!("[STOP]   Container {}", container_name),
667        "pause" => format!("[PAUSE]  Container {}", container_name),
668        "unpause" => format!("[UNPAUSE] Container {}", container_name),
669        "restart" => format!("[RESTART] Container {}", container_name),
670        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
671    }
672}
673
674async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
675    create: CreateFn,
676    start: StartFn,
677    remove: RemoveFn,
678) -> Result<String, CamelError>
679where
680    CreateFn: FnOnce() -> CreateFut,
681    CreateFut: Future<Output = Result<String, CamelError>>,
682    StartFn: FnOnce(String) -> StartFut,
683    StartFut: Future<Output = Result<(), CamelError>>,
684    RemoveFn: FnOnce(String) -> RemoveFut,
685    RemoveFut: Future<Output = Result<(), CamelError>>,
686{
687    let container_id = create().await?;
688    if let Err(start_err) = start(container_id.clone()).await {
689        if let Err(remove_err) = remove(container_id.clone()).await {
690            return Err(CamelError::ProcessorError(format!(
691                "Failed to start container: {}. Cleanup failed: {}",
692                start_err, remove_err
693            )));
694        }
695        return Err(start_err);
696    }
697
698    Ok(container_id)
699}
700
701async fn handle_list(
702    docker: Docker,
703    _config: ContainerConfig,
704    exchange: &mut Exchange,
705) -> Result<(), CamelError> {
706    let containers = docker
707        .list_containers(None::<ListContainersOptions>)
708        .await
709        .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
710
711    let json_value = serde_json::to_value(&containers).map_err(|e| {
712        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
713    })?;
714
715    exchange.input.body = Body::Json(json_value);
716    exchange.input.set_header(
717        HEADER_ACTION_RESULT,
718        serde_json::Value::String("success".to_string()),
719    );
720    Ok(())
721}
722
723async fn handle_run(
724    docker: Docker,
725    config: ContainerConfig,
726    exchange: &mut Exchange,
727) -> Result<(), CamelError> {
728    let image = exchange
729        .input
730        .header(HEADER_IMAGE)
731        .and_then(|v| v.as_str().map(|s| s.to_string()))
732        .or(config.image.clone())
733        .ok_or_else(|| {
734            CamelError::ProcessorError(
735                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
736            )
737        })?;
738
739    let image = if !image.contains(':') && !image.contains('@') {
740        format!("{}:latest", image)
741    } else {
742        image
743    };
744
745    let pull_timeout = 300;
746    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
747        .await
748        .map_err(|e| {
749            CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
750        })?;
751
752    let container_name = resolve_container_name(exchange, &config);
753    let container_name_ref = container_name.as_deref().unwrap_or("");
754    let cmd_parts: Option<Vec<String>> = config
755        .cmd
756        .as_ref()
757        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
758    let auto_remove = config.auto_remove;
759    let (exposed_ports, port_bindings) = config.parse_ports()?;
760    let env_vars = config.parse_env();
761    let network_mode = config.network.clone();
762
763    let volumes_str = exchange
764        .input
765        .header(HEADER_VOLUMES)
766        .and_then(|v| v.as_str().map(|s| s.to_string()))
767        .or(config.volumes.clone());
768    let (binds, anon_volumes) = volumes_str
769        .as_deref()
770        .and_then(parse_volume_str)
771        .unwrap_or_default();
772
773    let docker_create = docker.clone();
774    let docker_start = docker.clone();
775    let docker_remove = docker.clone();
776
777    let container_id = run_container_with_cleanup(
778        move || async move {
779            let create_options = CreateContainerOptions {
780                name: Some(container_name_ref.to_string()),
781                ..Default::default()
782            };
783            let container_config = ContainerCreateBody {
784                image: Some(image.clone()),
785                cmd: cmd_parts,
786                env: env_vars,
787                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
788                volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
789                host_config: Some(HostConfig {
790                    auto_remove: Some(auto_remove),
791                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
792                    network_mode,
793                    binds: if binds.is_empty() { None } else { Some(binds) },
794                    ..Default::default()
795                }),
796                ..Default::default()
797            };
798
799            let create_response = docker_create
800                .create_container(Some(create_options), container_config)
801                .await
802                .map_err(|e| {
803                    let err_str = e.to_string().to_lowercase();
804                    if err_str.contains("409") || err_str.contains("conflict") {
805                        CamelError::ProcessorError(format!(
806                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
807                            container_name_ref
808                        ))
809                    } else {
810                        CamelError::ProcessorError(format!(
811                            "Failed to create container: {}",
812                            e
813                        ))
814                    }
815                })?;
816
817            Ok(create_response.id)
818        },
819        move |container_id| async move {
820            docker_start
821                .start_container(&container_id, None::<StartContainerOptions>)
822                .await
823                .map_err(|e| {
824                    CamelError::ProcessorError(format!(
825                        "Failed to start container: {}",
826                        e
827                    ))
828                })
829        },
830        move |container_id| async move {
831            docker_remove
832                .remove_container(&container_id, None)
833                .await
834                .map_err(|e| {
835                    CamelError::ProcessorError(format!(
836                        "Failed to remove container after start failure: {}",
837                        e
838                    ))
839                })
840        },
841    )
842    .await?;
843
844    track_container(container_id.clone());
845
846    exchange
847        .input
848        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
849    exchange.input.set_header(
850        HEADER_ACTION_RESULT,
851        serde_json::Value::String("success".to_string()),
852    );
853    Ok(())
854}
855
856async fn handle_lifecycle(
857    docker: Docker,
858    _config: ContainerConfig,
859    exchange: &mut Exchange,
860    operation: ProducerOperation,
861    operation_name: &str,
862) -> Result<(), CamelError> {
863    let container_id = exchange
864        .input
865        .header(HEADER_CONTAINER_ID)
866        .and_then(|v| v.as_str().map(|s| s.to_string()))
867        .ok_or_else(|| {
868            CamelError::ProcessorError(format!(
869                "{} header is required for {} operation",
870                HEADER_CONTAINER_ID, operation_name
871            ))
872        })?;
873
874    match operation {
875        ProducerOperation::Start => {
876            docker
877                .start_container(&container_id, None::<StartContainerOptions>)
878                .await
879                .map_err(|e| {
880                    CamelError::ProcessorError(format!("Failed to start container: {}", e))
881                })?;
882        }
883        ProducerOperation::Stop => {
884            docker
885                .stop_container(&container_id, None)
886                .await
887                .map_err(|e| {
888                    CamelError::ProcessorError(format!("Failed to stop container: {}", e))
889                })?;
890        }
891        ProducerOperation::Remove => {
892            docker
893                .remove_container(&container_id, None)
894                .await
895                .map_err(|e| {
896                    CamelError::ProcessorError(format!("Failed to remove container: {}", e))
897                })?;
898            untrack_container(&container_id);
899        }
900        _ => {}
901    }
902
903    exchange.input.set_header(
904        HEADER_ACTION_RESULT,
905        serde_json::Value::String("success".to_string()),
906    );
907    Ok(())
908}
909
910async fn handle_exec(
911    docker: Docker,
912    config: ContainerConfig,
913    exchange: &mut Exchange,
914) -> Result<(), CamelError> {
915    let container_id = exchange
916        .input
917        .header(HEADER_CONTAINER_ID)
918        .and_then(|v| v.as_str().map(|s| s.to_string()))
919        .or(config.container_id.clone())
920        .ok_or_else(|| {
921            CamelError::ProcessorError(format!(
922                "{} header or containerId param is required for exec operation",
923                HEADER_CONTAINER_ID
924            ))
925        })?;
926
927    let cmd = exchange
928        .input
929        .header(HEADER_CMD)
930        .and_then(|v| v.as_str().map(|s| s.to_string()))
931        .or(config.cmd.clone())
932        .ok_or_else(|| {
933            CamelError::ProcessorError(
934                "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
935            )
936        })?;
937
938    let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
939    let env_vars = config.parse_env();
940
941    let exec_config = bollard::exec::CreateExecOptions {
942        cmd: Some(cmd_parts),
943        env: env_vars,
944        user: config.user.clone(),
945        working_dir: config.workdir.clone(),
946        attach_stdout: Some(true),
947        attach_stderr: Some(true),
948        ..Default::default()
949    };
950
951    let create_result = docker
952        .create_exec(&container_id, exec_config)
953        .await
954        .map_err(|e| {
955            let err_str = e.to_string().to_lowercase();
956            if err_str.contains("404") || err_str.contains("no such") {
957                CamelError::ProcessorError(format!(
958                    "Container '{}' not found for exec",
959                    container_id
960                ))
961            } else {
962                CamelError::ProcessorError(format!("Failed to create exec: {}", e))
963            }
964        })?;
965
966    let exec_id = create_result.id;
967
968    if config.detach {
969        docker
970            .start_exec(
971                &exec_id,
972                Some(bollard::exec::StartExecOptions {
973                    detach: true,
974                    ..Default::default()
975                }),
976            )
977            .await
978            .map_err(|e| {
979                CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
980            })?;
981
982        exchange
983            .input
984            .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
985        exchange
986            .input
987            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
988    } else {
989        let start_result = docker
990            .start_exec(&exec_id, None)
991            .await
992            .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
993
994        let mut output = String::new();
995
996        match start_result {
997            bollard::exec::StartExecResults::Attached {
998                output: mut stream, ..
999            } => {
1000                use futures::StreamExt;
1001                while let Some(msg) = stream.next().await {
1002                    match msg {
1003                        Ok(bollard::container::LogOutput::StdOut { message }) => {
1004                            output.push_str(&String::from_utf8_lossy(&message));
1005                        }
1006                        Ok(bollard::container::LogOutput::StdErr { message }) => {
1007                            output.push_str(&String::from_utf8_lossy(&message));
1008                        }
1009                        Ok(_) => {}
1010                        Err(e) => {
1011                            output.push_str(&format!("[error reading stream: {}]", e));
1012                        }
1013                    }
1014                }
1015            }
1016            bollard::exec::StartExecResults::Detached => {}
1017        }
1018
1019        let inspect = docker
1020            .inspect_exec(&exec_id)
1021            .await
1022            .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1023
1024        let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1025            CamelError::ProcessorError("container exec returned no exit code".into())
1026        })?;
1027
1028        let output = output.trim_end().to_string();
1029        exchange.input.body = Body::Text(output);
1030        exchange.input.set_header(
1031            HEADER_EXIT_CODE,
1032            serde_json::Value::Number(exit_code.into()),
1033        );
1034        exchange
1035            .input
1036            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1037    }
1038
1039    exchange.input.set_header(
1040        HEADER_ACTION_RESULT,
1041        serde_json::Value::String("success".to_string()),
1042    );
1043    Ok(())
1044}
1045
1046async fn handle_network_create(
1047    docker: Docker,
1048    config: ContainerConfig,
1049    exchange: &mut Exchange,
1050) -> Result<(), CamelError> {
1051    let network_name = exchange
1052        .input
1053        .header(HEADER_CONTAINER_NAME)
1054        .and_then(|v| v.as_str().map(|s| s.to_string()))
1055        .or(config.name.clone())
1056        .ok_or_else(|| {
1057            CamelError::ProcessorError(
1058                "CamelContainerName header or name param is required for network-create"
1059                    .to_string(),
1060            )
1061        })?;
1062
1063    let driver = config.driver.as_deref().unwrap_or("bridge");
1064
1065    let options = NetworkCreateRequest {
1066        name: network_name.clone(),
1067        driver: Some(driver.to_string()),
1068        ..Default::default()
1069    };
1070
1071    let result = docker.create_network(options).await.map_err(|e| {
1072        let err_str = e.to_string().to_lowercase();
1073        if err_str.contains("409") || err_str.contains("already exists") {
1074            CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1075        } else {
1076            CamelError::ProcessorError(format!("Failed to create network: {}", e))
1077        }
1078    })?;
1079
1080    let network_id = result.id.clone();
1081    let json_value = serde_json::to_value(&result).map_err(|e| {
1082        CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1083    })?;
1084
1085    exchange.input.body = Body::Json(json_value);
1086    exchange
1087        .input
1088        .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1089    exchange.input.set_header(
1090        HEADER_ACTION_RESULT,
1091        serde_json::Value::String("success".to_string()),
1092    );
1093    Ok(())
1094}
1095
1096async fn handle_network_connect(
1097    docker: Docker,
1098    config: ContainerConfig,
1099    exchange: &mut Exchange,
1100) -> Result<(), CamelError> {
1101    let network = exchange
1102        .input
1103        .header(HEADER_NETWORK)
1104        .and_then(|v| v.as_str().map(|s| s.to_string()))
1105        .or(config.network.clone())
1106        .ok_or_else(|| {
1107            CamelError::ProcessorError(
1108                "CamelContainerNetwork header or network param is required for network-connect"
1109                    .to_string(),
1110            )
1111        })?;
1112
1113    let container = exchange
1114        .input
1115        .header(HEADER_CONTAINER_ID)
1116        .and_then(|v| v.as_str().map(|s| s.to_string()))
1117        .or(config.container_id.clone())
1118        .ok_or_else(|| {
1119            CamelError::ProcessorError(
1120                "CamelContainerId header or container param is required for network-connect"
1121                    .to_string(),
1122            )
1123        })?;
1124
1125    docker
1126        .connect_network(
1127            &network,
1128            NetworkConnectRequest {
1129                container,
1130                ..Default::default()
1131            },
1132        )
1133        .await
1134        .map_err(|e| {
1135            let err_str = e.to_string().to_lowercase();
1136            if err_str.contains("404") || err_str.contains("not found") {
1137                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1138            } else {
1139                CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1140            }
1141        })?;
1142
1143    exchange.input.set_header(
1144        HEADER_ACTION_RESULT,
1145        serde_json::Value::String("success".to_string()),
1146    );
1147    Ok(())
1148}
1149
1150async fn handle_network_disconnect(
1151    docker: Docker,
1152    config: ContainerConfig,
1153    exchange: &mut Exchange,
1154) -> Result<(), CamelError> {
1155    let network = exchange
1156        .input
1157        .header(HEADER_NETWORK)
1158        .and_then(|v| v.as_str().map(|s| s.to_string()))
1159        .or(config.network.clone())
1160        .ok_or_else(|| {
1161            CamelError::ProcessorError(
1162                "CamelContainerNetwork header or network param is required for network-disconnect"
1163                    .to_string(),
1164            )
1165        })?;
1166
1167    let container = exchange
1168        .input
1169        .header(HEADER_CONTAINER_ID)
1170        .and_then(|v| v.as_str().map(|s| s.to_string()))
1171        .or(config.container_id.clone())
1172        .ok_or_else(|| {
1173            CamelError::ProcessorError(
1174                "CamelContainerId header or container param is required for network-disconnect"
1175                    .to_string(),
1176            )
1177        })?;
1178
1179    docker
1180        .disconnect_network(
1181            &network,
1182            NetworkDisconnectRequest {
1183                container,
1184                force: Some(config.force),
1185            },
1186        )
1187        .await
1188        .map_err(|e| {
1189            let err_str = e.to_string().to_lowercase();
1190            if err_str.contains("404") || err_str.contains("not found") {
1191                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1192            } else {
1193                CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1194            }
1195        })?;
1196
1197    exchange.input.set_header(
1198        HEADER_ACTION_RESULT,
1199        serde_json::Value::String("success".to_string()),
1200    );
1201    Ok(())
1202}
1203
1204async fn handle_network_remove(
1205    docker: Docker,
1206    config: ContainerConfig,
1207    exchange: &mut Exchange,
1208) -> Result<(), CamelError> {
1209    let network = exchange
1210        .input
1211        .header(HEADER_NETWORK)
1212        .and_then(|v| v.as_str().map(|s| s.to_string()))
1213        .or(config.network.clone())
1214        .ok_or_else(|| {
1215            CamelError::ProcessorError(
1216                "CamelContainerNetwork header or network param is required for network-remove"
1217                    .to_string(),
1218            )
1219        })?;
1220
1221    docker.remove_network(&network).await.map_err(|e| {
1222        let err_str = e.to_string().to_lowercase();
1223        if err_str.contains("404") || err_str.contains("not found") {
1224            CamelError::ProcessorError(format!("Network '{}' not found", network))
1225        } else if err_str.contains("409") || err_str.contains("in use") {
1226            CamelError::ProcessorError(format!(
1227                "Network '{}' is in use and cannot be removed",
1228                network
1229            ))
1230        } else {
1231            CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1232        }
1233    })?;
1234
1235    exchange.input.set_header(
1236        HEADER_ACTION_RESULT,
1237        serde_json::Value::String("success".to_string()),
1238    );
1239    Ok(())
1240}
1241
1242async fn handle_network_list(
1243    docker: Docker,
1244    _config: ContainerConfig,
1245    exchange: &mut Exchange,
1246) -> Result<(), CamelError> {
1247    let networks = docker
1248        .list_networks(None::<ListNetworksOptions>)
1249        .await
1250        .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1251
1252    let json_value = serde_json::to_value(&networks)
1253        .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1254
1255    exchange.input.body = Body::Json(json_value);
1256    exchange.input.set_header(
1257        HEADER_ACTION_RESULT,
1258        serde_json::Value::String("success".to_string()),
1259    );
1260    Ok(())
1261}
1262
/// Producer for executing container operations.
///
/// Each exchange is dispatched to one operation handler: listing, running,
/// starting, stopping and removing containers, exec inside a container, and
/// network management (create, connect, disconnect, remove, list). The
/// operation is resolved per exchange, so one producer can serve several
/// operations.
#[derive(Clone)]
pub struct ContainerProducer {
    /// Endpoint configuration; cloned for every call.
    config: ContainerConfig,
    /// Docker client; cloned for every call.
    docker: Docker,
}
1272
1273impl Service<Exchange> for ContainerProducer {
1274    type Response = Exchange;
1275    type Error = CamelError;
1276    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1277
1278    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1279        Poll::Ready(Ok(()))
1280    }
1281
1282    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1283        let config = self.config.clone();
1284        let docker = self.docker.clone();
1285        Box::pin(async move {
1286            let operation_name = exchange
1287                .input
1288                .header(HEADER_ACTION)
1289                .and_then(|v| v.as_str().map(|s| s.to_string()))
1290                .unwrap_or_else(|| config.operation.clone());
1291
1292            let operation = parse_producer_operation(&operation_name)?;
1293
1294            match operation {
1295                ProducerOperation::List => {
1296                    handle_list(docker, config, &mut exchange).await?;
1297                }
1298                ProducerOperation::Run => {
1299                    handle_run(docker, config, &mut exchange).await?;
1300                }
1301                ProducerOperation::Start => {
1302                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1303                        .await?;
1304                }
1305                ProducerOperation::Stop => {
1306                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1307                        .await?;
1308                }
1309                ProducerOperation::Remove => {
1310                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1311                        .await?;
1312                }
1313                ProducerOperation::Exec => {
1314                    handle_exec(docker, config, &mut exchange).await?;
1315                }
1316                ProducerOperation::NetworkCreate => {
1317                    handle_network_create(docker, config, &mut exchange).await?;
1318                }
1319                ProducerOperation::NetworkConnect => {
1320                    handle_network_connect(docker, config, &mut exchange).await?;
1321                }
1322                ProducerOperation::NetworkDisconnect => {
1323                    handle_network_disconnect(docker, config, &mut exchange).await?;
1324                }
1325                ProducerOperation::NetworkRemove => {
1326                    handle_network_remove(docker, config, &mut exchange).await?;
1327                }
1328                ProducerOperation::NetworkList => {
1329                    handle_network_list(docker, config, &mut exchange).await?;
1330                }
1331            }
1332
1333            Ok(exchange)
1334        })
1335    }
1336}
1337
/// Consumer for receiving Docker container events or logs.
///
/// This consumer subscribes to Docker events or container logs and forwards them
/// to the route as exchanges. It implements automatic reconnection on connection failures.
pub struct ContainerConsumer {
    /// Endpoint configuration; its `operation` selects the events or logs stream.
    config: ContainerConfig,
}
1345
1346#[async_trait]
1347impl Consumer for ContainerConsumer {
1348    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1349        match self.config.operation.as_str() {
1350            "events" => self.start_events_consumer(context).await,
1351            "logs" => self.start_logs_consumer(context).await,
1352            _ => Err(CamelError::EndpointCreationFailed(format!(
1353                "Consumer only supports 'events' or 'logs' operations, got '{}'",
1354                self.config.operation
1355            ))),
1356        }
1357    }
1358
1359    async fn stop(&mut self) -> Result<(), CamelError> {
1360        Ok(())
1361    }
1362
1363    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1364        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1365    }
1366}
1367
1368impl ContainerConsumer {
1369    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1370        use futures::StreamExt;
1371
1372        loop {
1373            if context.is_cancelled() {
1374                tracing::info!("Container events consumer shutting down");
1375                return Ok(());
1376            }
1377
1378            let docker = match self.config.connect_docker().await {
1379                Ok(d) => d,
1380                Err(e) => {
1381                    tracing::error!(
1382                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
1383                        e
1384                    );
1385                    tokio::select! {
1386                        _ = context.cancelled() => {
1387                            tracing::info!("Container events consumer shutting down");
1388                            return Ok(());
1389                        }
1390                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1391                    }
1392                    continue;
1393                }
1394            };
1395
1396            let mut event_stream = docker.events(None::<EventsOptions>);
1397
1398            loop {
1399                tokio::select! {
1400                    _ = context.cancelled() => {
1401                        tracing::info!("Container events consumer shutting down");
1402                        return Ok(());
1403                    }
1404
1405                    msg = event_stream.next() => {
1406                        match msg {
1407                            Some(Ok(event)) => {
1408                                let formatted = format_docker_event(&event);
1409                                let message = Message::new(Body::Text(formatted));
1410                                let exchange = Exchange::new(message);
1411
1412                                if let Err(e) = context.send(exchange).await {
1413                                    tracing::error!("Failed to send exchange: {:?}", e);
1414                                    break;
1415                                }
1416                            }
1417                            Some(Err(e)) => {
1418                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1419                                break;
1420                            }
1421                            None => {
1422                                tracing::info!("Docker event stream ended. Reconnecting...");
1423                                break;
1424                            }
1425                        }
1426                    }
1427                }
1428            }
1429
1430            tokio::select! {
1431                _ = context.cancelled() => {
1432                    tracing::info!("Container events consumer shutting down");
1433                    return Ok(());
1434                }
1435                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1436            }
1437        }
1438    }
1439
1440    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1441        use futures::StreamExt;
1442
1443        let container_id = self.config.container_id.clone().ok_or_else(|| {
1444            CamelError::EndpointCreationFailed(
1445                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1446                    .to_string(),
1447            )
1448        })?;
1449
1450        loop {
1451            if context.is_cancelled() {
1452                tracing::info!("Container logs consumer shutting down");
1453                return Ok(());
1454            }
1455
1456            let docker = match self.config.connect_docker().await {
1457                Ok(d) => d,
1458                Err(e) => {
1459                    tracing::error!(
1460                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
1461                        e
1462                    );
1463                    tokio::select! {
1464                        _ = context.cancelled() => {
1465                            tracing::info!("Container logs consumer shutting down");
1466                            return Ok(());
1467                        }
1468                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1469                    }
1470                    continue;
1471                }
1472            };
1473
1474            let tail = self
1475                .config
1476                .tail
1477                .clone()
1478                .unwrap_or_else(|| "all".to_string());
1479
1480            let options = LogsOptions {
1481                follow: self.config.follow,
1482                stdout: true,
1483                stderr: true,
1484                timestamps: self.config.timestamps,
1485                tail,
1486                ..Default::default()
1487            };
1488
1489            let mut log_stream = docker.logs(&container_id, Some(options));
1490            let container_id_header = container_id.clone();
1491
1492            loop {
1493                tokio::select! {
1494                    _ = context.cancelled() => {
1495                        tracing::info!("Container logs consumer shutting down");
1496                        return Ok(());
1497                    }
1498
1499                    msg = log_stream.next() => {
1500                        match msg {
1501                            Some(Ok(log_output)) => {
1502                                let (stream_type, content) = match log_output {
1503                                    bollard::container::LogOutput::StdOut { message } => {
1504                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
1505                                    }
1506                                    bollard::container::LogOutput::StdErr { message } => {
1507                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
1508                                    }
1509                                    bollard::container::LogOutput::Console { message } => {
1510                                        ("console", String::from_utf8_lossy(&message).into_owned())
1511                                    }
1512                                    bollard::container::LogOutput::StdIn { message } => {
1513                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
1514                                    }
1515                                };
1516
1517                                let content = content.trim_end();
1518                                if content.is_empty() {
1519                                    continue;
1520                                }
1521
1522                                let mut message = Message::new(Body::Text(content.to_string()));
1523                                message.set_header(
1524                                    HEADER_CONTAINER_ID,
1525                                    serde_json::Value::String(container_id_header.clone()),
1526                                );
1527                                message.set_header(
1528                                    HEADER_LOG_STREAM,
1529                                    serde_json::Value::String(stream_type.to_string()),
1530                                );
1531
1532                                if self.config.timestamps
1533                                    && let Some(ts) = extract_timestamp(content) {
1534                                        message.set_header(
1535                                            HEADER_LOG_TIMESTAMP,
1536                                            serde_json::Value::String(ts),
1537                                        );
1538                                    }
1539
1540                                let exchange = Exchange::new(message);
1541
1542                                if let Err(e) = context.send(exchange).await {
1543                                    tracing::error!("Failed to send log exchange: {:?}", e);
1544                                    break;
1545                                }
1546                            }
1547                            Some(Err(e)) => {
1548                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1549                                break;
1550                            }
1551                            None => {
1552                                if self.config.follow {
1553                                    tracing::info!("Docker log stream ended. Reconnecting...");
1554                                    break;
1555                                } else {
1556                                    tracing::info!("Container logs consumer finished (follow=false)");
1557                                    return Ok(());
1558                                }
1559                            }
1560                        }
1561                    }
1562                }
1563            }
1564
1565            tokio::select! {
1566                _ = context.cancelled() => {
1567                    tracing::info!("Container logs consumer shutting down");
1568                    return Ok(());
1569                }
1570                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1571            }
1572        }
1573    }
1574}
1575
/// Returns the leading token of `log_line` when it looks like a timestamp.
///
/// The line is split at its first space. The part before the space is returned
/// if it contains a `'T'`; lines without a space, or whose first token has no
/// `'T'`, yield `None`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _rest) = log_line.split_once(' ')?;
    candidate.contains('T').then(|| candidate.to_owned())
}
1584
/// Component for creating container endpoints.
///
/// This component handles URIs with the "container" scheme and creates
/// appropriate producer and consumer endpoints for Docker operations.
///
/// Containers created via `run` operation are tracked globally and can be
/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
pub struct ContainerComponent {
    /// Optional global defaults, applied to every endpoint configuration
    /// after the URI has been parsed.
    config: Option<ContainerGlobalConfig>,
}
1595
1596impl ContainerComponent {
1597    /// Creates a new container component instance without global config.
1598    pub fn new() -> Self {
1599        Self { config: None }
1600    }
1601
1602    /// Creates a container component with the given global config.
1603    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1604        Self {
1605            config: Some(config),
1606        }
1607    }
1608
1609    /// Creates a container component with optional global config.
1610    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1611        Self { config }
1612    }
1613}
1614
1615impl Default for ContainerComponent {
1616    fn default() -> Self {
1617        Self::new()
1618    }
1619}
1620
1621impl Component for ContainerComponent {
1622    fn scheme(&self) -> &str {
1623        "container"
1624    }
1625
1626    fn create_endpoint(
1627        &self,
1628        uri: &str,
1629        _ctx: &dyn camel_component_api::ComponentContext,
1630    ) -> Result<Box<dyn Endpoint>, CamelError> {
1631        let mut config = ContainerConfig::from_uri(uri)?;
1632        // Apply global defaults if present and URI didn't set them
1633        if let Some(ref global) = self.config {
1634            config.apply_global_defaults(global);
1635        }
1636        Ok(Box::new(ContainerEndpoint {
1637            uri: uri.to_string(),
1638            config,
1639        }))
1640    }
1641}
1642
/// Endpoint for container operations.
///
/// This endpoint creates producers for executing container operations
/// and consumers for receiving container events or logs.
// TODO(CON-003): Forward container health status (inspect healthcheck / Health field) to
// Camel's health subsystem so the route can react to unhealthy containers.
pub struct ContainerEndpoint {
    /// URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Endpoint configuration; cloned into each producer and consumer.
    config: ContainerConfig,
}
1653
1654impl ContainerEndpoint {
1655    /// Returns the Docker host configured for this endpoint.
1656    /// Returns `None` if not set (for testing purposes).
1657    pub fn docker_host(&self) -> Option<&str> {
1658        self.config.host.as_deref()
1659    }
1660}
1661
1662impl Endpoint for ContainerEndpoint {
1663    fn uri(&self) -> &str {
1664        &self.uri
1665    }
1666
1667    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1668        Ok(Box::new(ContainerConsumer {
1669            config: self.config.clone(),
1670        }))
1671    }
1672
1673    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1674        let docker = self.config.connect_docker_client()?;
1675        Ok(BoxProcessor::new(ContainerProducer {
1676            config: self.config.clone(),
1677            docker,
1678        }))
1679    }
1680}
1681
1682#[cfg(test)]
1683mod tests {
1684    use super::*;
1685    use camel_component_api::NoOpComponentContext;
1686
1687    #[test]
1688    fn test_container_config() {
1689        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1690        assert_eq!(config.operation, "run");
1691        assert_eq!(config.image.as_deref(), Some("alpine"));
1692        // host is None by default; global config applies it later
1693        assert!(config.host.is_none());
1694    }
1695
1696    #[test]
1697    fn test_global_config_applied_to_endpoint() {
1698        // When global config is set and URI doesn't specify host,
1699        // apply_global_defaults should set host from global config.
1700        let global =
1701            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1702        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1703        assert!(
1704            config.host.is_none(),
1705            "URI without ?host= should leave host as None"
1706        );
1707        config.apply_global_defaults(&global);
1708        assert_eq!(
1709            config.host.as_deref(),
1710            Some("unix:///custom/docker.sock"),
1711            "global docker_host must be applied when URI did not set host"
1712        );
1713    }
1714
1715    #[test]
1716    fn test_uri_param_wins_over_global_config() {
1717        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1718        let global =
1719            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1720        let mut config =
1721            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1722                .unwrap();
1723        assert_eq!(
1724            config.host.as_deref(),
1725            Some("unix:///override.sock"),
1726            "URI-set host should be parsed correctly"
1727        );
1728        config.apply_global_defaults(&global);
1729        assert_eq!(
1730            config.host.as_deref(),
1731            Some("unix:///override.sock"),
1732            "global config must NOT override a host already set by URI"
1733        );
1734    }
1735
1736    #[test]
1737    fn test_container_config_parses_name() {
1738        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1739        assert_eq!(config.name.as_deref(), Some("my-container"));
1740    }
1741
1742    #[test]
1743    fn test_parse_producer_operation_known() {
1744        assert_eq!(
1745            parse_producer_operation("list").unwrap(),
1746            ProducerOperation::List
1747        );
1748        assert_eq!(
1749            parse_producer_operation("run").unwrap(),
1750            ProducerOperation::Run
1751        );
1752        assert_eq!(
1753            parse_producer_operation("start").unwrap(),
1754            ProducerOperation::Start
1755        );
1756        assert_eq!(
1757            parse_producer_operation("stop").unwrap(),
1758            ProducerOperation::Stop
1759        );
1760        assert_eq!(
1761            parse_producer_operation("remove").unwrap(),
1762            ProducerOperation::Remove
1763        );
1764    }
1765
1766    #[test]
1767    fn test_parse_producer_operation_unknown() {
1768        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1769        match err {
1770            CamelError::ProcessorError(msg) => {
1771                assert!(
1772                    msg.contains("Unknown container operation"),
1773                    "Unexpected error message: {}",
1774                    msg
1775                );
1776            }
1777            _ => panic!("Expected ProcessorError for unknown operation"),
1778        }
1779    }
1780
1781    #[test]
1782    fn test_parse_producer_operation_new_variants() {
1783        assert_eq!(
1784            parse_producer_operation("exec").unwrap(),
1785            ProducerOperation::Exec
1786        );
1787        assert_eq!(
1788            parse_producer_operation("network-create").unwrap(),
1789            ProducerOperation::NetworkCreate
1790        );
1791        assert_eq!(
1792            parse_producer_operation("network-connect").unwrap(),
1793            ProducerOperation::NetworkConnect
1794        );
1795        assert_eq!(
1796            parse_producer_operation("network-disconnect").unwrap(),
1797            ProducerOperation::NetworkDisconnect
1798        );
1799        assert_eq!(
1800            parse_producer_operation("network-remove").unwrap(),
1801            ProducerOperation::NetworkRemove
1802        );
1803        assert_eq!(
1804            parse_producer_operation("network-list").unwrap(),
1805            ProducerOperation::NetworkList
1806        );
1807    }
1808
1809    #[test]
1810    fn test_resolve_container_name_header_overrides_config() {
1811        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1812        let mut exchange = Exchange::new(Message::new(""));
1813        exchange.input.set_header(
1814            HEADER_CONTAINER_NAME,
1815            serde_json::Value::String("header-name".to_string()),
1816        );
1817
1818        let resolved = resolve_container_name(&exchange, &config);
1819        assert_eq!(resolved.as_deref(), Some("header-name"));
1820    }
1821
1822    #[test]
1823    fn test_container_config_rejects_tcp_host() {
1824        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1825        let err = config.connect_docker_client().unwrap_err();
1826        match err {
1827            CamelError::ProcessorError(msg) => {
1828                assert!(
1829                    msg.to_lowercase().contains("tcp"),
1830                    "Expected TCP scheme error, got: {}",
1831                    msg
1832                );
1833            }
1834            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1835        }
1836    }
1837
1838    #[tokio::test]
1839    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1840        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1841        let remove_called_clone = remove_called.clone();
1842
1843        let result = run_container_with_cleanup(
1844            || async { Ok("container-123".to_string()) },
1845            |_id| async move {
1846                Err(CamelError::ProcessorError(
1847                    "Failed to start container".to_string(),
1848                ))
1849            },
1850            move |_id| {
1851                let remove_called_inner = remove_called_clone.clone();
1852                async move {
1853                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1854                    Ok(())
1855                }
1856            },
1857        )
1858        .await;
1859
1860        assert!(result.is_err(), "Expected start failure to bubble up");
1861        assert!(
1862            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1863            "Expected cleanup to remove container"
1864        );
1865    }
1866
1867    #[test]
1868    fn test_container_component_creates_endpoint() {
1869        let component = ContainerComponent::new();
1870        assert_eq!(component.scheme(), "container");
1871        let ctx = NoOpComponentContext;
1872        let endpoint = component
1873            .create_endpoint("container:run?image=alpine", &ctx)
1874            .unwrap();
1875        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1876    }
1877
1878    #[test]
1879    fn test_container_config_parses_ports() {
1880        let config =
1881            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1882        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1883    }
1884
1885    #[test]
1886    fn test_container_config_parses_env() {
1887        let config =
1888            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1889        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1890    }
1891
1892    #[test]
1893    fn test_container_config_parses_logs_options() {
1894        let config = ContainerConfig::from_uri(
1895            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1896        )
1897        .unwrap();
1898        assert_eq!(config.operation, "logs");
1899        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1900        assert!(config.follow);
1901        assert!(config.timestamps);
1902        assert_eq!(config.tail.as_deref(), Some("100"));
1903    }
1904
1905    #[test]
1906    fn test_container_config_logs_defaults() {
1907        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1908        assert!(config.follow); // default: true
1909        assert!(!config.timestamps); // default: false
1910        assert!(config.tail.is_none()); // default: None (all)
1911    }
1912
1913    #[test]
1914    fn test_parse_ports_single() {
1915        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1916        let (exposed, bindings) = config.parse_ports().unwrap();
1917
1918        assert!(exposed.contains(&"80/tcp".to_string()));
1919        assert!(bindings.contains_key("80/tcp"));
1920
1921        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1922        assert_eq!(binding.len(), 1);
1923        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1924    }
1925
1926    #[test]
1927    fn test_parse_ports_multiple() {
1928        let config =
1929            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1930        let (exposed, bindings) = config.parse_ports().unwrap();
1931
1932        assert!(exposed.contains(&"80/tcp".to_string()));
1933        assert!(exposed.contains(&"443/tcp".to_string()));
1934        assert_eq!(bindings.len(), 2);
1935    }
1936
1937    #[test]
1938    fn test_parse_ports_with_protocol() {
1939        let config =
1940            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1941                .unwrap();
1942        let (exposed, _bindings) = config.parse_ports().unwrap();
1943
1944        assert!(exposed.contains(&"80/tcp".to_string()));
1945        assert!(exposed.contains(&"53/udp".to_string()));
1946    }
1947
1948    #[test]
1949    fn test_parse_ports_none() {
1950        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1951        let (exposed, bindings) = config.parse_ports().unwrap();
1952        assert!(exposed.is_empty());
1953        assert!(bindings.is_empty());
1954    }
1955
1956    #[test]
1957    fn test_parse_env_single() {
1958        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1959        let env = config.parse_env().unwrap();
1960
1961        assert_eq!(env.len(), 1);
1962        assert_eq!(env[0], "FOO=bar");
1963    }
1964
1965    #[test]
1966    fn test_parse_env_multiple() {
1967        let config =
1968            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1969                .unwrap();
1970        let env = config.parse_env().unwrap();
1971
1972        assert_eq!(env.len(), 3);
1973        assert!(env.contains(&"FOO=bar".to_string()));
1974        assert!(env.contains(&"BAZ=qux".to_string()));
1975        assert!(env.contains(&"NUM=123".to_string()));
1976    }
1977
1978    #[test]
1979    fn test_parse_env_none() {
1980        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1981        assert!(config.parse_env().is_none());
1982    }
1983
1984    use camel_component_api::Message;
1985    use std::sync::Arc;
1986
1987    #[tokio::test]
1988    async fn test_container_producer_resolves_operation_from_header() {
1989        // Try to connect to Docker - if fails, return early
1990        let docker = match Docker::connect_with_local_defaults() {
1991            Ok(d) => d,
1992            Err(_) => {
1993                eprintln!("Skipping test: Could not connect to Docker daemon");
1994                return;
1995            }
1996        };
1997
1998        if docker.ping().await.is_err() {
1999            eprintln!("Skipping test: Docker daemon not responding to ping");
2000            return;
2001        }
2002
2003        let component = ContainerComponent::new();
2004        let ctx = NoOpComponentContext;
2005        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2006
2007        let ctx = ProducerContext::new();
2008        let mut producer = endpoint.create_producer(&ctx).unwrap();
2009
2010        let mut exchange = Exchange::new(Message::new(""));
2011        exchange
2012            .input
2013            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2014
2015        use tower::ServiceExt;
2016        let result = producer
2017            .ready()
2018            .await
2019            .unwrap()
2020            .call(exchange)
2021            .await
2022            .unwrap();
2023
2024        // For list operation, the result header should be on the input message
2025        assert_eq!(
2026            result
2027                .input
2028                .header(HEADER_ACTION_RESULT)
2029                .map(|v| v.as_str().unwrap()),
2030            Some("success")
2031        );
2032    }
2033
2034    #[tokio::test]
2035    async fn test_container_producer_connection_error_on_invalid_host() {
2036        // Test that an invalid host (nonexistent socket) results in a connection error
2037        let component = ContainerComponent::new();
2038        let ctx = NoOpComponentContext;
2039        let endpoint = component
2040            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2041            .unwrap();
2042
2043        let ctx = ProducerContext::new();
2044        let result = endpoint.create_producer(&ctx);
2045
2046        // The producer should return an error because it cannot connect to the invalid socket
2047        assert!(
2048            result.is_err(),
2049            "Expected error when connecting to invalid host"
2050        );
2051        let err = result.unwrap_err();
2052        match &err {
2053            CamelError::ProcessorError(msg) => {
2054                assert!(
2055                    msg.to_lowercase().contains("connection")
2056                        || msg.to_lowercase().contains("connect")
2057                        || msg.to_lowercase().contains("socket")
2058                        || msg.contains("docker"),
2059                    "Error message should indicate connection failure, got: {}",
2060                    msg
2061                );
2062            }
2063            _ => panic!("Expected ProcessorError, got: {:?}", err),
2064        }
2065    }
2066
2067    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
2068    #[tokio::test]
2069    async fn test_container_producer_lifecycle_operations_missing_id() {
2070        // Try to connect to Docker - if fails, return early
2071        let docker = match Docker::connect_with_local_defaults() {
2072            Ok(d) => d,
2073            Err(_) => {
2074                eprintln!("Skipping test: Could not connect to Docker daemon");
2075                return;
2076            }
2077        };
2078
2079        if docker.ping().await.is_err() {
2080            eprintln!("Skipping test: Docker daemon not responding to ping");
2081            return;
2082        }
2083
2084        let component = ContainerComponent::new();
2085        let ctx = NoOpComponentContext;
2086        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2087        let ctx = ProducerContext::new();
2088        let mut producer = endpoint.create_producer(&ctx).unwrap();
2089
2090        // Test each lifecycle operation without CamelContainerId header
2091        for operation in ["start", "stop", "remove"] {
2092            let mut exchange = Exchange::new(Message::new(""));
2093            exchange.input.set_header(
2094                HEADER_ACTION,
2095                serde_json::Value::String(operation.to_string()),
2096            );
2097            // Deliberately NOT setting CamelContainerId header
2098
2099            use tower::ServiceExt;
2100            let result = producer.ready().await.unwrap().call(exchange).await;
2101
2102            assert!(
2103                result.is_err(),
2104                "Expected error for {} operation without CamelContainerId",
2105                operation
2106            );
2107            let err = result.unwrap_err();
2108            match &err {
2109                CamelError::ProcessorError(msg) => {
2110                    assert!(
2111                        msg.contains(HEADER_CONTAINER_ID),
2112                        "Error message should mention {}, got: {}",
2113                        HEADER_CONTAINER_ID,
2114                        msg
2115                    );
2116                }
2117                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2118            }
2119        }
2120    }
2121
2122    /// Test that stop operation returns an error for a nonexistent container.
2123    #[tokio::test]
2124    async fn test_container_producer_stop_nonexistent() {
2125        // Try to connect to Docker - if fails, return early
2126        let docker = match Docker::connect_with_local_defaults() {
2127            Ok(d) => d,
2128            Err(_) => {
2129                eprintln!("Skipping test: Could not connect to Docker daemon");
2130                return;
2131            }
2132        };
2133
2134        if docker.ping().await.is_err() {
2135            eprintln!("Skipping test: Docker daemon not responding to ping");
2136            return;
2137        }
2138
2139        let component = ContainerComponent::new();
2140        let ctx = NoOpComponentContext;
2141        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2142        let ctx = ProducerContext::new();
2143        let mut producer = endpoint.create_producer(&ctx).unwrap();
2144
2145        let mut exchange = Exchange::new(Message::new(""));
2146        exchange
2147            .input
2148            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2149        exchange.input.set_header(
2150            HEADER_CONTAINER_ID,
2151            serde_json::Value::String("nonexistent-container-123".into()),
2152        );
2153
2154        use tower::ServiceExt;
2155        let result = producer.ready().await.unwrap().call(exchange).await;
2156
2157        assert!(
2158            result.is_err(),
2159            "Expected error when stopping nonexistent container"
2160        );
2161        let err = result.unwrap_err();
2162        match &err {
2163            CamelError::ProcessorError(msg) => {
2164                // Docker API returns 404 with "No such container" message
2165                assert!(
2166                    msg.to_lowercase().contains("no such container")
2167                        || msg.to_lowercase().contains("not found")
2168                        || msg.contains("404"),
2169                    "Error message should indicate container not found, got: {}",
2170                    msg
2171                );
2172            }
2173            _ => panic!("Expected ProcessorError, got: {:?}", err),
2174        }
2175    }
2176
2177    /// Test that run operation returns an error when no image is provided.
2178    #[tokio::test]
2179    async fn test_container_producer_run_missing_image() {
2180        // Try to connect to Docker - if fails, return early
2181        let docker = match Docker::connect_with_local_defaults() {
2182            Ok(d) => d,
2183            Err(_) => {
2184                eprintln!("Skipping test: Could not connect to Docker daemon");
2185                return;
2186            }
2187        };
2188
2189        if docker.ping().await.is_err() {
2190            eprintln!("Skipping test: Docker daemon not responding to ping");
2191            return;
2192        }
2193
2194        // Create producer without an image in the URI
2195        let component = ContainerComponent::new();
2196        let ctx = NoOpComponentContext;
2197        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2198        let ctx = ProducerContext::new();
2199        let mut producer = endpoint.create_producer(&ctx).unwrap();
2200
2201        let mut exchange = Exchange::new(Message::new(""));
2202        exchange
2203            .input
2204            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2205        // Deliberately NOT setting CamelContainerImage header
2206
2207        use tower::ServiceExt;
2208        let result = producer.ready().await.unwrap().call(exchange).await;
2209
2210        assert!(
2211            result.is_err(),
2212            "Expected error for run operation without image"
2213        );
2214        let err = result.unwrap_err();
2215        match &err {
2216            CamelError::ProcessorError(msg) => {
2217                assert!(
2218                    msg.to_lowercase().contains("image"),
2219                    "Error message should mention 'image', got: {}",
2220                    msg
2221                );
2222            }
2223            _ => panic!("Expected ProcessorError, got: {:?}", err),
2224        }
2225    }
2226
2227    /// Test that run operation uses image from header.
2228    #[tokio::test]
2229    async fn test_container_producer_run_image_from_header() {
2230        // Try to connect to Docker - if fails, return early
2231        let docker = match Docker::connect_with_local_defaults() {
2232            Ok(d) => d,
2233            Err(_) => {
2234                eprintln!("Skipping test: Could not connect to Docker daemon");
2235                return;
2236            }
2237        };
2238
2239        if docker.ping().await.is_err() {
2240            eprintln!("Skipping test: Docker daemon not responding to ping");
2241            return;
2242        }
2243
2244        // Create producer without an image in the URI
2245        let component = ContainerComponent::new();
2246        let ctx = NoOpComponentContext;
2247        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2248        let ctx = ProducerContext::new();
2249        let mut producer = endpoint.create_producer(&ctx).unwrap();
2250
2251        let mut exchange = Exchange::new(Message::new(""));
2252        exchange
2253            .input
2254            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2255        // Set a non-existent image to test that the run operation attempts to use it
2256        exchange.input.set_header(
2257            HEADER_IMAGE,
2258            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2259        );
2260
2261        use tower::ServiceExt;
2262        let result = producer.ready().await.unwrap().call(exchange).await;
2263
2264        // The operation should fail because the image doesn't exist
2265        assert!(
2266            result.is_err(),
2267            "Expected error when running container with nonexistent image"
2268        );
2269        let err = result.unwrap_err();
2270        match &err {
2271            CamelError::ProcessorError(msg) => {
2272                // Docker API returns an error about the missing image
2273                assert!(
2274                    msg.to_lowercase().contains("no such image")
2275                        || msg.to_lowercase().contains("not found")
2276                        || msg.to_lowercase().contains("image")
2277                        || msg.to_lowercase().contains("pull")
2278                        || msg.contains("404"),
2279                    "Error message should indicate image issue, got: {}",
2280                    msg
2281                );
2282            }
2283            _ => panic!("Expected ProcessorError, got: {:?}", err),
2284        }
2285    }
2286
2287    /// Integration test: Actually run a container with alpine:latest.
2288    /// This test verifies the full flow: create → start → set headers.
2289    #[tokio::test]
2290    async fn test_container_producer_run_alpine_container() {
2291        let docker = match Docker::connect_with_local_defaults() {
2292            Ok(d) => d,
2293            Err(_) => {
2294                eprintln!("Skipping test: Could not connect to Docker daemon");
2295                return;
2296            }
2297        };
2298
2299        if docker.ping().await.is_err() {
2300            eprintln!("Skipping test: Docker daemon not responding to ping");
2301            return;
2302        }
2303
2304        // Pull alpine:latest if not present
2305        let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2306        let has_alpine = images
2307            .iter()
2308            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2309
2310        if !has_alpine {
2311            eprintln!("Pulling alpine:latest image...");
2312            let mut stream = docker.create_image(
2313                Some(CreateImageOptions {
2314                    from_image: Some("alpine:latest".to_string()),
2315                    ..Default::default()
2316                }),
2317                None,
2318                None,
2319            );
2320
2321            use futures::StreamExt;
2322            while let Some(_item) = stream.next().await {
2323                // Wait for pull to complete
2324            }
2325            eprintln!("Image pulled successfully");
2326        }
2327
2328        // Create producer
2329        let component = ContainerComponent::new();
2330        let ctx = NoOpComponentContext;
2331        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2332        let ctx = ProducerContext::new();
2333        let mut producer = endpoint.create_producer(&ctx).unwrap();
2334
2335        // Run container with unique name
2336        let timestamp = std::time::SystemTime::now()
2337            .duration_since(std::time::UNIX_EPOCH)
2338            .unwrap()
2339            .as_millis();
2340        let container_name = format!("test-rust-camel-{}", timestamp);
2341        let mut exchange = Exchange::new(Message::new(""));
2342        exchange.input.set_header(
2343            HEADER_IMAGE,
2344            serde_json::Value::String("alpine:latest".into()),
2345        );
2346        exchange.input.set_header(
2347            HEADER_CONTAINER_NAME,
2348            serde_json::Value::String(container_name.clone()),
2349        );
2350
2351        use tower::ServiceExt;
2352        let result = producer
2353            .ready()
2354            .await
2355            .unwrap()
2356            .call(exchange)
2357            .await
2358            .expect("Container run should succeed");
2359
2360        // Verify container ID was set
2361        let container_id = result
2362            .input
2363            .header(HEADER_CONTAINER_ID)
2364            .and_then(|v| v.as_str().map(|s| s.to_string()))
2365            .expect("Expected container ID header");
2366        assert!(!container_id.is_empty(), "Container ID should not be empty");
2367
2368        // Verify success header
2369        assert_eq!(
2370            result
2371                .input
2372                .header(HEADER_ACTION_RESULT)
2373                .and_then(|v| v.as_str()),
2374            Some("success")
2375        );
2376
2377        // Verify container exists in Docker
2378        let inspect = docker
2379            .inspect_container(&container_id, None)
2380            .await
2381            .expect("Container should exist");
2382        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2383
2384        // Cleanup: remove container
2385        docker
2386            .remove_container(
2387                &container_id,
2388                Some(RemoveContainerOptions {
2389                    force: true,
2390                    ..Default::default()
2391                }),
2392            )
2393            .await
2394            .ok();
2395
2396        eprintln!("✅ Container {} created and cleaned up", container_id);
2397    }
2398
2399    /// Test that consumer returns an error for unsupported operations.
2400    #[tokio::test]
2401    async fn test_container_consumer_unsupported_operation() {
2402        use tokio::sync::mpsc;
2403
2404        let component = ContainerComponent::new();
2405        let ctx = NoOpComponentContext;
2406        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2407        let mut consumer = endpoint.create_consumer().unwrap();
2408
2409        // Create a minimal ConsumerContext
2410        let (tx, _rx) = mpsc::channel(16);
2411        let cancel_token = tokio_util::sync::CancellationToken::new();
2412        let context = ConsumerContext::new(tx, cancel_token);
2413
2414        let result = consumer.start(context).await;
2415
2416        // Should return error because "run" is not a supported consumer operation
2417        assert!(
2418            result.is_err(),
2419            "Expected error for unsupported consumer operation"
2420        );
2421        let err = result.unwrap_err();
2422        match &err {
2423            CamelError::EndpointCreationFailed(msg) => {
2424                assert!(
2425                    msg.contains("Consumer only supports 'events' or 'logs'"),
2426                    "Error message should mention events or logs support, got: {}",
2427                    msg
2428                );
2429            }
2430            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2431        }
2432    }
2433
2434    #[test]
2435    fn test_container_consumer_concurrency_model_is_concurrent() {
2436        let consumer = ContainerConsumer {
2437            config: ContainerConfig::from_uri("container:events").unwrap(),
2438        };
2439
2440        assert_eq!(
2441            consumer.concurrency_model(),
2442            camel_component_api::ConcurrencyModel::Concurrent { max: None }
2443        );
2444    }
2445
    /// Verifies that the events consumer shuts down gracefully once cancellation is requested.
    ///
    /// This test requires a running Docker daemon. If the daemon cannot be reached (the
    /// connection or the ping fails), the test prints a notice and returns early, so it is
    /// reported as passed rather than ignored.
    #[tokio::test]
    async fn test_container_consumer_cancellation() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use tokio::sync::mpsc;

        // Try to connect to Docker - if that fails, return early
        let docker = match Docker::connect_with_local_defaults() {
            Ok(d) => d,
            Err(_) => {
                eprintln!("Skipping test: Could not connect to Docker daemon");
                return;
            }
        };

        // A client handle alone does not prove the daemon is up; ping to make sure.
        if docker.ping().await.is_err() {
            eprintln!("Skipping test: Docker daemon not responding to ping");
            return;
        }

        let component = ContainerComponent::new();
        let ctx = NoOpComponentContext;
        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        // Create a ConsumerContext. `_rx` is bound (rather than discarded with `_`) so the
        // receiving half of the channel stays alive until the end of the test.
        let (tx, _rx) = mpsc::channel(16);
        let cancel_token = tokio_util::sync::CancellationToken::new();
        let context = ConsumerContext::new(tx, cancel_token.clone());

        // Track if the consumer task has completed
        let completed = Arc::new(AtomicBool::new(false));
        let completed_clone = completed.clone();

        // Spawn consumer in background
        let handle = tokio::spawn(async move {
            let result = consumer.start(context).await;
            // Mark as completed when done
            completed_clone.store(true, Ordering::SeqCst);
            result
        });

        // Wait a bit for the consumer to start. This is a fixed grace period: the test
        // assumes 100 ms is enough for the spawned task to be running.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Consumer should still be running (not completed)
        assert!(
            !completed.load(Ordering::SeqCst),
            "Consumer should still be running before cancellation"
        );

        // Request cancellation
        cancel_token.cancel();

        // Wait for the task to complete (with timeout)
        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;

        // Task should have completed (not timed out). `is_ok()` only establishes that the
        // join handle resolved within 500 ms; the value returned by the task is not inspected.
        assert!(
            result.is_ok(),
            "Consumer should gracefully shut down after cancellation"
        );

        // Verify the consumer completed: the flag is only set after `start` has returned.
        assert!(
            completed.load(Ordering::SeqCst),
            "Consumer should have completed after cancellation"
        );
    }
2517
2518    /// Integration test for listing containers.
2519    /// This test requires a running Docker daemon. If Docker is not available, the test
2520    /// will return early and be effectively ignored.
2521    #[tokio::test]
2522    async fn test_container_producer_list_containers() {
2523        // Try to connect to Docker using default config
2524        // If ping fails, return early (effectively ignoring this test)
2525        let docker = match Docker::connect_with_local_defaults() {
2526            Ok(d) => d,
2527            Err(_) => {
2528                eprintln!("Skipping test: Could not connect to Docker daemon");
2529                return;
2530            }
2531        };
2532
2533        if docker.ping().await.is_err() {
2534            eprintln!("Skipping test: Docker daemon not responding to ping");
2535            return;
2536        }
2537
2538        // Create producer with list operation
2539        let component = ContainerComponent::new();
2540        let ctx = NoOpComponentContext;
2541        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2542
2543        let ctx = ProducerContext::new();
2544        let mut producer = endpoint.create_producer(&ctx).unwrap();
2545
2546        // Create exchange with list operation in header
2547        let mut exchange = Exchange::new(Message::new(""));
2548        exchange
2549            .input
2550            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2551
2552        // Call the producer
2553        use tower::ServiceExt;
2554        let result = producer
2555            .ready()
2556            .await
2557            .unwrap()
2558            .call(exchange)
2559            .await
2560            .expect("Producer should succeed when Docker is available");
2561
2562        // Assert that the input exchange body is a JSON array
2563        // (because list_containers should put the JSON result in the body)
2564        match &result.input.body {
2565            camel_component_api::Body::Json(json_value) => {
2566                assert!(
2567                    json_value.is_array(),
2568                    "Expected input body to be a JSON array, got: {:?}",
2569                    json_value
2570                );
2571            }
2572            other => panic!("Expected Body::Json with array, got: {:?}", other),
2573        }
2574    }
2575
2576    #[test]
2577    fn test_container_config_parses_volumes() {
2578        let config = ContainerConfig::from_uri(
2579            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2580        )
2581        .unwrap();
2582        assert_eq!(
2583            config.volumes.as_deref(),
2584            Some("./html:/usr/share/nginx/html:ro")
2585        );
2586    }
2587
2588    #[test]
2589    fn test_container_config_parses_exec_params() {
2590        let config = ContainerConfig::from_uri(
2591            "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2592        )
2593        .unwrap();
2594        assert_eq!(config.operation, "exec");
2595        assert_eq!(config.container_id.as_deref(), Some("my-app"));
2596        assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2597        assert_eq!(config.user.as_deref(), Some("root"));
2598        assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2599        assert!(config.detach);
2600    }
2601
2602    #[test]
2603    fn test_container_config_parses_network_create_params() {
2604        let config =
2605            ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2606                .unwrap();
2607        assert_eq!(config.operation, "network-create");
2608        assert_eq!(config.name.as_deref(), Some("my-net"));
2609        assert_eq!(config.driver.as_deref(), Some("bridge"));
2610    }
2611
2612    #[test]
2613    fn test_container_config_defaults_new_fields() {
2614        let config = ContainerConfig::from_uri("container:list").unwrap();
2615        assert!(config.volumes.is_none());
2616        assert!(config.user.is_none());
2617        assert!(config.workdir.is_none());
2618        assert!(!config.detach);
2619        assert!(config.driver.is_none());
2620        assert!(!config.force);
2621    }
2622
2623    #[test]
2624    fn test_parse_volumes_bind_mount() {
2625        let config = ContainerConfig::from_uri(
2626            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2627        )
2628        .unwrap();
2629        let (binds, anon) = config.parse_volumes().unwrap();
2630        assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2631        assert!(anon.is_empty());
2632    }
2633
2634    #[test]
2635    fn test_parse_volumes_named_volume() {
2636        let config =
2637            ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2638                .unwrap();
2639        let (binds, anon) = config.parse_volumes().unwrap();
2640        assert_eq!(binds, vec!["data:/var/lib/data"]);
2641        assert!(anon.is_empty());
2642    }
2643
2644    #[test]
2645    fn test_parse_volumes_anonymous() {
2646        let config =
2647            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2648        let (binds, anon) = config.parse_volumes().unwrap();
2649        assert!(binds.is_empty());
2650        assert!(anon.contains(&"/tmp/app-data".to_string()));
2651    }
2652
2653    #[test]
2654    fn test_parse_volumes_anonymous_with_mode() {
2655        let config =
2656            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2657                .unwrap();
2658        let (binds, anon) = config.parse_volumes().unwrap();
2659        assert!(binds.is_empty());
2660        assert!(anon.contains(&"/tmp/app-data".to_string()));
2661    }
2662
2663    #[test]
2664    fn test_parse_volumes_multiple() {
2665        let config = ContainerConfig::from_uri(
2666            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2667        )
2668        .unwrap();
2669        let (binds, anon) = config.parse_volumes().unwrap();
2670        assert_eq!(binds.len(), 2);
2671        assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2672        assert!(binds.contains(&"data:/var/log/app".to_string()));
2673        assert!(anon.is_empty());
2674    }
2675
2676    #[test]
2677    fn test_parse_volumes_mixed() {
2678        let config = ContainerConfig::from_uri(
2679            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2680        )
2681        .unwrap();
2682        let (binds, anon) = config.parse_volumes().unwrap();
2683        assert_eq!(binds.len(), 1);
2684        assert!(anon.contains(&"/tmp/cache".to_string()));
2685    }
2686
2687    #[test]
2688    fn test_parse_volumes_none() {
2689        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2690        assert!(config.parse_volumes().is_none());
2691    }
2692
2693    #[test]
2694    fn test_parse_volumes_empty_entry_skipped() {
2695        let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2696        assert!(config.parse_volumes().is_none());
2697    }
2698
2699    #[test]
2700    fn test_parse_volumes_rw_mode() {
2701        let config =
2702            ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2703                .unwrap();
2704        let (binds, _) = config.parse_volumes().unwrap();
2705        assert_eq!(binds, vec!["./data:/app/data:rw"]);
2706    }
2707
2708    #[test]
2709    fn test_container_config_from_uri_parses_false_flags() {
2710        let config = ContainerConfig::from_uri(
2711            "container:logs?containerId=a&follow=false&timestamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
2712        )
2713        .unwrap();
2714        assert!(!config.follow);
2715        assert!(!config.timestamps);
2716        assert!(!config.auto_pull);
2717        assert!(!config.auto_remove);
2718        assert!(config.detach);
2719        assert!(config.force);
2720    }
2721
2722    #[test]
2723    fn test_docker_socket_path_validation() {
2724        let unix_cfg =
2725            ContainerConfig::from_uri("container:list?host=unix:///tmp/docker.sock").unwrap();
2726        assert_eq!(
2727            unix_cfg.docker_socket_path().unwrap(),
2728            "unix:///tmp/docker.sock"
2729        );
2730
2731        let npipe_cfg =
2732            ContainerConfig::from_uri("container:list?host=npipe:////./pipe/docker_engine")
2733                .unwrap();
2734        assert_eq!(
2735            npipe_cfg.docker_socket_path().unwrap(),
2736            "npipe:////./pipe/docker_engine"
2737        );
2738
2739        let plain_cfg =
2740            ContainerConfig::from_uri("container:list?host=/var/run/docker.sock").unwrap();
2741        assert_eq!(
2742            plain_cfg.docker_socket_path().unwrap(),
2743            "/var/run/docker.sock"
2744        );
2745
2746        let bad_cfg =
2747            ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
2748        assert!(bad_cfg.docker_socket_path().is_err());
2749    }
2750
2751    #[test]
2752    fn test_parse_ports_invalid_and_whitespace_entries() {
2753        // "8080" without colon is malformed — must fail-fast
2754        let cfg = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
2755        let err = cfg.parse_ports().unwrap_err();
2756        assert!(
2757            err.to_string().contains("malformed port mapping"),
2758            "expected malformed port error, got: {}",
2759            err
2760        );
2761
2762        let cfg =
2763            ContainerConfig::from_uri("container:run?ports= 8080:80 ,  5353:53/udp ").unwrap();
2764        let (exposed, bindings) = cfg.parse_ports().unwrap();
2765        assert!(exposed.contains(&"80/tcp".to_string()));
2766        assert!(exposed.contains(&"53/udp".to_string()));
2767        assert_eq!(bindings.len(), 2);
2768    }
2769
2770    #[test]
2771    fn test_parse_ports_fail_fast_on_malformed() {
2772        // First entry valid, second malformed — must fail on the malformed one
2773        let cfg = ContainerConfig::from_uri("container:run?ports=8080:80,badentry").unwrap();
2774        let err = cfg.parse_ports().unwrap_err();
2775        assert!(
2776            err.to_string()
2777                .contains("malformed port mapping 'badentry'"),
2778            "expected fail-fast on 'badentry', got: {}",
2779            err
2780        );
2781    }
2782
2783    #[test]
2784    fn test_parse_env_trims_and_filters_empty_items() {
2785        let cfg = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
2786        let env = cfg.parse_env().unwrap();
2787        assert_eq!(env, vec!["FOO=bar".to_string(), "BAZ=qux".to_string()]);
2788
2789        let cfg = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
2790        assert!(cfg.parse_env().is_none());
2791    }
2792
2793    #[test]
2794    fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
2795        assert!(parse_volume_str("/host:/ctr:badmode").is_none());
2796        let (binds, anon) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
2797        assert!(binds.contains(&"a:/b:rw".to_string()));
2798        assert!(anon.contains(&"/tmp/cache".to_string()));
2799        assert!(anon.contains(&"/tmp/logs".to_string()));
2800    }
2801
2802    #[test]
2803    fn test_format_docker_event_variants_and_timestamp_extraction() {
2804        let mut attrs = std::collections::HashMap::new();
2805        attrs.insert("name".to_string(), "demo".to_string());
2806        attrs.insert("image".to_string(), "alpine:latest".to_string());
2807        attrs.insert("exitCode".to_string(), "137".to_string());
2808        let actor = bollard::models::EventActor {
2809            id: None,
2810            attributes: Some(attrs),
2811        };
2812
2813        let create_event = bollard::models::EventMessage {
2814            action: Some("create".to_string()),
2815            actor: Some(actor.clone()),
2816            ..Default::default()
2817        };
2818        assert_eq!(
2819            format_docker_event(&create_event),
2820            "[CREATE] Container demo (alpine:latest)"
2821        );
2822
2823        let die_event = bollard::models::EventMessage {
2824            action: Some("die".to_string()),
2825            actor: Some(actor),
2826            ..Default::default()
2827        };
2828        assert_eq!(
2829            format_docker_event(&die_event),
2830            "[DIE]    Container demo (exit: 137)"
2831        );
2832
2833        let other_event = bollard::models::EventMessage {
2834            action: Some("oom".to_string()),
2835            actor: None,
2836            ..Default::default()
2837        };
2838        assert_eq!(format_docker_event(&other_event), "[OOM] Container unknown");
2839
2840        assert_eq!(
2841            extract_timestamp("2024-01-01T00:00:00Z hello"),
2842            Some("2024-01-01T00:00:00Z".to_string())
2843        );
2844        assert_eq!(extract_timestamp("hello world"), None);
2845    }
2846
2847    #[tokio::test]
2848    async fn test_run_container_with_cleanup_error_paths() {
2849        let create_fail = run_container_with_cleanup(
2850            || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2851            |_id| async move { Ok(()) },
2852            |_id| async move { Ok(()) },
2853        )
2854        .await;
2855        assert!(
2856            matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2857        );
2858
2859        let cleanup_fail = run_container_with_cleanup(
2860            || async { Ok("cid-1".to_string()) },
2861            |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2862            |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2863        )
2864        .await;
2865        match cleanup_fail {
2866            Err(CamelError::ProcessorError(msg)) => {
2867                assert!(msg.contains("Failed to start container"));
2868                assert!(msg.contains("Cleanup failed"));
2869            }
2870            other => panic!("unexpected result: {:?}", other),
2871        }
2872    }
2873
    /// Integration test that creates a Docker network, lists networks, and then removes the
    /// created network, all through the container producer.
    ///
    /// A reachable Docker daemon is required. If connecting or pinging fails, the test
    /// prints a notice and returns early without running any of the steps.
    #[tokio::test]
    async fn test_container_producer_network_lifecycle() {
        let docker = match Docker::connect_with_local_defaults() {
            Ok(d) => d,
            Err(_) => {
                eprintln!("Skipping test: Could not connect to Docker daemon");
                return;
            }
        };
        if docker.ping().await.is_err() {
            eprintln!("Skipping test: Docker daemon not responding to ping");
            return;
        }

        // The process id is embedded in the network name used by this test.
        let network_name = format!("camel-test-{}", std::process::id());

        let component = ContainerComponent::new();
        let component_ctx = NoOpComponentContext;

        // Create network
        let endpoint = component
            .create_endpoint(
                &format!("container:network-create?name={}", network_name),
                &component_ctx,
            )
            .unwrap();
        // This producer context is reused for all three producers below.
        let ctx = ProducerContext::new();
        let mut producer = endpoint.create_producer(&ctx).unwrap();

        let mut exchange = Exchange::new(Message::new(""));
        exchange.input.set_header(
            HEADER_ACTION,
            serde_json::Value::String("network-create".into()),
        );

        use tower::ServiceExt;
        let result = producer
            .ready()
            .await
            .unwrap()
            .call(exchange)
            .await
            .expect("Network create should succeed");

        // The test expects the new network's id in the HEADER_NETWORK header as a string.
        let network_id = result
            .input
            .header(HEADER_NETWORK)
            .and_then(|v| v.as_str().map(|s| s.to_string()))
            .expect("Should have network ID");

        assert!(!network_id.is_empty());

        // List networks
        let endpoint = component
            .create_endpoint("container:network-list", &component_ctx)
            .unwrap();
        let mut producer = endpoint.create_producer(&ctx).unwrap();

        let mut exchange = Exchange::new(Message::new(""));
        exchange.input.set_header(
            HEADER_ACTION,
            serde_json::Value::String("network-list".into()),
        );

        let list_result = producer
            .ready()
            .await
            .unwrap()
            .call(exchange)
            .await
            .expect("Network list should succeed");

        // Only the shape of the listing is checked (a JSON array), not its contents.
        match &list_result.input.body {
            Body::Json(json_value) => {
                assert!(json_value.is_array(), "Expected JSON array");
            }
            other => panic!("Expected Body::Json, got: {:?}", other),
        }

        // Remove network. The network is addressed by its name here, not by the id returned
        // from the create step. NOTE(review): this step is skipped if an earlier assertion
        // panics, which would leave the network behind - confirm whether that is acceptable.
        let endpoint = component
            .create_endpoint(
                &format!("container:network-remove?network={}", network_name),
                &component_ctx,
            )
            .unwrap();
        let mut producer = endpoint.create_producer(&ctx).unwrap();

        let mut exchange = Exchange::new(Message::new(""));
        exchange.input.set_header(
            HEADER_ACTION,
            serde_json::Value::String("network-remove".into()),
        );

        let remove_result = producer
            .ready()
            .await
            .unwrap()
            .call(exchange)
            .await
            .expect("Network remove should succeed");

        // A successful removal is expected to be reported as "success" in HEADER_ACTION_RESULT.
        let action_result = remove_result
            .input
            .header(HEADER_ACTION_RESULT)
            .and_then(|v| v.as_str());
        assert_eq!(action_result, Some("success"));
    }
2982
2983    #[tokio::test]
2984    async fn test_logs_consumer_requires_container_id() {
2985        use tokio::sync::mpsc;
2986
2987        let mut consumer = ContainerConsumer {
2988            config: ContainerConfig::from_uri("container:logs").unwrap(),
2989        };
2990        let (tx, _rx) = mpsc::channel(4);
2991        let context = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
2992
2993        let err = consumer.start(context).await.unwrap_err();
2994        match err {
2995            CamelError::EndpointCreationFailed(msg) => {
2996                assert!(msg.contains("containerId is required for logs consumer"));
2997            }
2998            other => panic!("unexpected error: {:?}", other),
2999        }
3000    }
3001
3002    #[tokio::test]
3003    async fn test_events_consumer_stops_immediately_when_cancelled() {
3004        use tokio::sync::mpsc;
3005
3006        let mut consumer = ContainerConsumer {
3007            config: ContainerConfig::from_uri("container:events").unwrap(),
3008        };
3009
3010        let (tx, _rx) = mpsc::channel(4);
3011        let cancel = tokio_util::sync::CancellationToken::new();
3012        cancel.cancel();
3013        let context = ConsumerContext::new(tx, cancel);
3014
3015        let result = consumer.start(context).await;
3016        assert!(result.is_ok());
3017    }
3018
3019    #[test]
3020    fn test_global_config_constructors_and_endpoint_docker_host() {
3021        let global = ContainerGlobalConfig::new().with_docker_host("unix:///tmp/docker.sock");
3022        let mut cfg = ContainerConfig::from_uri("container:list").unwrap();
3023        cfg.apply_global_defaults(&global);
3024
3025        let endpoint = ContainerEndpoint {
3026            uri: "container:list".to_string(),
3027            config: cfg,
3028        };
3029
3030        assert_eq!(endpoint.docker_host(), Some("unix:///tmp/docker.sock"));
3031
3032        let component = ContainerComponent::with_config(global);
3033        assert_eq!(component.scheme(), "container");
3034    }
3035}