Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::models::{
19    ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
20};
21use bollard::query_parameters::{
22    CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
23    ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
24    StartContainerOptions,
25};
26use bollard::service::{HostConfig, PortBinding};
27use camel_component_api::parse_uri;
28use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
29use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
30use tower::Service;
31
/// Global tracker for containers created by this component.
/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
///
/// Holds the IDs registered through `track_container`. `cleanup_tracked_containers`
/// force-removes every ID that is still present in the set.
static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
36
37/// Registers a container ID for tracking (will be cleaned up on shutdown).
38fn track_container(id: String) {
39    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
40        tracker.insert(id);
41    }
42}
43
44/// Removes a container ID from tracking (when it's been removed naturally).
45fn untrack_container(id: &str) {
46    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
47        tracker.remove(id);
48    }
49}
50
51/// Cleans up all tracked containers. Call this on application shutdown.
52pub async fn cleanup_tracked_containers() {
53    let ids: Vec<String> = {
54        match CONTAINER_TRACKER.lock() {
55            Ok(tracker) => tracker.iter().cloned().collect(),
56            Err(_) => return,
57        }
58    };
59
60    if ids.is_empty() {
61        return;
62    }
63
64    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
65
66    let docker = match Docker::connect_with_local_defaults() {
67        Ok(d) => d,
68        Err(e) => {
69            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
70            return;
71        }
72    };
73
74    for id in ids {
75        match docker
76            .remove_container(
77                &id,
78                Some(RemoveContainerOptions {
79                    force: true,
80                    ..Default::default()
81                }),
82            )
83            .await
84        {
85            Ok(_) => {
86                tracing::debug!("Cleaned up container {}", id);
87                untrack_container(&id);
88            }
89            Err(e) => {
90                tracing::warn!("Failed to cleanup container {}: {}", id, e);
91            }
92        }
93    }
94}
95
// Docker connection settings

/// Timeout (seconds) for connecting to the Docker daemon.
/// Passed as the timeout argument of `Docker::connect_with_socket`.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

// Header constants for container operations

/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header key for specifying the container image to use for "run" operations.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header key for specifying or receiving the container ID.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header key for the log stream type (stdout or stderr).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header key for the log timestamp.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header key for specifying the container name for "run" operations.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header key for the result status of a container operation (e.g., "success").
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Header key for specifying the command to execute in a container.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Header key for specifying the network name for network operations.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Header key for the exit code of an exec operation.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Header key for specifying volume mounts.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Header key for the exec instance ID.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
136
137// ---------------------------------------------------------------------------
138// ContainerGlobalConfig
139// ---------------------------------------------------------------------------
140
/// Global configuration for Container component.
/// Supports serde deserialization with defaults and builder methods.
/// These are the fallback defaults when URI params are not set.
///
/// Because of `#[serde(default)]`, fields missing from the deserialized input
/// take their value from the `Default` implementation.
#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
#[serde(default)]
pub struct ContainerGlobalConfig {
    /// The Docker host URL (default: "unix:///var/run/docker.sock").
    pub docker_host: String,
}
150
151impl Default for ContainerGlobalConfig {
152    fn default() -> Self {
153        Self {
154            docker_host: "unix:///var/run/docker.sock".to_string(),
155        }
156    }
157}
158
159impl ContainerGlobalConfig {
160    pub fn new() -> Self {
161        Self::default()
162    }
163
164    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
165        self.docker_host = v.into();
166        self
167    }
168}
169
170// ---------------------------------------------------------------------------
171// ContainerConfig (endpoint configuration)
172// ---------------------------------------------------------------------------
173
/// Configuration for the container component endpoint.
///
/// This struct holds the parsed URI configuration including the operation type,
/// optional container image, and Docker host connection details.
/// Instances are built by `ContainerConfig::from_uri`; boolean options are
/// `true` only when the URI value equals "true" (case-insensitive).
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
    /// Taken from the path part of the URI.
    pub operation: String,
    /// The container image to use for "run" operations (can be overridden via header).
    pub image: Option<String>,
    /// The container name to use for "run" operations (can be overridden via header).
    pub name: Option<String>,
    /// The Docker host URL (defaults to "unix:///var/run/docker.sock").
    /// `None` until set by the `host` URI param or by the global config defaults.
    pub host: Option<String>,
    /// Command to run in the container (e.g., "sleep 30").
    /// The "run" operation splits it on whitespace.
    pub cmd: Option<String>,
    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
    /// The container port may carry a protocol suffix such as "53/udp"; "tcp" is assumed otherwise.
    pub ports: Option<String>,
    /// Environment variables in format "KEY=value,KEY2=value2".
    pub env: Option<String>,
    /// Network mode (e.g., "bridge", "host", "none"). Default: "bridge".
    pub network: Option<String>,
    /// Container ID or name for logs consumer.
    /// The "exec" operation also falls back to it when the container ID header is absent.
    pub container_id: Option<String>,
    /// Follow log output (default: true for consumer).
    pub follow: bool,
    /// Include timestamps in logs (default: false).
    pub timestamps: bool,
    /// Number of lines to show from the end of logs (default: all).
    pub tail: Option<String>,
    /// Automatically pull the image if not present (default: true).
    pub auto_pull: bool,
    /// Automatically remove the container when it exits (default: true).
    pub auto_remove: bool,
    /// Volume mounts in format "host:container:ro" (e.g., "./html:/usr/share/nginx/html:ro").
    pub volumes: Option<String>,
    /// User to run the container or exec command as (e.g., "root").
    pub user: Option<String>,
    /// Working directory inside the container.
    pub workdir: Option<String>,
    /// Whether to detach from the exec process (default: false).
    pub detach: bool,
    /// Network driver for network-create (e.g., "bridge", "overlay").
    pub driver: Option<String>,
    /// Whether to force the operation (default: false).
    pub force: bool,
}
221
222impl ContainerConfig {
223    /// Parses a container URI into a `ContainerConfig`.
224    ///
225    /// # Arguments
226    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
227    ///
228    /// # Errors
229    /// Returns an error if the URI scheme is not "container".
230    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
231        let parts = parse_uri(uri)?;
232        if parts.scheme != "container" {
233            return Err(CamelError::InvalidUri(format!(
234                "expected scheme 'container', got '{}'",
235                parts.scheme
236            )));
237        }
238
239        let image = parts.params.get("image").cloned();
240        let name = parts.params.get("name").cloned();
241        let cmd = parts.params.get("cmd").cloned();
242        let ports = parts.params.get("ports").cloned();
243        let env = parts.params.get("env").cloned();
244        let network = parts.params.get("network").cloned();
245        let container_id = parts.params.get("containerId").cloned();
246        let follow = parts
247            .params
248            .get("follow")
249            .map(|v| v.eq_ignore_ascii_case("true"))
250            .unwrap_or(true);
251        let timestamps = parts
252            .params
253            .get("timestamps")
254            .map(|v| v.eq_ignore_ascii_case("true"))
255            .unwrap_or(false);
256        let tail = parts.params.get("tail").cloned();
257        let auto_pull = parts
258            .params
259            .get("autoPull")
260            .map(|v| v.eq_ignore_ascii_case("true"))
261            .unwrap_or(true);
262        let auto_remove = parts
263            .params
264            .get("autoRemove")
265            .map(|v| v.eq_ignore_ascii_case("true"))
266            .unwrap_or(true);
267        // host is only set from URI param; global config defaults are applied later
268        let host = parts.params.get("host").cloned();
269        let volumes = parts.params.get("volumes").cloned();
270        let user = parts.params.get("user").cloned();
271        let workdir = parts.params.get("workdir").cloned();
272        let detach = parts
273            .params
274            .get("detach")
275            .map(|v| v.eq_ignore_ascii_case("true"))
276            .unwrap_or(false);
277        let driver = parts.params.get("driver").cloned();
278        let force = parts
279            .params
280            .get("force")
281            .map(|v| v.eq_ignore_ascii_case("true"))
282            .unwrap_or(false);
283
284        Ok(Self {
285            operation: parts.path,
286            image,
287            name,
288            host,
289            cmd,
290            ports,
291            env,
292            network,
293            container_id,
294            follow,
295            timestamps,
296            tail,
297            auto_pull,
298            auto_remove,
299            volumes,
300            user,
301            workdir,
302            detach,
303            driver,
304            force,
305        })
306    }
307
308    /// Apply global config defaults to this endpoint config.
309    /// Only sets values that are currently `None`.
310    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
311        if self.host.is_none() {
312            self.host = Some(global.docker_host.clone());
313        }
314    }
315
316    fn docker_socket_path(&self) -> Result<&str, CamelError> {
317        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
318            "npipe:////./pipe/docker_engine"
319        } else {
320            "unix:///var/run/docker.sock"
321        });
322
323        if host.starts_with("unix://") || host.starts_with("npipe://") {
324            return Ok(host);
325        }
326
327        if host.contains("://") {
328            return Err(CamelError::ProcessorError(format!(
329                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
330                host
331            )));
332        }
333
334        Ok(host)
335    }
336
337    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
338        let socket_path = self.docker_socket_path()?;
339        Docker::connect_with_socket(
340            socket_path,
341            DOCKER_CONNECT_TIMEOUT_SECS,
342            bollard::API_DEFAULT_VERSION,
343        )
344        .map_err(|e| {
345            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
346        })
347    }
348
349    /// Connects to the Docker daemon using the configured host.
350    ///
351    /// This method establishes a Unix socket connection to Docker and verifies
352    /// the connection by sending a ping request.
353    ///
354    /// # Errors
355    /// Returns an error if the connection fails or the ping request fails.
356    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
357        let docker = self.connect_docker_client()?;
358        docker
359            .ping()
360            .await
361            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
362        Ok(docker)
363    }
364
365    #[allow(clippy::type_complexity)]
366    fn parse_ports(&self) -> Option<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>)> {
367        let ports_str = self.ports.as_ref()?;
368
369        let mut exposed_ports: Vec<String> = Vec::new();
370        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
371
372        for mapping in ports_str.split(',') {
373            let mapping = mapping.trim();
374            if mapping.is_empty() {
375                continue;
376            }
377
378            let (host_port, container_spec) = mapping.split_once(':')?;
379
380            let (container_port, protocol) = if container_spec.contains('/') {
381                let parts: Vec<&str> = container_spec.split('/').collect();
382                (parts[0], parts[1])
383            } else {
384                (container_spec, "tcp")
385            };
386
387            let container_key = format!("{}/{}", container_port, protocol);
388
389            exposed_ports.push(container_key.clone());
390
391            port_bindings.insert(
392                container_key,
393                Some(vec![PortBinding {
394                    host_ip: None,
395                    host_port: Some(host_port.to_string()),
396                }]),
397            );
398        }
399
400        if exposed_ports.is_empty() {
401            None
402        } else {
403            Some((exposed_ports, port_bindings))
404        }
405    }
406
407    fn parse_env(&self) -> Option<Vec<String>> {
408        let env_str = self.env.as_ref()?;
409
410        let env_vars: Vec<String> = env_str
411            .split(',')
412            .map(|s| s.trim().to_string())
413            .filter(|s| !s.is_empty())
414            .collect();
415
416        if env_vars.is_empty() {
417            None
418        } else {
419            Some(env_vars)
420        }
421    }
422
423    #[cfg(test)]
424    #[allow(clippy::type_complexity)]
425    fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
426        self.volumes.as_deref().and_then(parse_volume_str)
427    }
428}
429
/// Splits a comma-separated volume specification into bind mounts and
/// anonymous volumes, returned as `(binds, anonymous_volumes)`.
///
/// Each entry is classified by its number of `:`-separated fields:
/// * `path` is an anonymous volume.
/// * `path:ro|rw` is an anonymous volume (the mode is not kept).
/// * `source:target` is a bind or named volume, kept verbatim.
/// * `source:target:ro|rw` is a bind mount with a mode, kept verbatim.
///
/// Entries with any other shape, or with a mode other than `ro`/`rw`, are
/// ignored. Returns `None` when nothing usable was found.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
    let mut bind_mounts: Vec<String> = Vec::new();
    let mut anonymous: Vec<String> = Vec::new();

    let specs = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|spec| !spec.is_empty());

    for spec in specs {
        let fields: Vec<&str> = spec.split(':').collect();

        match fields.as_slice() {
            [path] | [path, "ro" | "rw"] => anonymous.push((*path).to_owned()),
            // Re-joining the fields with ':' reproduces the entry itself.
            [_, _] | [_, _, "ro" | "rw"] => bind_mounts.push(spec.to_owned()),
            _ => {}
        }
    }

    let nothing_parsed = bind_mounts.is_empty() && anonymous.is_empty();
    if nothing_parsed {
        return None;
    }
    Some((bind_mounts, anonymous))
}
479
/// Operations a producer endpoint can perform, as parsed from the URI
/// operation string by `parse_producer_operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// "list"
    List,
    /// "run"
    Run,
    /// "start"
    Start,
    /// "stop"
    Stop,
    /// "remove"
    Remove,
    /// "exec"
    Exec,
    /// "network-create"
    NetworkCreate,
    /// "network-connect"
    NetworkConnect,
    /// "network-disconnect"
    NetworkDisconnect,
    /// "network-remove"
    NetworkRemove,
    /// "network-list"
    NetworkList,
}
494
495fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
496    match operation {
497        "list" => Ok(ProducerOperation::List),
498        "run" => Ok(ProducerOperation::Run),
499        "start" => Ok(ProducerOperation::Start),
500        "stop" => Ok(ProducerOperation::Stop),
501        "remove" => Ok(ProducerOperation::Remove),
502        "exec" => Ok(ProducerOperation::Exec),
503        "network-create" => Ok(ProducerOperation::NetworkCreate),
504        "network-connect" => Ok(ProducerOperation::NetworkConnect),
505        "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
506        "network-remove" => Ok(ProducerOperation::NetworkRemove),
507        "network-list" => Ok(ProducerOperation::NetworkList),
508        _ => Err(CamelError::ProcessorError(format!(
509            "Unknown container operation: {}",
510            operation
511        ))),
512    }
513}
514
515fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
516    exchange
517        .input
518        .header(HEADER_CONTAINER_NAME)
519        .and_then(|v| v.as_str().map(|s| s.to_string()))
520        .or_else(|| config.name.clone())
521}
522
523async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
524    let images = docker
525        .list_images(None::<ListImagesOptions>)
526        .await
527        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
528
529    Ok(images.iter().any(|img| {
530        img.repo_tags
531            .iter()
532            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
533    }))
534}
535
536async fn pull_image_with_progress(
537    docker: &Docker,
538    image: &str,
539    timeout_secs: u64,
540) -> Result<(), CamelError> {
541    use futures::StreamExt;
542
543    tracing::info!("Pulling image: {}", image);
544
545    let mut stream = docker.create_image(
546        Some(CreateImageOptions {
547            from_image: Some(image.to_string()),
548            ..Default::default()
549        }),
550        None,
551        None,
552    );
553
554    let start = std::time::Instant::now();
555    let mut last_progress = std::time::Instant::now();
556
557    while let Some(item) = stream.next().await {
558        if start.elapsed().as_secs() > timeout_secs {
559            return Err(CamelError::ProcessorError(format!(
560                "Image pull timeout after {}s. Try manually: docker pull {}",
561                timeout_secs, image
562            )));
563        }
564
565        match item {
566            Ok(update) => {
567                // Log progress every 2 seconds
568                if last_progress.elapsed().as_secs() >= 2 {
569                    if let Some(status) = update.status {
570                        tracing::debug!("Pull progress: {}", status);
571                    }
572                    last_progress = std::time::Instant::now();
573                }
574            }
575            Err(e) => {
576                let err_str = e.to_string().to_lowercase();
577                if err_str.contains("unauthorized") || err_str.contains("401") {
578                    return Err(CamelError::ProcessorError(format!(
579                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
580                        image
581                    )));
582                }
583                if err_str.contains("not found") || err_str.contains("404") {
584                    return Err(CamelError::ProcessorError(format!(
585                        "Image '{}' not found in registry. Check the image name and tag",
586                        image
587                    )));
588                }
589                return Err(CamelError::ProcessorError(format!(
590                    "Failed to pull image '{}': {}",
591                    image, e
592                )));
593            }
594        }
595    }
596
597    tracing::info!("Successfully pulled image: {}", image);
598    Ok(())
599}
600
601async fn ensure_image_available(
602    docker: &Docker,
603    image: &str,
604    auto_pull: bool,
605    timeout_secs: u64,
606) -> Result<(), CamelError> {
607    if image_exists_locally(docker, image).await? {
608        tracing::debug!("Image '{}' already available locally", image);
609        return Ok(());
610    }
611
612    if !auto_pull {
613        return Err(CamelError::ProcessorError(format!(
614            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
615            image, image
616        )));
617    }
618
619    pull_image_with_progress(docker, image, timeout_secs).await
620}
621
622fn format_docker_event(event: &bollard::models::EventMessage) -> String {
623    let action = event.action.as_deref().unwrap_or("unknown");
624    let actor = event.actor.as_ref();
625
626    let container_name = actor
627        .and_then(|a| a.attributes.as_ref())
628        .and_then(|attrs| attrs.get("name"))
629        .map(|s| s.as_str())
630        .unwrap_or("unknown");
631
632    let image = actor
633        .and_then(|a| a.attributes.as_ref())
634        .and_then(|attrs| attrs.get("image"))
635        .map(|s| s.as_str())
636        .unwrap_or("");
637
638    let exit_code = actor
639        .and_then(|a| a.attributes.as_ref())
640        .and_then(|attrs| attrs.get("exitCode"))
641        .map(|s| s.as_str());
642
643    match action {
644        "create" => {
645            if image.is_empty() {
646                format!("[CREATE] Container {}", container_name)
647            } else {
648                format!("[CREATE] Container {} ({})", container_name, image)
649            }
650        }
651        "start" => format!("[START]  Container {}", container_name),
652        "die" => {
653            if let Some(code) = exit_code {
654                format!("[DIE]    Container {} (exit: {})", container_name, code)
655            } else {
656                format!("[DIE]    Container {}", container_name)
657            }
658        }
659        "destroy" => format!("[DESTROY] Container {}", container_name),
660        "stop" => format!("[STOP]   Container {}", container_name),
661        "pause" => format!("[PAUSE]  Container {}", container_name),
662        "unpause" => format!("[UNPAUSE] Container {}", container_name),
663        "restart" => format!("[RESTART] Container {}", container_name),
664        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
665    }
666}
667
668async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
669    create: CreateFn,
670    start: StartFn,
671    remove: RemoveFn,
672) -> Result<String, CamelError>
673where
674    CreateFn: FnOnce() -> CreateFut,
675    CreateFut: Future<Output = Result<String, CamelError>>,
676    StartFn: FnOnce(String) -> StartFut,
677    StartFut: Future<Output = Result<(), CamelError>>,
678    RemoveFn: FnOnce(String) -> RemoveFut,
679    RemoveFut: Future<Output = Result<(), CamelError>>,
680{
681    let container_id = create().await?;
682    if let Err(start_err) = start(container_id.clone()).await {
683        if let Err(remove_err) = remove(container_id.clone()).await {
684            return Err(CamelError::ProcessorError(format!(
685                "Failed to start container: {}. Cleanup failed: {}",
686                start_err, remove_err
687            )));
688        }
689        return Err(start_err);
690    }
691
692    Ok(container_id)
693}
694
695async fn handle_list(
696    docker: Docker,
697    _config: ContainerConfig,
698    exchange: &mut Exchange,
699) -> Result<(), CamelError> {
700    let containers = docker
701        .list_containers(None::<ListContainersOptions>)
702        .await
703        .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
704
705    let json_value = serde_json::to_value(&containers).map_err(|e| {
706        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
707    })?;
708
709    exchange.input.body = Body::Json(json_value);
710    exchange.input.set_header(
711        HEADER_ACTION_RESULT,
712        serde_json::Value::String("success".to_string()),
713    );
714    Ok(())
715}
716
717async fn handle_run(
718    docker: Docker,
719    config: ContainerConfig,
720    exchange: &mut Exchange,
721) -> Result<(), CamelError> {
722    let image = exchange
723        .input
724        .header(HEADER_IMAGE)
725        .and_then(|v| v.as_str().map(|s| s.to_string()))
726        .or(config.image.clone())
727        .ok_or_else(|| {
728            CamelError::ProcessorError(
729                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
730            )
731        })?;
732
733    let image = if !image.contains(':') && !image.contains('@') {
734        format!("{}:latest", image)
735    } else {
736        image
737    };
738
739    let pull_timeout = 300;
740    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
741        .await
742        .map_err(|e| {
743            CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
744        })?;
745
746    let container_name = resolve_container_name(exchange, &config);
747    let container_name_ref = container_name.as_deref().unwrap_or("");
748    let cmd_parts: Option<Vec<String>> = config
749        .cmd
750        .as_ref()
751        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
752    let auto_remove = config.auto_remove;
753    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
754    let env_vars = config.parse_env();
755    let network_mode = config.network.clone();
756
757    let volumes_str = exchange
758        .input
759        .header(HEADER_VOLUMES)
760        .and_then(|v| v.as_str().map(|s| s.to_string()))
761        .or(config.volumes.clone());
762    let (binds, anon_volumes) = volumes_str
763        .as_deref()
764        .and_then(parse_volume_str)
765        .unwrap_or_default();
766
767    let docker_create = docker.clone();
768    let docker_start = docker.clone();
769    let docker_remove = docker.clone();
770
771    let container_id = run_container_with_cleanup(
772        move || async move {
773            let create_options = CreateContainerOptions {
774                name: Some(container_name_ref.to_string()),
775                ..Default::default()
776            };
777            let container_config = ContainerCreateBody {
778                image: Some(image.clone()),
779                cmd: cmd_parts,
780                env: env_vars,
781                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
782                volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
783                host_config: Some(HostConfig {
784                    auto_remove: Some(auto_remove),
785                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
786                    network_mode,
787                    binds: if binds.is_empty() { None } else { Some(binds) },
788                    ..Default::default()
789                }),
790                ..Default::default()
791            };
792
793            let create_response = docker_create
794                .create_container(Some(create_options), container_config)
795                .await
796                .map_err(|e| {
797                    let err_str = e.to_string().to_lowercase();
798                    if err_str.contains("409") || err_str.contains("conflict") {
799                        CamelError::ProcessorError(format!(
800                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
801                            container_name_ref
802                        ))
803                    } else {
804                        CamelError::ProcessorError(format!(
805                            "Failed to create container: {}",
806                            e
807                        ))
808                    }
809                })?;
810
811            Ok(create_response.id)
812        },
813        move |container_id| async move {
814            docker_start
815                .start_container(&container_id, None::<StartContainerOptions>)
816                .await
817                .map_err(|e| {
818                    CamelError::ProcessorError(format!(
819                        "Failed to start container: {}",
820                        e
821                    ))
822                })
823        },
824        move |container_id| async move {
825            docker_remove
826                .remove_container(&container_id, None)
827                .await
828                .map_err(|e| {
829                    CamelError::ProcessorError(format!(
830                        "Failed to remove container after start failure: {}",
831                        e
832                    ))
833                })
834        },
835    )
836    .await?;
837
838    track_container(container_id.clone());
839
840    exchange
841        .input
842        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
843    exchange.input.set_header(
844        HEADER_ACTION_RESULT,
845        serde_json::Value::String("success".to_string()),
846    );
847    Ok(())
848}
849
850async fn handle_lifecycle(
851    docker: Docker,
852    _config: ContainerConfig,
853    exchange: &mut Exchange,
854    operation: ProducerOperation,
855    operation_name: &str,
856) -> Result<(), CamelError> {
857    let container_id = exchange
858        .input
859        .header(HEADER_CONTAINER_ID)
860        .and_then(|v| v.as_str().map(|s| s.to_string()))
861        .ok_or_else(|| {
862            CamelError::ProcessorError(format!(
863                "{} header is required for {} operation",
864                HEADER_CONTAINER_ID, operation_name
865            ))
866        })?;
867
868    match operation {
869        ProducerOperation::Start => {
870            docker
871                .start_container(&container_id, None::<StartContainerOptions>)
872                .await
873                .map_err(|e| {
874                    CamelError::ProcessorError(format!("Failed to start container: {}", e))
875                })?;
876        }
877        ProducerOperation::Stop => {
878            docker
879                .stop_container(&container_id, None)
880                .await
881                .map_err(|e| {
882                    CamelError::ProcessorError(format!("Failed to stop container: {}", e))
883                })?;
884        }
885        ProducerOperation::Remove => {
886            docker
887                .remove_container(&container_id, None)
888                .await
889                .map_err(|e| {
890                    CamelError::ProcessorError(format!("Failed to remove container: {}", e))
891                })?;
892            untrack_container(&container_id);
893        }
894        _ => {}
895    }
896
897    exchange.input.set_header(
898        HEADER_ACTION_RESULT,
899        serde_json::Value::String("success".to_string()),
900    );
901    Ok(())
902}
903
904async fn handle_exec(
905    docker: Docker,
906    config: ContainerConfig,
907    exchange: &mut Exchange,
908) -> Result<(), CamelError> {
909    let container_id = exchange
910        .input
911        .header(HEADER_CONTAINER_ID)
912        .and_then(|v| v.as_str().map(|s| s.to_string()))
913        .or(config.container_id.clone())
914        .ok_or_else(|| {
915            CamelError::ProcessorError(format!(
916                "{} header or containerId param is required for exec operation",
917                HEADER_CONTAINER_ID
918            ))
919        })?;
920
921    let cmd = exchange
922        .input
923        .header(HEADER_CMD)
924        .and_then(|v| v.as_str().map(|s| s.to_string()))
925        .or(config.cmd.clone())
926        .ok_or_else(|| {
927            CamelError::ProcessorError(
928                "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
929            )
930        })?;
931
932    let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
933    let env_vars = config.parse_env();
934
935    let exec_config = bollard::exec::CreateExecOptions {
936        cmd: Some(cmd_parts),
937        env: env_vars,
938        user: config.user.clone(),
939        working_dir: config.workdir.clone(),
940        attach_stdout: Some(true),
941        attach_stderr: Some(true),
942        ..Default::default()
943    };
944
945    let create_result = docker
946        .create_exec(&container_id, exec_config)
947        .await
948        .map_err(|e| {
949            let err_str = e.to_string().to_lowercase();
950            if err_str.contains("404") || err_str.contains("no such") {
951                CamelError::ProcessorError(format!(
952                    "Container '{}' not found for exec",
953                    container_id
954                ))
955            } else {
956                CamelError::ProcessorError(format!("Failed to create exec: {}", e))
957            }
958        })?;
959
960    let exec_id = create_result.id;
961
962    if config.detach {
963        docker
964            .start_exec(
965                &exec_id,
966                Some(bollard::exec::StartExecOptions {
967                    detach: true,
968                    ..Default::default()
969                }),
970            )
971            .await
972            .map_err(|e| {
973                CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
974            })?;
975
976        exchange
977            .input
978            .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
979        exchange
980            .input
981            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
982    } else {
983        let start_result = docker
984            .start_exec(&exec_id, None)
985            .await
986            .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
987
988        let mut output = String::new();
989
990        match start_result {
991            bollard::exec::StartExecResults::Attached {
992                output: mut stream, ..
993            } => {
994                use futures::StreamExt;
995                while let Some(msg) = stream.next().await {
996                    match msg {
997                        Ok(bollard::container::LogOutput::StdOut { message }) => {
998                            output.push_str(&String::from_utf8_lossy(&message));
999                        }
1000                        Ok(bollard::container::LogOutput::StdErr { message }) => {
1001                            output.push_str(&String::from_utf8_lossy(&message));
1002                        }
1003                        Ok(_) => {}
1004                        Err(e) => {
1005                            output.push_str(&format!("[error reading stream: {}]", e));
1006                        }
1007                    }
1008                }
1009            }
1010            bollard::exec::StartExecResults::Detached => {}
1011        }
1012
1013        let inspect = docker
1014            .inspect_exec(&exec_id)
1015            .await
1016            .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1017
1018        let exit_code: i64 = inspect.exit_code.unwrap_or(0);
1019
1020        let output = output.trim_end().to_string();
1021        exchange.input.body = Body::Text(output);
1022        exchange.input.set_header(
1023            HEADER_EXIT_CODE,
1024            serde_json::Value::Number(exit_code.into()),
1025        );
1026        exchange
1027            .input
1028            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1029    }
1030
1031    exchange.input.set_header(
1032        HEADER_ACTION_RESULT,
1033        serde_json::Value::String("success".to_string()),
1034    );
1035    Ok(())
1036}
1037
1038async fn handle_network_create(
1039    docker: Docker,
1040    config: ContainerConfig,
1041    exchange: &mut Exchange,
1042) -> Result<(), CamelError> {
1043    let network_name = exchange
1044        .input
1045        .header(HEADER_CONTAINER_NAME)
1046        .and_then(|v| v.as_str().map(|s| s.to_string()))
1047        .or(config.name.clone())
1048        .ok_or_else(|| {
1049            CamelError::ProcessorError(
1050                "CamelContainerName header or name param is required for network-create"
1051                    .to_string(),
1052            )
1053        })?;
1054
1055    let driver = config.driver.as_deref().unwrap_or("bridge");
1056
1057    let options = NetworkCreateRequest {
1058        name: network_name.clone(),
1059        driver: Some(driver.to_string()),
1060        ..Default::default()
1061    };
1062
1063    let result = docker.create_network(options).await.map_err(|e| {
1064        let err_str = e.to_string().to_lowercase();
1065        if err_str.contains("409") || err_str.contains("already exists") {
1066            CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1067        } else {
1068            CamelError::ProcessorError(format!("Failed to create network: {}", e))
1069        }
1070    })?;
1071
1072    let network_id = result.id.clone();
1073    let json_value = serde_json::to_value(&result).map_err(|e| {
1074        CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1075    })?;
1076
1077    exchange.input.body = Body::Json(json_value);
1078    exchange
1079        .input
1080        .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1081    exchange.input.set_header(
1082        HEADER_ACTION_RESULT,
1083        serde_json::Value::String("success".to_string()),
1084    );
1085    Ok(())
1086}
1087
1088async fn handle_network_connect(
1089    docker: Docker,
1090    config: ContainerConfig,
1091    exchange: &mut Exchange,
1092) -> Result<(), CamelError> {
1093    let network = exchange
1094        .input
1095        .header(HEADER_NETWORK)
1096        .and_then(|v| v.as_str().map(|s| s.to_string()))
1097        .or(config.network.clone())
1098        .ok_or_else(|| {
1099            CamelError::ProcessorError(
1100                "CamelContainerNetwork header or network param is required for network-connect"
1101                    .to_string(),
1102            )
1103        })?;
1104
1105    let container = exchange
1106        .input
1107        .header(HEADER_CONTAINER_ID)
1108        .and_then(|v| v.as_str().map(|s| s.to_string()))
1109        .or(config.container_id.clone())
1110        .ok_or_else(|| {
1111            CamelError::ProcessorError(
1112                "CamelContainerId header or container param is required for network-connect"
1113                    .to_string(),
1114            )
1115        })?;
1116
1117    docker
1118        .connect_network(
1119            &network,
1120            NetworkConnectRequest {
1121                container,
1122                ..Default::default()
1123            },
1124        )
1125        .await
1126        .map_err(|e| {
1127            let err_str = e.to_string().to_lowercase();
1128            if err_str.contains("404") || err_str.contains("not found") {
1129                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1130            } else {
1131                CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1132            }
1133        })?;
1134
1135    exchange.input.set_header(
1136        HEADER_ACTION_RESULT,
1137        serde_json::Value::String("success".to_string()),
1138    );
1139    Ok(())
1140}
1141
1142async fn handle_network_disconnect(
1143    docker: Docker,
1144    config: ContainerConfig,
1145    exchange: &mut Exchange,
1146) -> Result<(), CamelError> {
1147    let network = exchange
1148        .input
1149        .header(HEADER_NETWORK)
1150        .and_then(|v| v.as_str().map(|s| s.to_string()))
1151        .or(config.network.clone())
1152        .ok_or_else(|| {
1153            CamelError::ProcessorError(
1154                "CamelContainerNetwork header or network param is required for network-disconnect"
1155                    .to_string(),
1156            )
1157        })?;
1158
1159    let container = exchange
1160        .input
1161        .header(HEADER_CONTAINER_ID)
1162        .and_then(|v| v.as_str().map(|s| s.to_string()))
1163        .or(config.container_id.clone())
1164        .ok_or_else(|| {
1165            CamelError::ProcessorError(
1166                "CamelContainerId header or container param is required for network-disconnect"
1167                    .to_string(),
1168            )
1169        })?;
1170
1171    docker
1172        .disconnect_network(
1173            &network,
1174            NetworkDisconnectRequest {
1175                container,
1176                force: Some(config.force),
1177            },
1178        )
1179        .await
1180        .map_err(|e| {
1181            let err_str = e.to_string().to_lowercase();
1182            if err_str.contains("404") || err_str.contains("not found") {
1183                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1184            } else {
1185                CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1186            }
1187        })?;
1188
1189    exchange.input.set_header(
1190        HEADER_ACTION_RESULT,
1191        serde_json::Value::String("success".to_string()),
1192    );
1193    Ok(())
1194}
1195
1196async fn handle_network_remove(
1197    docker: Docker,
1198    config: ContainerConfig,
1199    exchange: &mut Exchange,
1200) -> Result<(), CamelError> {
1201    let network = exchange
1202        .input
1203        .header(HEADER_NETWORK)
1204        .and_then(|v| v.as_str().map(|s| s.to_string()))
1205        .or(config.network.clone())
1206        .ok_or_else(|| {
1207            CamelError::ProcessorError(
1208                "CamelContainerNetwork header or network param is required for network-remove"
1209                    .to_string(),
1210            )
1211        })?;
1212
1213    docker.remove_network(&network).await.map_err(|e| {
1214        let err_str = e.to_string().to_lowercase();
1215        if err_str.contains("404") || err_str.contains("not found") {
1216            CamelError::ProcessorError(format!("Network '{}' not found", network))
1217        } else if err_str.contains("409") || err_str.contains("in use") {
1218            CamelError::ProcessorError(format!(
1219                "Network '{}' is in use and cannot be removed",
1220                network
1221            ))
1222        } else {
1223            CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1224        }
1225    })?;
1226
1227    exchange.input.set_header(
1228        HEADER_ACTION_RESULT,
1229        serde_json::Value::String("success".to_string()),
1230    );
1231    Ok(())
1232}
1233
1234async fn handle_network_list(
1235    docker: Docker,
1236    _config: ContainerConfig,
1237    exchange: &mut Exchange,
1238) -> Result<(), CamelError> {
1239    let networks = docker
1240        .list_networks(None::<ListNetworksOptions>)
1241        .await
1242        .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1243
1244    let json_value = serde_json::to_value(&networks)
1245        .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1246
1247    exchange.input.body = Body::Json(json_value);
1248    exchange.input.set_header(
1249        HEADER_ACTION_RESULT,
1250        serde_json::Value::String("success".to_string()),
1251    );
1252    Ok(())
1253}
1254
/// Producer for executing container operations.
///
/// Each exchange is dispatched to the handler for the requested operation:
/// container list/run/start/stop/remove/exec as well as network
/// create/connect/disconnect/remove/list. The operation is taken from the
/// `HEADER_ACTION` message header, falling back to the operation configured
/// on the endpoint.
#[derive(Clone)]
pub struct ContainerProducer {
    /// Endpoint configuration; cloned into the future created for every exchange.
    config: ContainerConfig,
    /// Docker client handle; cloned into the future created for every exchange.
    docker: Docker,
}
1264
1265impl Service<Exchange> for ContainerProducer {
1266    type Response = Exchange;
1267    type Error = CamelError;
1268    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1269
1270    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1271        Poll::Ready(Ok(()))
1272    }
1273
1274    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1275        let config = self.config.clone();
1276        let docker = self.docker.clone();
1277        Box::pin(async move {
1278            let operation_name = exchange
1279                .input
1280                .header(HEADER_ACTION)
1281                .and_then(|v| v.as_str().map(|s| s.to_string()))
1282                .unwrap_or_else(|| config.operation.clone());
1283
1284            let operation = parse_producer_operation(&operation_name)?;
1285
1286            match operation {
1287                ProducerOperation::List => {
1288                    handle_list(docker, config, &mut exchange).await?;
1289                }
1290                ProducerOperation::Run => {
1291                    handle_run(docker, config, &mut exchange).await?;
1292                }
1293                ProducerOperation::Start => {
1294                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1295                        .await?;
1296                }
1297                ProducerOperation::Stop => {
1298                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1299                        .await?;
1300                }
1301                ProducerOperation::Remove => {
1302                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1303                        .await?;
1304                }
1305                ProducerOperation::Exec => {
1306                    handle_exec(docker, config, &mut exchange).await?;
1307                }
1308                ProducerOperation::NetworkCreate => {
1309                    handle_network_create(docker, config, &mut exchange).await?;
1310                }
1311                ProducerOperation::NetworkConnect => {
1312                    handle_network_connect(docker, config, &mut exchange).await?;
1313                }
1314                ProducerOperation::NetworkDisconnect => {
1315                    handle_network_disconnect(docker, config, &mut exchange).await?;
1316                }
1317                ProducerOperation::NetworkRemove => {
1318                    handle_network_remove(docker, config, &mut exchange).await?;
1319                }
1320                ProducerOperation::NetworkList => {
1321                    handle_network_list(docker, config, &mut exchange).await?;
1322                }
1323            }
1324
1325            Ok(exchange)
1326        })
1327    }
1328}
1329
/// Consumer for receiving Docker container events or logs.
///
/// Depending on the configured operation (`events` or `logs`), this consumer
/// subscribes to the Docker event stream or to one container's log stream and
/// forwards each item to the route as an exchange. It reconnects automatically
/// when the Docker connection or the stream fails.
pub struct ContainerConsumer {
    /// Endpoint configuration: selects the operation and supplies the Docker
    /// connection and log-stream settings.
    config: ContainerConfig,
}
1337
1338#[async_trait]
1339impl Consumer for ContainerConsumer {
1340    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1341        match self.config.operation.as_str() {
1342            "events" => self.start_events_consumer(context).await,
1343            "logs" => self.start_logs_consumer(context).await,
1344            _ => Err(CamelError::EndpointCreationFailed(format!(
1345                "Consumer only supports 'events' or 'logs' operations, got '{}'",
1346                self.config.operation
1347            ))),
1348        }
1349    }
1350
1351    async fn stop(&mut self) -> Result<(), CamelError> {
1352        Ok(())
1353    }
1354
1355    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1356        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1357    }
1358}
1359
1360impl ContainerConsumer {
1361    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1362        use futures::StreamExt;
1363
1364        loop {
1365            if context.is_cancelled() {
1366                tracing::info!("Container events consumer shutting down");
1367                return Ok(());
1368            }
1369
1370            let docker = match self.config.connect_docker().await {
1371                Ok(d) => d,
1372                Err(e) => {
1373                    tracing::error!(
1374                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
1375                        e
1376                    );
1377                    tokio::select! {
1378                        _ = context.cancelled() => {
1379                            tracing::info!("Container events consumer shutting down");
1380                            return Ok(());
1381                        }
1382                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1383                    }
1384                    continue;
1385                }
1386            };
1387
1388            let mut event_stream = docker.events(None::<EventsOptions>);
1389
1390            loop {
1391                tokio::select! {
1392                    _ = context.cancelled() => {
1393                        tracing::info!("Container events consumer shutting down");
1394                        return Ok(());
1395                    }
1396
1397                    msg = event_stream.next() => {
1398                        match msg {
1399                            Some(Ok(event)) => {
1400                                let formatted = format_docker_event(&event);
1401                                let message = Message::new(Body::Text(formatted));
1402                                let exchange = Exchange::new(message);
1403
1404                                if let Err(e) = context.send(exchange).await {
1405                                    tracing::error!("Failed to send exchange: {:?}", e);
1406                                    break;
1407                                }
1408                            }
1409                            Some(Err(e)) => {
1410                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1411                                break;
1412                            }
1413                            None => {
1414                                tracing::info!("Docker event stream ended. Reconnecting...");
1415                                break;
1416                            }
1417                        }
1418                    }
1419                }
1420            }
1421
1422            tokio::select! {
1423                _ = context.cancelled() => {
1424                    tracing::info!("Container events consumer shutting down");
1425                    return Ok(());
1426                }
1427                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1428            }
1429        }
1430    }
1431
1432    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1433        use futures::StreamExt;
1434
1435        let container_id = self.config.container_id.clone().ok_or_else(|| {
1436            CamelError::EndpointCreationFailed(
1437                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1438                    .to_string(),
1439            )
1440        })?;
1441
1442        loop {
1443            if context.is_cancelled() {
1444                tracing::info!("Container logs consumer shutting down");
1445                return Ok(());
1446            }
1447
1448            let docker = match self.config.connect_docker().await {
1449                Ok(d) => d,
1450                Err(e) => {
1451                    tracing::error!(
1452                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
1453                        e
1454                    );
1455                    tokio::select! {
1456                        _ = context.cancelled() => {
1457                            tracing::info!("Container logs consumer shutting down");
1458                            return Ok(());
1459                        }
1460                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1461                    }
1462                    continue;
1463                }
1464            };
1465
1466            let tail = self
1467                .config
1468                .tail
1469                .clone()
1470                .unwrap_or_else(|| "all".to_string());
1471
1472            let options = LogsOptions {
1473                follow: self.config.follow,
1474                stdout: true,
1475                stderr: true,
1476                timestamps: self.config.timestamps,
1477                tail,
1478                ..Default::default()
1479            };
1480
1481            let mut log_stream = docker.logs(&container_id, Some(options));
1482            let container_id_header = container_id.clone();
1483
1484            loop {
1485                tokio::select! {
1486                    _ = context.cancelled() => {
1487                        tracing::info!("Container logs consumer shutting down");
1488                        return Ok(());
1489                    }
1490
1491                    msg = log_stream.next() => {
1492                        match msg {
1493                            Some(Ok(log_output)) => {
1494                                let (stream_type, content) = match log_output {
1495                                    bollard::container::LogOutput::StdOut { message } => {
1496                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
1497                                    }
1498                                    bollard::container::LogOutput::StdErr { message } => {
1499                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
1500                                    }
1501                                    bollard::container::LogOutput::Console { message } => {
1502                                        ("console", String::from_utf8_lossy(&message).into_owned())
1503                                    }
1504                                    bollard::container::LogOutput::StdIn { message } => {
1505                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
1506                                    }
1507                                };
1508
1509                                let content = content.trim_end();
1510                                if content.is_empty() {
1511                                    continue;
1512                                }
1513
1514                                let mut message = Message::new(Body::Text(content.to_string()));
1515                                message.set_header(
1516                                    HEADER_CONTAINER_ID,
1517                                    serde_json::Value::String(container_id_header.clone()),
1518                                );
1519                                message.set_header(
1520                                    HEADER_LOG_STREAM,
1521                                    serde_json::Value::String(stream_type.to_string()),
1522                                );
1523
1524                                if self.config.timestamps
1525                                    && let Some(ts) = extract_timestamp(content) {
1526                                        message.set_header(
1527                                            HEADER_LOG_TIMESTAMP,
1528                                            serde_json::Value::String(ts),
1529                                        );
1530                                    }
1531
1532                                let exchange = Exchange::new(message);
1533
1534                                if let Err(e) = context.send(exchange).await {
1535                                    tracing::error!("Failed to send log exchange: {:?}", e);
1536                                    break;
1537                                }
1538                            }
1539                            Some(Err(e)) => {
1540                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1541                                break;
1542                            }
1543                            None => {
1544                                if self.config.follow {
1545                                    tracing::info!("Docker log stream ended. Reconnecting...");
1546                                    break;
1547                                } else {
1548                                    tracing::info!("Container logs consumer finished (follow=false)");
1549                                    return Ok(());
1550                                }
1551                            }
1552                        }
1553                    }
1554                }
1555            }
1556
1557            tokio::select! {
1558                _ = context.cancelled() => {
1559                    tracing::info!("Container logs consumer shutting down");
1560                    return Ok(());
1561                }
1562                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1563            }
1564        }
1565    }
1566}
1567
/// Returns the leading timestamp token of a log line, if one is present.
///
/// The line is split at its first space; the text before it is returned when
/// it contains a `T` (presumably the date/time separator of the timestamp
/// prefix — the check is deliberately loose). Lines without a space, or whose
/// first token has no `T`, yield `None`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    match log_line.split_once(' ') {
        Some((head, _rest)) if head.contains('T') => Some(head.to_owned()),
        _ => None,
    }
}
1576
/// Component for creating container endpoints.
///
/// This component handles URIs with the "container" scheme and creates
/// endpoints that provide producers and consumers for Docker operations.
///
/// Containers created via the `run` operation are tracked globally and can be
/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
pub struct ContainerComponent {
    /// Optional global defaults, applied to every endpoint's configuration
    /// after its URI has been parsed.
    config: Option<ContainerGlobalConfig>,
}
1587
1588impl ContainerComponent {
1589    /// Creates a new container component instance without global config.
1590    pub fn new() -> Self {
1591        Self { config: None }
1592    }
1593
1594    /// Creates a container component with the given global config.
1595    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1596        Self {
1597            config: Some(config),
1598        }
1599    }
1600
1601    /// Creates a container component with optional global config.
1602    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1603        Self { config }
1604    }
1605}
1606
1607impl Default for ContainerComponent {
1608    fn default() -> Self {
1609        Self::new()
1610    }
1611}
1612
1613impl Component for ContainerComponent {
1614    fn scheme(&self) -> &str {
1615        "container"
1616    }
1617
1618    fn create_endpoint(
1619        &self,
1620        uri: &str,
1621        _ctx: &dyn camel_component_api::ComponentContext,
1622    ) -> Result<Box<dyn Endpoint>, CamelError> {
1623        let mut config = ContainerConfig::from_uri(uri)?;
1624        // Apply global defaults if present and URI didn't set them
1625        if let Some(ref global) = self.config {
1626            config.apply_global_defaults(global);
1627        }
1628        Ok(Box::new(ContainerEndpoint {
1629            uri: uri.to_string(),
1630            config,
1631        }))
1632    }
1633}
1634
/// Endpoint for container operations.
///
/// This endpoint creates producers for executing container operations
/// and consumers for receiving container events or logs.
pub struct ContainerEndpoint {
    /// The URI this endpoint was created from, returned unchanged by `uri()`.
    uri: String,
    /// Configuration parsed from the URI, with any global defaults applied;
    /// cloned into each producer and consumer.
    config: ContainerConfig,
}
1643
1644impl ContainerEndpoint {
1645    /// Returns the Docker host configured for this endpoint.
1646    /// Returns `None` if not set (for testing purposes).
1647    pub fn docker_host(&self) -> Option<&str> {
1648        self.config.host.as_deref()
1649    }
1650}
1651
1652impl Endpoint for ContainerEndpoint {
1653    fn uri(&self) -> &str {
1654        &self.uri
1655    }
1656
1657    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1658        Ok(Box::new(ContainerConsumer {
1659            config: self.config.clone(),
1660        }))
1661    }
1662
1663    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1664        let docker = self.config.connect_docker_client()?;
1665        Ok(BoxProcessor::new(ContainerProducer {
1666            config: self.config.clone(),
1667            docker,
1668        }))
1669    }
1670}
1671
1672#[cfg(test)]
1673mod tests {
1674    use super::*;
1675    use camel_component_api::NoOpComponentContext;
1676
1677    #[test]
1678    fn test_container_config() {
1679        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1680        assert_eq!(config.operation, "run");
1681        assert_eq!(config.image.as_deref(), Some("alpine"));
1682        // host is None by default; global config applies it later
1683        assert!(config.host.is_none());
1684    }
1685
1686    #[test]
1687    fn test_global_config_applied_to_endpoint() {
1688        // When global config is set and URI doesn't specify host,
1689        // apply_global_defaults should set host from global config.
1690        let global =
1691            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1692        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1693        assert!(
1694            config.host.is_none(),
1695            "URI without ?host= should leave host as None"
1696        );
1697        config.apply_global_defaults(&global);
1698        assert_eq!(
1699            config.host.as_deref(),
1700            Some("unix:///custom/docker.sock"),
1701            "global docker_host must be applied when URI did not set host"
1702        );
1703    }
1704
1705    #[test]
1706    fn test_uri_param_wins_over_global_config() {
1707        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1708        let global =
1709            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1710        let mut config =
1711            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1712                .unwrap();
1713        assert_eq!(
1714            config.host.as_deref(),
1715            Some("unix:///override.sock"),
1716            "URI-set host should be parsed correctly"
1717        );
1718        config.apply_global_defaults(&global);
1719        assert_eq!(
1720            config.host.as_deref(),
1721            Some("unix:///override.sock"),
1722            "global config must NOT override a host already set by URI"
1723        );
1724    }
1725
1726    #[test]
1727    fn test_container_config_parses_name() {
1728        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1729        assert_eq!(config.name.as_deref(), Some("my-container"));
1730    }
1731
1732    #[test]
1733    fn test_parse_producer_operation_known() {
1734        assert_eq!(
1735            parse_producer_operation("list").unwrap(),
1736            ProducerOperation::List
1737        );
1738        assert_eq!(
1739            parse_producer_operation("run").unwrap(),
1740            ProducerOperation::Run
1741        );
1742        assert_eq!(
1743            parse_producer_operation("start").unwrap(),
1744            ProducerOperation::Start
1745        );
1746        assert_eq!(
1747            parse_producer_operation("stop").unwrap(),
1748            ProducerOperation::Stop
1749        );
1750        assert_eq!(
1751            parse_producer_operation("remove").unwrap(),
1752            ProducerOperation::Remove
1753        );
1754    }
1755
1756    #[test]
1757    fn test_parse_producer_operation_unknown() {
1758        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1759        match err {
1760            CamelError::ProcessorError(msg) => {
1761                assert!(
1762                    msg.contains("Unknown container operation"),
1763                    "Unexpected error message: {}",
1764                    msg
1765                );
1766            }
1767            _ => panic!("Expected ProcessorError for unknown operation"),
1768        }
1769    }
1770
1771    #[test]
1772    fn test_parse_producer_operation_new_variants() {
1773        assert_eq!(
1774            parse_producer_operation("exec").unwrap(),
1775            ProducerOperation::Exec
1776        );
1777        assert_eq!(
1778            parse_producer_operation("network-create").unwrap(),
1779            ProducerOperation::NetworkCreate
1780        );
1781        assert_eq!(
1782            parse_producer_operation("network-connect").unwrap(),
1783            ProducerOperation::NetworkConnect
1784        );
1785        assert_eq!(
1786            parse_producer_operation("network-disconnect").unwrap(),
1787            ProducerOperation::NetworkDisconnect
1788        );
1789        assert_eq!(
1790            parse_producer_operation("network-remove").unwrap(),
1791            ProducerOperation::NetworkRemove
1792        );
1793        assert_eq!(
1794            parse_producer_operation("network-list").unwrap(),
1795            ProducerOperation::NetworkList
1796        );
1797    }
1798
1799    #[test]
1800    fn test_resolve_container_name_header_overrides_config() {
1801        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1802        let mut exchange = Exchange::new(Message::new(""));
1803        exchange.input.set_header(
1804            HEADER_CONTAINER_NAME,
1805            serde_json::Value::String("header-name".to_string()),
1806        );
1807
1808        let resolved = resolve_container_name(&exchange, &config);
1809        assert_eq!(resolved.as_deref(), Some("header-name"));
1810    }
1811
1812    #[test]
1813    fn test_container_config_rejects_tcp_host() {
1814        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1815        let err = config.connect_docker_client().unwrap_err();
1816        match err {
1817            CamelError::ProcessorError(msg) => {
1818                assert!(
1819                    msg.to_lowercase().contains("tcp"),
1820                    "Expected TCP scheme error, got: {}",
1821                    msg
1822                );
1823            }
1824            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1825        }
1826    }
1827
1828    #[tokio::test]
1829    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1830        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1831        let remove_called_clone = remove_called.clone();
1832
1833        let result = run_container_with_cleanup(
1834            || async { Ok("container-123".to_string()) },
1835            |_id| async move {
1836                Err(CamelError::ProcessorError(
1837                    "Failed to start container".to_string(),
1838                ))
1839            },
1840            move |_id| {
1841                let remove_called_inner = remove_called_clone.clone();
1842                async move {
1843                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1844                    Ok(())
1845                }
1846            },
1847        )
1848        .await;
1849
1850        assert!(result.is_err(), "Expected start failure to bubble up");
1851        assert!(
1852            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1853            "Expected cleanup to remove container"
1854        );
1855    }
1856
1857    #[test]
1858    fn test_container_component_creates_endpoint() {
1859        let component = ContainerComponent::new();
1860        assert_eq!(component.scheme(), "container");
1861        let ctx = NoOpComponentContext;
1862        let endpoint = component
1863            .create_endpoint("container:run?image=alpine", &ctx)
1864            .unwrap();
1865        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1866    }
1867
1868    #[test]
1869    fn test_container_config_parses_ports() {
1870        let config =
1871            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1872        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1873    }
1874
1875    #[test]
1876    fn test_container_config_parses_env() {
1877        let config =
1878            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1879        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1880    }
1881
1882    #[test]
1883    fn test_container_config_parses_logs_options() {
1884        let config = ContainerConfig::from_uri(
1885            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1886        )
1887        .unwrap();
1888        assert_eq!(config.operation, "logs");
1889        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1890        assert!(config.follow);
1891        assert!(config.timestamps);
1892        assert_eq!(config.tail.as_deref(), Some("100"));
1893    }
1894
1895    #[test]
1896    fn test_container_config_logs_defaults() {
1897        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1898        assert!(config.follow); // default: true
1899        assert!(!config.timestamps); // default: false
1900        assert!(config.tail.is_none()); // default: None (all)
1901    }
1902
1903    #[test]
1904    fn test_parse_ports_single() {
1905        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1906        let (exposed, bindings) = config.parse_ports().unwrap();
1907
1908        assert!(exposed.contains(&"80/tcp".to_string()));
1909        assert!(bindings.contains_key("80/tcp"));
1910
1911        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1912        assert_eq!(binding.len(), 1);
1913        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1914    }
1915
1916    #[test]
1917    fn test_parse_ports_multiple() {
1918        let config =
1919            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1920        let (exposed, bindings) = config.parse_ports().unwrap();
1921
1922        assert!(exposed.contains(&"80/tcp".to_string()));
1923        assert!(exposed.contains(&"443/tcp".to_string()));
1924        assert_eq!(bindings.len(), 2);
1925    }
1926
1927    #[test]
1928    fn test_parse_ports_with_protocol() {
1929        let config =
1930            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1931                .unwrap();
1932        let (exposed, _bindings) = config.parse_ports().unwrap();
1933
1934        assert!(exposed.contains(&"80/tcp".to_string()));
1935        assert!(exposed.contains(&"53/udp".to_string()));
1936    }
1937
1938    #[test]
1939    fn test_parse_ports_none() {
1940        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1941        assert!(config.parse_ports().is_none());
1942    }
1943
1944    #[test]
1945    fn test_parse_env_single() {
1946        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1947        let env = config.parse_env().unwrap();
1948
1949        assert_eq!(env.len(), 1);
1950        assert_eq!(env[0], "FOO=bar");
1951    }
1952
1953    #[test]
1954    fn test_parse_env_multiple() {
1955        let config =
1956            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1957                .unwrap();
1958        let env = config.parse_env().unwrap();
1959
1960        assert_eq!(env.len(), 3);
1961        assert!(env.contains(&"FOO=bar".to_string()));
1962        assert!(env.contains(&"BAZ=qux".to_string()));
1963        assert!(env.contains(&"NUM=123".to_string()));
1964    }
1965
1966    #[test]
1967    fn test_parse_env_none() {
1968        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1969        assert!(config.parse_env().is_none());
1970    }
1971
1972    use camel_component_api::Message;
1973    use std::sync::Arc;
1974
1975    #[tokio::test]
1976    async fn test_container_producer_resolves_operation_from_header() {
1977        // Try to connect to Docker - if fails, return early
1978        let docker = match Docker::connect_with_local_defaults() {
1979            Ok(d) => d,
1980            Err(_) => {
1981                eprintln!("Skipping test: Could not connect to Docker daemon");
1982                return;
1983            }
1984        };
1985
1986        if docker.ping().await.is_err() {
1987            eprintln!("Skipping test: Docker daemon not responding to ping");
1988            return;
1989        }
1990
1991        let component = ContainerComponent::new();
1992        let ctx = NoOpComponentContext;
1993        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1994
1995        let ctx = ProducerContext::new();
1996        let mut producer = endpoint.create_producer(&ctx).unwrap();
1997
1998        let mut exchange = Exchange::new(Message::new(""));
1999        exchange
2000            .input
2001            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2002
2003        use tower::ServiceExt;
2004        let result = producer
2005            .ready()
2006            .await
2007            .unwrap()
2008            .call(exchange)
2009            .await
2010            .unwrap();
2011
2012        // For list operation, the result header should be on the input message
2013        assert_eq!(
2014            result
2015                .input
2016                .header(HEADER_ACTION_RESULT)
2017                .map(|v| v.as_str().unwrap()),
2018            Some("success")
2019        );
2020    }
2021
2022    #[tokio::test]
2023    async fn test_container_producer_connection_error_on_invalid_host() {
2024        // Test that an invalid host (nonexistent socket) results in a connection error
2025        let component = ContainerComponent::new();
2026        let ctx = NoOpComponentContext;
2027        let endpoint = component
2028            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2029            .unwrap();
2030
2031        let ctx = ProducerContext::new();
2032        let result = endpoint.create_producer(&ctx);
2033
2034        // The producer should return an error because it cannot connect to the invalid socket
2035        assert!(
2036            result.is_err(),
2037            "Expected error when connecting to invalid host"
2038        );
2039        let err = result.unwrap_err();
2040        match &err {
2041            CamelError::ProcessorError(msg) => {
2042                assert!(
2043                    msg.to_lowercase().contains("connection")
2044                        || msg.to_lowercase().contains("connect")
2045                        || msg.to_lowercase().contains("socket")
2046                        || msg.contains("docker"),
2047                    "Error message should indicate connection failure, got: {}",
2048                    msg
2049                );
2050            }
2051            _ => panic!("Expected ProcessorError, got: {:?}", err),
2052        }
2053    }
2054
2055    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
2056    #[tokio::test]
2057    async fn test_container_producer_lifecycle_operations_missing_id() {
2058        // Try to connect to Docker - if fails, return early
2059        let docker = match Docker::connect_with_local_defaults() {
2060            Ok(d) => d,
2061            Err(_) => {
2062                eprintln!("Skipping test: Could not connect to Docker daemon");
2063                return;
2064            }
2065        };
2066
2067        if docker.ping().await.is_err() {
2068            eprintln!("Skipping test: Docker daemon not responding to ping");
2069            return;
2070        }
2071
2072        let component = ContainerComponent::new();
2073        let ctx = NoOpComponentContext;
2074        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2075        let ctx = ProducerContext::new();
2076        let mut producer = endpoint.create_producer(&ctx).unwrap();
2077
2078        // Test each lifecycle operation without CamelContainerId header
2079        for operation in ["start", "stop", "remove"] {
2080            let mut exchange = Exchange::new(Message::new(""));
2081            exchange.input.set_header(
2082                HEADER_ACTION,
2083                serde_json::Value::String(operation.to_string()),
2084            );
2085            // Deliberately NOT setting CamelContainerId header
2086
2087            use tower::ServiceExt;
2088            let result = producer.ready().await.unwrap().call(exchange).await;
2089
2090            assert!(
2091                result.is_err(),
2092                "Expected error for {} operation without CamelContainerId",
2093                operation
2094            );
2095            let err = result.unwrap_err();
2096            match &err {
2097                CamelError::ProcessorError(msg) => {
2098                    assert!(
2099                        msg.contains(HEADER_CONTAINER_ID),
2100                        "Error message should mention {}, got: {}",
2101                        HEADER_CONTAINER_ID,
2102                        msg
2103                    );
2104                }
2105                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2106            }
2107        }
2108    }
2109
2110    /// Test that stop operation returns an error for a nonexistent container.
2111    #[tokio::test]
2112    async fn test_container_producer_stop_nonexistent() {
2113        // Try to connect to Docker - if fails, return early
2114        let docker = match Docker::connect_with_local_defaults() {
2115            Ok(d) => d,
2116            Err(_) => {
2117                eprintln!("Skipping test: Could not connect to Docker daemon");
2118                return;
2119            }
2120        };
2121
2122        if docker.ping().await.is_err() {
2123            eprintln!("Skipping test: Docker daemon not responding to ping");
2124            return;
2125        }
2126
2127        let component = ContainerComponent::new();
2128        let ctx = NoOpComponentContext;
2129        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2130        let ctx = ProducerContext::new();
2131        let mut producer = endpoint.create_producer(&ctx).unwrap();
2132
2133        let mut exchange = Exchange::new(Message::new(""));
2134        exchange
2135            .input
2136            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2137        exchange.input.set_header(
2138            HEADER_CONTAINER_ID,
2139            serde_json::Value::String("nonexistent-container-123".into()),
2140        );
2141
2142        use tower::ServiceExt;
2143        let result = producer.ready().await.unwrap().call(exchange).await;
2144
2145        assert!(
2146            result.is_err(),
2147            "Expected error when stopping nonexistent container"
2148        );
2149        let err = result.unwrap_err();
2150        match &err {
2151            CamelError::ProcessorError(msg) => {
2152                // Docker API returns 404 with "No such container" message
2153                assert!(
2154                    msg.to_lowercase().contains("no such container")
2155                        || msg.to_lowercase().contains("not found")
2156                        || msg.contains("404"),
2157                    "Error message should indicate container not found, got: {}",
2158                    msg
2159                );
2160            }
2161            _ => panic!("Expected ProcessorError, got: {:?}", err),
2162        }
2163    }
2164
2165    /// Test that run operation returns an error when no image is provided.
2166    #[tokio::test]
2167    async fn test_container_producer_run_missing_image() {
2168        // Try to connect to Docker - if fails, return early
2169        let docker = match Docker::connect_with_local_defaults() {
2170            Ok(d) => d,
2171            Err(_) => {
2172                eprintln!("Skipping test: Could not connect to Docker daemon");
2173                return;
2174            }
2175        };
2176
2177        if docker.ping().await.is_err() {
2178            eprintln!("Skipping test: Docker daemon not responding to ping");
2179            return;
2180        }
2181
2182        // Create producer without an image in the URI
2183        let component = ContainerComponent::new();
2184        let ctx = NoOpComponentContext;
2185        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2186        let ctx = ProducerContext::new();
2187        let mut producer = endpoint.create_producer(&ctx).unwrap();
2188
2189        let mut exchange = Exchange::new(Message::new(""));
2190        exchange
2191            .input
2192            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2193        // Deliberately NOT setting CamelContainerImage header
2194
2195        use tower::ServiceExt;
2196        let result = producer.ready().await.unwrap().call(exchange).await;
2197
2198        assert!(
2199            result.is_err(),
2200            "Expected error for run operation without image"
2201        );
2202        let err = result.unwrap_err();
2203        match &err {
2204            CamelError::ProcessorError(msg) => {
2205                assert!(
2206                    msg.to_lowercase().contains("image"),
2207                    "Error message should mention 'image', got: {}",
2208                    msg
2209                );
2210            }
2211            _ => panic!("Expected ProcessorError, got: {:?}", err),
2212        }
2213    }
2214
2215    /// Test that run operation uses image from header.
2216    #[tokio::test]
2217    async fn test_container_producer_run_image_from_header() {
2218        // Try to connect to Docker - if fails, return early
2219        let docker = match Docker::connect_with_local_defaults() {
2220            Ok(d) => d,
2221            Err(_) => {
2222                eprintln!("Skipping test: Could not connect to Docker daemon");
2223                return;
2224            }
2225        };
2226
2227        if docker.ping().await.is_err() {
2228            eprintln!("Skipping test: Docker daemon not responding to ping");
2229            return;
2230        }
2231
2232        // Create producer without an image in the URI
2233        let component = ContainerComponent::new();
2234        let ctx = NoOpComponentContext;
2235        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2236        let ctx = ProducerContext::new();
2237        let mut producer = endpoint.create_producer(&ctx).unwrap();
2238
2239        let mut exchange = Exchange::new(Message::new(""));
2240        exchange
2241            .input
2242            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2243        // Set a non-existent image to test that the run operation attempts to use it
2244        exchange.input.set_header(
2245            HEADER_IMAGE,
2246            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2247        );
2248
2249        use tower::ServiceExt;
2250        let result = producer.ready().await.unwrap().call(exchange).await;
2251
2252        // The operation should fail because the image doesn't exist
2253        assert!(
2254            result.is_err(),
2255            "Expected error when running container with nonexistent image"
2256        );
2257        let err = result.unwrap_err();
2258        match &err {
2259            CamelError::ProcessorError(msg) => {
2260                // Docker API returns an error about the missing image
2261                assert!(
2262                    msg.to_lowercase().contains("no such image")
2263                        || msg.to_lowercase().contains("not found")
2264                        || msg.to_lowercase().contains("image")
2265                        || msg.to_lowercase().contains("pull")
2266                        || msg.contains("404"),
2267                    "Error message should indicate image issue, got: {}",
2268                    msg
2269                );
2270            }
2271            _ => panic!("Expected ProcessorError, got: {:?}", err),
2272        }
2273    }
2274
2275    /// Integration test: Actually run a container with alpine:latest.
2276    /// This test verifies the full flow: create → start → set headers.
2277    #[tokio::test]
2278    async fn test_container_producer_run_alpine_container() {
2279        let docker = match Docker::connect_with_local_defaults() {
2280            Ok(d) => d,
2281            Err(_) => {
2282                eprintln!("Skipping test: Could not connect to Docker daemon");
2283                return;
2284            }
2285        };
2286
2287        if docker.ping().await.is_err() {
2288            eprintln!("Skipping test: Docker daemon not responding to ping");
2289            return;
2290        }
2291
2292        // Pull alpine:latest if not present
2293        let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2294        let has_alpine = images
2295            .iter()
2296            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2297
2298        if !has_alpine {
2299            eprintln!("Pulling alpine:latest image...");
2300            let mut stream = docker.create_image(
2301                Some(CreateImageOptions {
2302                    from_image: Some("alpine:latest".to_string()),
2303                    ..Default::default()
2304                }),
2305                None,
2306                None,
2307            );
2308
2309            use futures::StreamExt;
2310            while let Some(_item) = stream.next().await {
2311                // Wait for pull to complete
2312            }
2313            eprintln!("Image pulled successfully");
2314        }
2315
2316        // Create producer
2317        let component = ContainerComponent::new();
2318        let ctx = NoOpComponentContext;
2319        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2320        let ctx = ProducerContext::new();
2321        let mut producer = endpoint.create_producer(&ctx).unwrap();
2322
2323        // Run container with unique name
2324        let timestamp = std::time::SystemTime::now()
2325            .duration_since(std::time::UNIX_EPOCH)
2326            .unwrap()
2327            .as_millis();
2328        let container_name = format!("test-rust-camel-{}", timestamp);
2329        let mut exchange = Exchange::new(Message::new(""));
2330        exchange.input.set_header(
2331            HEADER_IMAGE,
2332            serde_json::Value::String("alpine:latest".into()),
2333        );
2334        exchange.input.set_header(
2335            HEADER_CONTAINER_NAME,
2336            serde_json::Value::String(container_name.clone()),
2337        );
2338
2339        use tower::ServiceExt;
2340        let result = producer
2341            .ready()
2342            .await
2343            .unwrap()
2344            .call(exchange)
2345            .await
2346            .expect("Container run should succeed");
2347
2348        // Verify container ID was set
2349        let container_id = result
2350            .input
2351            .header(HEADER_CONTAINER_ID)
2352            .and_then(|v| v.as_str().map(|s| s.to_string()))
2353            .expect("Expected container ID header");
2354        assert!(!container_id.is_empty(), "Container ID should not be empty");
2355
2356        // Verify success header
2357        assert_eq!(
2358            result
2359                .input
2360                .header(HEADER_ACTION_RESULT)
2361                .and_then(|v| v.as_str()),
2362            Some("success")
2363        );
2364
2365        // Verify container exists in Docker
2366        let inspect = docker
2367            .inspect_container(&container_id, None)
2368            .await
2369            .expect("Container should exist");
2370        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2371
2372        // Cleanup: remove container
2373        docker
2374            .remove_container(
2375                &container_id,
2376                Some(RemoveContainerOptions {
2377                    force: true,
2378                    ..Default::default()
2379                }),
2380            )
2381            .await
2382            .ok();
2383
2384        eprintln!("✅ Container {} created and cleaned up", container_id);
2385    }
2386
2387    /// Test that consumer returns an error for unsupported operations.
2388    #[tokio::test]
2389    async fn test_container_consumer_unsupported_operation() {
2390        use tokio::sync::mpsc;
2391
2392        let component = ContainerComponent::new();
2393        let ctx = NoOpComponentContext;
2394        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2395        let mut consumer = endpoint.create_consumer().unwrap();
2396
2397        // Create a minimal ConsumerContext
2398        let (tx, _rx) = mpsc::channel(16);
2399        let cancel_token = tokio_util::sync::CancellationToken::new();
2400        let context = ConsumerContext::new(tx, cancel_token);
2401
2402        let result = consumer.start(context).await;
2403
2404        // Should return error because "run" is not a supported consumer operation
2405        assert!(
2406            result.is_err(),
2407            "Expected error for unsupported consumer operation"
2408        );
2409        let err = result.unwrap_err();
2410        match &err {
2411            CamelError::EndpointCreationFailed(msg) => {
2412                assert!(
2413                    msg.contains("Consumer only supports 'events' or 'logs'"),
2414                    "Error message should mention events or logs support, got: {}",
2415                    msg
2416                );
2417            }
2418            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2419        }
2420    }
2421
2422    #[test]
2423    fn test_container_consumer_concurrency_model_is_concurrent() {
2424        let consumer = ContainerConsumer {
2425            config: ContainerConfig::from_uri("container:events").unwrap(),
2426        };
2427
2428        assert_eq!(
2429            consumer.concurrency_model(),
2430            camel_component_api::ConcurrencyModel::Concurrent { max: None }
2431        );
2432    }
2433
2434    /// Test that consumer gracefully shuts down when cancellation is requested.
2435    /// This test requires a running Docker daemon. If Docker is not available, the test
2436    /// will be ignored.
2437    #[tokio::test]
2438    async fn test_container_consumer_cancellation() {
2439        use std::sync::atomic::{AtomicBool, Ordering};
2440        use tokio::sync::mpsc;
2441
2442        // Try to connect to Docker - if fails, return early
2443        let docker = match Docker::connect_with_local_defaults() {
2444            Ok(d) => d,
2445            Err(_) => {
2446                eprintln!("Skipping test: Could not connect to Docker daemon");
2447                return;
2448            }
2449        };
2450
2451        if docker.ping().await.is_err() {
2452            eprintln!("Skipping test: Docker daemon not responding to ping");
2453            return;
2454        }
2455
2456        let component = ContainerComponent::new();
2457        let ctx = NoOpComponentContext;
2458        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2459        let mut consumer = endpoint.create_consumer().unwrap();
2460
2461        // Create a ConsumerContext
2462        let (tx, _rx) = mpsc::channel(16);
2463        let cancel_token = tokio_util::sync::CancellationToken::new();
2464        let context = ConsumerContext::new(tx, cancel_token.clone());
2465
2466        // Track if the consumer task has completed
2467        let completed = Arc::new(AtomicBool::new(false));
2468        let completed_clone = completed.clone();
2469
2470        // Spawn consumer in background
2471        let handle = tokio::spawn(async move {
2472            let result = consumer.start(context).await;
2473            // Mark as completed when done
2474            completed_clone.store(true, Ordering::SeqCst);
2475            result
2476        });
2477
2478        // Wait a bit for the consumer to start
2479        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2480
2481        // Consumer should still be running (not completed)
2482        assert!(
2483            !completed.load(Ordering::SeqCst),
2484            "Consumer should still be running before cancellation"
2485        );
2486
2487        // Request cancellation
2488        cancel_token.cancel();
2489
2490        // Wait for the task to complete (with timeout)
2491        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2492
2493        // Task should have completed (not timed out)
2494        assert!(
2495            result.is_ok(),
2496            "Consumer should gracefully shut down after cancellation"
2497        );
2498
2499        // Verify the consumer completed
2500        assert!(
2501            completed.load(Ordering::SeqCst),
2502            "Consumer should have completed after cancellation"
2503        );
2504    }
2505
2506    /// Integration test for listing containers.
2507    /// This test requires a running Docker daemon. If Docker is not available, the test
2508    /// will return early and be effectively ignored.
2509    #[tokio::test]
2510    async fn test_container_producer_list_containers() {
2511        // Try to connect to Docker using default config
2512        // If ping fails, return early (effectively ignoring this test)
2513        let docker = match Docker::connect_with_local_defaults() {
2514            Ok(d) => d,
2515            Err(_) => {
2516                eprintln!("Skipping test: Could not connect to Docker daemon");
2517                return;
2518            }
2519        };
2520
2521        if docker.ping().await.is_err() {
2522            eprintln!("Skipping test: Docker daemon not responding to ping");
2523            return;
2524        }
2525
2526        // Create producer with list operation
2527        let component = ContainerComponent::new();
2528        let ctx = NoOpComponentContext;
2529        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2530
2531        let ctx = ProducerContext::new();
2532        let mut producer = endpoint.create_producer(&ctx).unwrap();
2533
2534        // Create exchange with list operation in header
2535        let mut exchange = Exchange::new(Message::new(""));
2536        exchange
2537            .input
2538            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2539
2540        // Call the producer
2541        use tower::ServiceExt;
2542        let result = producer
2543            .ready()
2544            .await
2545            .unwrap()
2546            .call(exchange)
2547            .await
2548            .expect("Producer should succeed when Docker is available");
2549
2550        // Assert that the input exchange body is a JSON array
2551        // (because list_containers should put the JSON result in the body)
2552        match &result.input.body {
2553            camel_component_api::Body::Json(json_value) => {
2554                assert!(
2555                    json_value.is_array(),
2556                    "Expected input body to be a JSON array, got: {:?}",
2557                    json_value
2558                );
2559            }
2560            other => panic!("Expected Body::Json with array, got: {:?}", other),
2561        }
2562    }
2563
2564    #[test]
2565    fn test_container_config_parses_volumes() {
2566        let config = ContainerConfig::from_uri(
2567            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2568        )
2569        .unwrap();
2570        assert_eq!(
2571            config.volumes.as_deref(),
2572            Some("./html:/usr/share/nginx/html:ro")
2573        );
2574    }
2575
2576    #[test]
2577    fn test_container_config_parses_exec_params() {
2578        let config = ContainerConfig::from_uri(
2579            "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2580        )
2581        .unwrap();
2582        assert_eq!(config.operation, "exec");
2583        assert_eq!(config.container_id.as_deref(), Some("my-app"));
2584        assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2585        assert_eq!(config.user.as_deref(), Some("root"));
2586        assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2587        assert!(config.detach);
2588    }
2589
2590    #[test]
2591    fn test_container_config_parses_network_create_params() {
2592        let config =
2593            ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2594                .unwrap();
2595        assert_eq!(config.operation, "network-create");
2596        assert_eq!(config.name.as_deref(), Some("my-net"));
2597        assert_eq!(config.driver.as_deref(), Some("bridge"));
2598    }
2599
2600    #[test]
2601    fn test_container_config_defaults_new_fields() {
2602        let config = ContainerConfig::from_uri("container:list").unwrap();
2603        assert!(config.volumes.is_none());
2604        assert!(config.user.is_none());
2605        assert!(config.workdir.is_none());
2606        assert!(!config.detach);
2607        assert!(config.driver.is_none());
2608        assert!(!config.force);
2609    }
2610
2611    #[test]
2612    fn test_parse_volumes_bind_mount() {
2613        let config = ContainerConfig::from_uri(
2614            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2615        )
2616        .unwrap();
2617        let (binds, anon) = config.parse_volumes().unwrap();
2618        assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2619        assert!(anon.is_empty());
2620    }
2621
2622    #[test]
2623    fn test_parse_volumes_named_volume() {
2624        let config =
2625            ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2626                .unwrap();
2627        let (binds, anon) = config.parse_volumes().unwrap();
2628        assert_eq!(binds, vec!["data:/var/lib/data"]);
2629        assert!(anon.is_empty());
2630    }
2631
2632    #[test]
2633    fn test_parse_volumes_anonymous() {
2634        let config =
2635            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2636        let (binds, anon) = config.parse_volumes().unwrap();
2637        assert!(binds.is_empty());
2638        assert!(anon.contains(&"/tmp/app-data".to_string()));
2639    }
2640
2641    #[test]
2642    fn test_parse_volumes_anonymous_with_mode() {
2643        let config =
2644            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2645                .unwrap();
2646        let (binds, anon) = config.parse_volumes().unwrap();
2647        assert!(binds.is_empty());
2648        assert!(anon.contains(&"/tmp/app-data".to_string()));
2649    }
2650
2651    #[test]
2652    fn test_parse_volumes_multiple() {
2653        let config = ContainerConfig::from_uri(
2654            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2655        )
2656        .unwrap();
2657        let (binds, anon) = config.parse_volumes().unwrap();
2658        assert_eq!(binds.len(), 2);
2659        assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2660        assert!(binds.contains(&"data:/var/log/app".to_string()));
2661        assert!(anon.is_empty());
2662    }
2663
2664    #[test]
2665    fn test_parse_volumes_mixed() {
2666        let config = ContainerConfig::from_uri(
2667            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2668        )
2669        .unwrap();
2670        let (binds, anon) = config.parse_volumes().unwrap();
2671        assert_eq!(binds.len(), 1);
2672        assert!(anon.contains(&"/tmp/cache".to_string()));
2673    }
2674
2675    #[test]
2676    fn test_parse_volumes_none() {
2677        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2678        assert!(config.parse_volumes().is_none());
2679    }
2680
2681    #[test]
2682    fn test_parse_volumes_empty_entry_skipped() {
2683        let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2684        assert!(config.parse_volumes().is_none());
2685    }
2686
2687    #[test]
2688    fn test_parse_volumes_rw_mode() {
2689        let config =
2690            ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2691                .unwrap();
2692        let (binds, _) = config.parse_volumes().unwrap();
2693        assert_eq!(binds, vec!["./data:/app/data:rw"]);
2694    }
2695
2696    #[tokio::test]
2697    async fn test_container_producer_network_lifecycle() {
2698        let docker = match Docker::connect_with_local_defaults() {
2699            Ok(d) => d,
2700            Err(_) => {
2701                eprintln!("Skipping test: Could not connect to Docker daemon");
2702                return;
2703            }
2704        };
2705        if docker.ping().await.is_err() {
2706            eprintln!("Skipping test: Docker daemon not responding to ping");
2707            return;
2708        }
2709
2710        let network_name = format!("camel-test-{}", std::process::id());
2711
2712        let component = ContainerComponent::new();
2713        let component_ctx = NoOpComponentContext;
2714
2715        // Create network
2716        let endpoint = component
2717            .create_endpoint(
2718                &format!("container:network-create?name={}", network_name),
2719                &component_ctx,
2720            )
2721            .unwrap();
2722        let ctx = ProducerContext::new();
2723        let mut producer = endpoint.create_producer(&ctx).unwrap();
2724
2725        let mut exchange = Exchange::new(Message::new(""));
2726        exchange.input.set_header(
2727            HEADER_ACTION,
2728            serde_json::Value::String("network-create".into()),
2729        );
2730
2731        use tower::ServiceExt;
2732        let result = producer
2733            .ready()
2734            .await
2735            .unwrap()
2736            .call(exchange)
2737            .await
2738            .expect("Network create should succeed");
2739
2740        let network_id = result
2741            .input
2742            .header(HEADER_NETWORK)
2743            .and_then(|v| v.as_str().map(|s| s.to_string()))
2744            .expect("Should have network ID");
2745
2746        assert!(!network_id.is_empty());
2747
2748        // List networks
2749        let endpoint = component
2750            .create_endpoint("container:network-list", &component_ctx)
2751            .unwrap();
2752        let mut producer = endpoint.create_producer(&ctx).unwrap();
2753
2754        let mut exchange = Exchange::new(Message::new(""));
2755        exchange.input.set_header(
2756            HEADER_ACTION,
2757            serde_json::Value::String("network-list".into()),
2758        );
2759
2760        let list_result = producer
2761            .ready()
2762            .await
2763            .unwrap()
2764            .call(exchange)
2765            .await
2766            .expect("Network list should succeed");
2767
2768        match &list_result.input.body {
2769            Body::Json(json_value) => {
2770                assert!(json_value.is_array(), "Expected JSON array");
2771            }
2772            other => panic!("Expected Body::Json, got: {:?}", other),
2773        }
2774
2775        // Remove network
2776        let endpoint = component
2777            .create_endpoint(
2778                &format!("container:network-remove?network={}", network_name),
2779                &component_ctx,
2780            )
2781            .unwrap();
2782        let mut producer = endpoint.create_producer(&ctx).unwrap();
2783
2784        let mut exchange = Exchange::new(Message::new(""));
2785        exchange.input.set_header(
2786            HEADER_ACTION,
2787            serde_json::Value::String("network-remove".into()),
2788        );
2789
2790        let remove_result = producer
2791            .ready()
2792            .await
2793            .unwrap()
2794            .call(exchange)
2795            .await
2796            .expect("Network remove should succeed");
2797
2798        let action_result = remove_result
2799            .input
2800            .header(HEADER_ACTION_RESULT)
2801            .and_then(|v| v.as_str());
2802        assert_eq!(action_result, Some("success"));
2803    }
2804}