Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::service::{HostConfig, PortBinding};
19use camel_component_api::parse_uri;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tower::Service;
23
24/// Global tracker for containers created by this component.
25/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
26static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
27    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
28
29/// Registers a container ID for tracking (will be cleaned up on shutdown).
30fn track_container(id: String) {
31    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
32        tracker.insert(id);
33    }
34}
35
36/// Removes a container ID from tracking (when it's been removed naturally).
37fn untrack_container(id: &str) {
38    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
39        tracker.remove(id);
40    }
41}
42
43/// Cleans up all tracked containers. Call this on application shutdown.
44pub async fn cleanup_tracked_containers() {
45    let ids: Vec<String> = {
46        match CONTAINER_TRACKER.lock() {
47            Ok(tracker) => tracker.iter().cloned().collect(),
48            Err(_) => return,
49        }
50    };
51
52    if ids.is_empty() {
53        return;
54    }
55
56    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
57
58    let docker = match Docker::connect_with_local_defaults() {
59        Ok(d) => d,
60        Err(e) => {
61            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
62            return;
63        }
64    };
65
66    for id in ids {
67        match docker
68            .remove_container(
69                &id,
70                Some(bollard::container::RemoveContainerOptions {
71                    force: true,
72                    ..Default::default()
73                }),
74            )
75            .await
76        {
77            Ok(_) => {
78                tracing::debug!("Cleaned up container {}", id);
79                untrack_container(&id);
80            }
81            Err(e) => {
82                tracing::warn!("Failed to cleanup container {}: {}", id, e);
83            }
84        }
85    }
86}
87
88// Header constants for container operations
89
90/// Timeout (seconds) for connecting to the Docker daemon.
91const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
92
93/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
94pub const HEADER_ACTION: &str = "CamelContainerAction";
95
96/// Header key for specifying the container image to use for "run" operations.
97pub const HEADER_IMAGE: &str = "CamelContainerImage";
98
99/// Header key for specifying or receiving the container ID.
100pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
101
102/// Header key for the log stream type (stdout or stderr).
103pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
104
105/// Header key for the log timestamp.
106pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
107
108/// Header key for specifying the container name for "run" operations.
109pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
110
111/// Header key for the result status of a container operation (e.g., "success").
112pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
113
114/// Header key for specifying the command to execute in a container.
115pub const HEADER_CMD: &str = "CamelContainerCmd";
116
117/// Header key for specifying the network name for network operations.
118pub const HEADER_NETWORK: &str = "CamelContainerNetwork";
119
120/// Header key for the exit code of an exec operation.
121pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";
122
123/// Header key for specifying volume mounts.
124pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";
125
126/// Header key for the exec instance ID.
127pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
128
129// ---------------------------------------------------------------------------
130// ContainerGlobalConfig
131// ---------------------------------------------------------------------------
132
133/// Global configuration for Container component.
134/// Supports serde deserialization with defaults and builder methods.
135/// These are the fallback defaults when URI params are not set.
136#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
137#[serde(default)]
138pub struct ContainerGlobalConfig {
139    /// The Docker host URL (default: "unix:///var/run/docker.sock").
140    pub docker_host: String,
141}
142
143impl Default for ContainerGlobalConfig {
144    fn default() -> Self {
145        Self {
146            docker_host: "unix:///var/run/docker.sock".to_string(),
147        }
148    }
149}
150
151impl ContainerGlobalConfig {
152    pub fn new() -> Self {
153        Self::default()
154    }
155
156    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
157        self.docker_host = v.into();
158        self
159    }
160}
161
162// ---------------------------------------------------------------------------
163// ContainerConfig (endpoint configuration)
164// ---------------------------------------------------------------------------
165
166/// Configuration for the container component endpoint.
167///
168/// This struct holds the parsed URI configuration including the operation type,
169/// optional container image, and Docker host connection details.
170#[derive(Debug, Clone)]
171pub struct ContainerConfig {
172    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
173    pub operation: String,
174    /// The container image to use for "run" operations (can be overridden via header).
175    pub image: Option<String>,
176    /// The container name to use for "run" operations (can be overridden via header).
177    pub name: Option<String>,
178    /// The Docker host URL (defaults to "unix:///var/run/docker.sock").
179    pub host: Option<String>,
180    /// Command to run in the container (e.g., "sleep 30").
181    pub cmd: Option<String>,
182    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
183    pub ports: Option<String>,
184    /// Environment variables in format "KEY=value,KEY2=value2".
185    pub env: Option<String>,
186    /// Network mode (e.g., "bridge", "host", "none"). Default: "bridge".
187    pub network: Option<String>,
188    /// Container ID or name for logs consumer.
189    pub container_id: Option<String>,
190    /// Follow log output (default: true for consumer).
191    pub follow: bool,
192    /// Include timestamps in logs (default: false).
193    pub timestamps: bool,
194    /// Number of lines to show from the end of logs (default: all).
195    pub tail: Option<String>,
196    /// Automatically pull the image if not present (default: true).
197    pub auto_pull: bool,
198    /// Automatically remove the container when it exits (default: true).
199    pub auto_remove: bool,
200    /// Volume mounts in format "host:container:ro" (e.g., "./html:/usr/share/nginx/html:ro").
201    pub volumes: Option<String>,
202    /// User to run the container or exec command as (e.g., "root").
203    pub user: Option<String>,
204    /// Working directory inside the container.
205    pub workdir: Option<String>,
206    /// Whether to detach from the exec process (default: false).
207    pub detach: bool,
208    /// Network driver for network-create (e.g., "bridge", "overlay").
209    pub driver: Option<String>,
210    /// Whether to force the operation (default: false).
211    pub force: bool,
212}
213
214impl ContainerConfig {
215    /// Parses a container URI into a `ContainerConfig`.
216    ///
217    /// # Arguments
218    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
219    ///
220    /// # Errors
221    /// Returns an error if the URI scheme is not "container".
222    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
223        let parts = parse_uri(uri)?;
224        if parts.scheme != "container" {
225            return Err(CamelError::InvalidUri(format!(
226                "expected scheme 'container', got '{}'",
227                parts.scheme
228            )));
229        }
230
231        let image = parts.params.get("image").cloned();
232        let name = parts.params.get("name").cloned();
233        let cmd = parts.params.get("cmd").cloned();
234        let ports = parts.params.get("ports").cloned();
235        let env = parts.params.get("env").cloned();
236        let network = parts.params.get("network").cloned();
237        let container_id = parts.params.get("containerId").cloned();
238        let follow = parts
239            .params
240            .get("follow")
241            .map(|v| v.eq_ignore_ascii_case("true"))
242            .unwrap_or(true);
243        let timestamps = parts
244            .params
245            .get("timestamps")
246            .map(|v| v.eq_ignore_ascii_case("true"))
247            .unwrap_or(false);
248        let tail = parts.params.get("tail").cloned();
249        let auto_pull = parts
250            .params
251            .get("autoPull")
252            .map(|v| v.eq_ignore_ascii_case("true"))
253            .unwrap_or(true);
254        let auto_remove = parts
255            .params
256            .get("autoRemove")
257            .map(|v| v.eq_ignore_ascii_case("true"))
258            .unwrap_or(true);
259        // host is only set from URI param; global config defaults are applied later
260        let host = parts.params.get("host").cloned();
261        let volumes = parts.params.get("volumes").cloned();
262        let user = parts.params.get("user").cloned();
263        let workdir = parts.params.get("workdir").cloned();
264        let detach = parts
265            .params
266            .get("detach")
267            .map(|v| v.eq_ignore_ascii_case("true"))
268            .unwrap_or(false);
269        let driver = parts.params.get("driver").cloned();
270        let force = parts
271            .params
272            .get("force")
273            .map(|v| v.eq_ignore_ascii_case("true"))
274            .unwrap_or(false);
275
276        Ok(Self {
277            operation: parts.path,
278            image,
279            name,
280            host,
281            cmd,
282            ports,
283            env,
284            network,
285            container_id,
286            follow,
287            timestamps,
288            tail,
289            auto_pull,
290            auto_remove,
291            volumes,
292            user,
293            workdir,
294            detach,
295            driver,
296            force,
297        })
298    }
299
300    /// Apply global config defaults to this endpoint config.
301    /// Only sets values that are currently `None`.
302    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
303        if self.host.is_none() {
304            self.host = Some(global.docker_host.clone());
305        }
306    }
307
308    fn docker_socket_path(&self) -> Result<&str, CamelError> {
309        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
310            "npipe:////./pipe/docker_engine"
311        } else {
312            "unix:///var/run/docker.sock"
313        });
314
315        if host.starts_with("unix://") || host.starts_with("npipe://") {
316            return Ok(host);
317        }
318
319        if host.contains("://") {
320            return Err(CamelError::ProcessorError(format!(
321                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
322                host
323            )));
324        }
325
326        Ok(host)
327    }
328
329    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
330        let socket_path = self.docker_socket_path()?;
331        Docker::connect_with_socket(
332            socket_path,
333            DOCKER_CONNECT_TIMEOUT_SECS,
334            bollard::API_DEFAULT_VERSION,
335        )
336        .map_err(|e| {
337            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
338        })
339    }
340
341    /// Connects to the Docker daemon using the configured host.
342    ///
343    /// This method establishes a Unix socket connection to Docker and verifies
344    /// the connection by sending a ping request.
345    ///
346    /// # Errors
347    /// Returns an error if the connection fails or the ping request fails.
348    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
349        let docker = self.connect_docker_client()?;
350        docker
351            .ping()
352            .await
353            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
354        Ok(docker)
355    }
356
357    #[allow(clippy::type_complexity)]
358    fn parse_ports(
359        &self,
360    ) -> Option<(
361        HashMap<String, HashMap<(), ()>>,
362        HashMap<String, Option<Vec<PortBinding>>>,
363    )> {
364        let ports_str = self.ports.as_ref()?;
365
366        let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
367        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
368
369        for mapping in ports_str.split(',') {
370            let mapping = mapping.trim();
371            if mapping.is_empty() {
372                continue;
373            }
374
375            let (host_port, container_spec) = mapping.split_once(':')?;
376
377            let (container_port, protocol) = if container_spec.contains('/') {
378                let parts: Vec<&str> = container_spec.split('/').collect();
379                (parts[0], parts[1])
380            } else {
381                (container_spec, "tcp")
382            };
383
384            let container_key = format!("{}/{}", container_port, protocol);
385
386            exposed_ports.insert(container_key.clone(), HashMap::new());
387
388            port_bindings.insert(
389                container_key,
390                Some(vec![PortBinding {
391                    host_ip: None,
392                    host_port: Some(host_port.to_string()),
393                }]),
394            );
395        }
396
397        if exposed_ports.is_empty() {
398            None
399        } else {
400            Some((exposed_ports, port_bindings))
401        }
402    }
403
404    fn parse_env(&self) -> Option<Vec<String>> {
405        let env_str = self.env.as_ref()?;
406
407        let env_vars: Vec<String> = env_str
408            .split(',')
409            .map(|s| s.trim().to_string())
410            .filter(|s| !s.is_empty())
411            .collect();
412
413        if env_vars.is_empty() {
414            None
415        } else {
416            Some(env_vars)
417        }
418    }
419
420    #[cfg(test)]
421    #[allow(clippy::type_complexity)]
422    fn parse_volumes(&self) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
423        self.volumes.as_deref().and_then(parse_volume_str)
424    }
425}
426
427/// Parses a volume specification string into bind mounts and anonymous volumes.
428///
429/// Format: `host:container:ro|rw` for bind mounts, `path` for anonymous volumes,
430/// `path:ro|rw` for anonymous volumes with mode, or `name:container` for named volumes.
431#[allow(clippy::type_complexity)]
432fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
433    let mut binds: Vec<String> = Vec::new();
434    let mut anonymous_volumes: HashMap<String, HashMap<(), ()>> = HashMap::new();
435
436    for entry in volumes_str.split(',') {
437        let entry = entry.trim();
438        if entry.is_empty() {
439            continue;
440        }
441
442        let segments: Vec<&str> = entry.split(':').collect();
443
444        match segments.len() {
445            3 => {
446                let source = segments[0];
447                let target = segments[1];
448                let mode = segments[2];
449                if mode != "ro" && mode != "rw" {
450                    continue;
451                }
452                binds.push(format!("{}:{}:{}", source, target, mode));
453            }
454            2 => {
455                let a = segments[0];
456                let b = segments[1];
457                if b == "ro" || b == "rw" {
458                    anonymous_volumes.insert(a.to_string(), HashMap::new());
459                } else {
460                    binds.push(format!("{}:{}", a, b));
461                }
462            }
463            1 => {
464                anonymous_volumes.insert(segments[0].to_string(), HashMap::new());
465            }
466            _ => continue,
467        }
468    }
469
470    if binds.is_empty() && anonymous_volumes.is_empty() {
471        None
472    } else {
473        Some((binds, anonymous_volumes))
474    }
475}
476
477#[derive(Debug, Clone, Copy, PartialEq, Eq)]
478enum ProducerOperation {
479    List,
480    Run,
481    Start,
482    Stop,
483    Remove,
484    Exec,
485    NetworkCreate,
486    NetworkConnect,
487    NetworkDisconnect,
488    NetworkRemove,
489    NetworkList,
490}
491
492fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
493    match operation {
494        "list" => Ok(ProducerOperation::List),
495        "run" => Ok(ProducerOperation::Run),
496        "start" => Ok(ProducerOperation::Start),
497        "stop" => Ok(ProducerOperation::Stop),
498        "remove" => Ok(ProducerOperation::Remove),
499        "exec" => Ok(ProducerOperation::Exec),
500        "network-create" => Ok(ProducerOperation::NetworkCreate),
501        "network-connect" => Ok(ProducerOperation::NetworkConnect),
502        "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
503        "network-remove" => Ok(ProducerOperation::NetworkRemove),
504        "network-list" => Ok(ProducerOperation::NetworkList),
505        _ => Err(CamelError::ProcessorError(format!(
506            "Unknown container operation: {}",
507            operation
508        ))),
509    }
510}
511
512fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
513    exchange
514        .input
515        .header(HEADER_CONTAINER_NAME)
516        .and_then(|v| v.as_str().map(|s| s.to_string()))
517        .or_else(|| config.name.clone())
518}
519
520async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
521    let images = docker
522        .list_images::<&str>(None)
523        .await
524        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
525
526    Ok(images.iter().any(|img| {
527        img.repo_tags
528            .iter()
529            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
530    }))
531}
532
533async fn pull_image_with_progress(
534    docker: &Docker,
535    image: &str,
536    timeout_secs: u64,
537) -> Result<(), CamelError> {
538    use futures::StreamExt;
539
540    tracing::info!("Pulling image: {}", image);
541
542    let mut stream = docker.create_image(
543        Some(bollard::image::CreateImageOptions {
544            from_image: image,
545            ..Default::default()
546        }),
547        None,
548        None,
549    );
550
551    let start = std::time::Instant::now();
552    let mut last_progress = std::time::Instant::now();
553
554    while let Some(item) = stream.next().await {
555        if start.elapsed().as_secs() > timeout_secs {
556            return Err(CamelError::ProcessorError(format!(
557                "Image pull timeout after {}s. Try manually: docker pull {}",
558                timeout_secs, image
559            )));
560        }
561
562        match item {
563            Ok(update) => {
564                // Log progress every 2 seconds
565                if last_progress.elapsed().as_secs() >= 2 {
566                    if let Some(status) = update.status {
567                        tracing::debug!("Pull progress: {}", status);
568                    }
569                    last_progress = std::time::Instant::now();
570                }
571            }
572            Err(e) => {
573                let err_str = e.to_string().to_lowercase();
574                if err_str.contains("unauthorized") || err_str.contains("401") {
575                    return Err(CamelError::ProcessorError(format!(
576                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
577                        image
578                    )));
579                }
580                if err_str.contains("not found") || err_str.contains("404") {
581                    return Err(CamelError::ProcessorError(format!(
582                        "Image '{}' not found in registry. Check the image name and tag",
583                        image
584                    )));
585                }
586                return Err(CamelError::ProcessorError(format!(
587                    "Failed to pull image '{}': {}",
588                    image, e
589                )));
590            }
591        }
592    }
593
594    tracing::info!("Successfully pulled image: {}", image);
595    Ok(())
596}
597
598async fn ensure_image_available(
599    docker: &Docker,
600    image: &str,
601    auto_pull: bool,
602    timeout_secs: u64,
603) -> Result<(), CamelError> {
604    if image_exists_locally(docker, image).await? {
605        tracing::debug!("Image '{}' already available locally", image);
606        return Ok(());
607    }
608
609    if !auto_pull {
610        return Err(CamelError::ProcessorError(format!(
611            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
612            image, image
613        )));
614    }
615
616    pull_image_with_progress(docker, image, timeout_secs).await
617}
618
619fn format_docker_event(event: &bollard::models::EventMessage) -> String {
620    let action = event.action.as_deref().unwrap_or("unknown");
621    let actor = event.actor.as_ref();
622
623    let container_name = actor
624        .and_then(|a| a.attributes.as_ref())
625        .and_then(|attrs| attrs.get("name"))
626        .map(|s| s.as_str())
627        .unwrap_or("unknown");
628
629    let image = actor
630        .and_then(|a| a.attributes.as_ref())
631        .and_then(|attrs| attrs.get("image"))
632        .map(|s| s.as_str())
633        .unwrap_or("");
634
635    let exit_code = actor
636        .and_then(|a| a.attributes.as_ref())
637        .and_then(|attrs| attrs.get("exitCode"))
638        .map(|s| s.as_str());
639
640    match action {
641        "create" => {
642            if image.is_empty() {
643                format!("[CREATE] Container {}", container_name)
644            } else {
645                format!("[CREATE] Container {} ({})", container_name, image)
646            }
647        }
648        "start" => format!("[START]  Container {}", container_name),
649        "die" => {
650            if let Some(code) = exit_code {
651                format!("[DIE]    Container {} (exit: {})", container_name, code)
652            } else {
653                format!("[DIE]    Container {}", container_name)
654            }
655        }
656        "destroy" => format!("[DESTROY] Container {}", container_name),
657        "stop" => format!("[STOP]   Container {}", container_name),
658        "pause" => format!("[PAUSE]  Container {}", container_name),
659        "unpause" => format!("[UNPAUSE] Container {}", container_name),
660        "restart" => format!("[RESTART] Container {}", container_name),
661        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
662    }
663}
664
665async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
666    create: CreateFn,
667    start: StartFn,
668    remove: RemoveFn,
669) -> Result<String, CamelError>
670where
671    CreateFn: FnOnce() -> CreateFut,
672    CreateFut: Future<Output = Result<String, CamelError>>,
673    StartFn: FnOnce(String) -> StartFut,
674    StartFut: Future<Output = Result<(), CamelError>>,
675    RemoveFn: FnOnce(String) -> RemoveFut,
676    RemoveFut: Future<Output = Result<(), CamelError>>,
677{
678    let container_id = create().await?;
679    if let Err(start_err) = start(container_id.clone()).await {
680        if let Err(remove_err) = remove(container_id.clone()).await {
681            return Err(CamelError::ProcessorError(format!(
682                "Failed to start container: {}. Cleanup failed: {}",
683                start_err, remove_err
684            )));
685        }
686        return Err(start_err);
687    }
688
689    Ok(container_id)
690}
691
692async fn handle_list(
693    docker: Docker,
694    _config: ContainerConfig,
695    exchange: &mut Exchange,
696) -> Result<(), CamelError> {
697    let containers = docker
698        .list_containers::<String>(None)
699        .await
700        .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
701
702    let json_value = serde_json::to_value(&containers).map_err(|e| {
703        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
704    })?;
705
706    exchange.input.body = Body::Json(json_value);
707    exchange.input.set_header(
708        HEADER_ACTION_RESULT,
709        serde_json::Value::String("success".to_string()),
710    );
711    Ok(())
712}
713
714async fn handle_run(
715    docker: Docker,
716    config: ContainerConfig,
717    exchange: &mut Exchange,
718) -> Result<(), CamelError> {
719    let image = exchange
720        .input
721        .header(HEADER_IMAGE)
722        .and_then(|v| v.as_str().map(|s| s.to_string()))
723        .or(config.image.clone())
724        .ok_or_else(|| {
725            CamelError::ProcessorError(
726                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
727            )
728        })?;
729
730    let pull_timeout = 300;
731    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
732        .await
733        .map_err(|e| {
734            CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
735        })?;
736
737    let container_name = resolve_container_name(exchange, &config);
738    let container_name_ref = container_name.as_deref().unwrap_or("");
739    let cmd_parts: Option<Vec<String>> = config
740        .cmd
741        .as_ref()
742        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
743    let auto_remove = config.auto_remove;
744    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
745    let env_vars = config.parse_env();
746    let network_mode = config.network.clone();
747
748    let volumes_str = exchange
749        .input
750        .header(HEADER_VOLUMES)
751        .and_then(|v| v.as_str().map(|s| s.to_string()))
752        .or(config.volumes.clone());
753    let (binds, anon_volumes) = volumes_str
754        .as_deref()
755        .and_then(parse_volume_str)
756        .unwrap_or_default();
757
758    let docker_create = docker.clone();
759    let docker_start = docker.clone();
760    let docker_remove = docker.clone();
761
762    let container_id = run_container_with_cleanup(
763        move || async move {
764            let create_options = bollard::container::CreateContainerOptions {
765                name: container_name_ref,
766                ..Default::default()
767            };
768            let container_config = bollard::container::Config::<String> {
769                image: Some(image.clone()),
770                cmd: cmd_parts,
771                env: env_vars,
772                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
773                volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
774                host_config: Some(HostConfig {
775                    auto_remove: Some(auto_remove),
776                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
777                    network_mode,
778                    binds: if binds.is_empty() { None } else { Some(binds) },
779                    ..Default::default()
780                }),
781                ..Default::default()
782            };
783
784            let create_response = docker_create
785                .create_container(Some(create_options), container_config)
786                .await
787                .map_err(|e| {
788                    let err_str = e.to_string().to_lowercase();
789                    if err_str.contains("409") || err_str.contains("conflict") {
790                        CamelError::ProcessorError(format!(
791                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
792                            container_name_ref
793                        ))
794                    } else {
795                        CamelError::ProcessorError(format!(
796                            "Failed to create container: {}",
797                            e
798                        ))
799                    }
800                })?;
801
802            Ok(create_response.id)
803        },
804        move |container_id| async move {
805            docker_start
806                .start_container::<String>(&container_id, None)
807                .await
808                .map_err(|e| {
809                    CamelError::ProcessorError(format!(
810                        "Failed to start container: {}",
811                        e
812                    ))
813                })
814        },
815        move |container_id| async move {
816            docker_remove
817                .remove_container(&container_id, None)
818                .await
819                .map_err(|e| {
820                    CamelError::ProcessorError(format!(
821                        "Failed to remove container after start failure: {}",
822                        e
823                    ))
824                })
825        },
826    )
827    .await?;
828
829    track_container(container_id.clone());
830
831    exchange
832        .input
833        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
834    exchange.input.set_header(
835        HEADER_ACTION_RESULT,
836        serde_json::Value::String("success".to_string()),
837    );
838    Ok(())
839}
840
841async fn handle_lifecycle(
842    docker: Docker,
843    _config: ContainerConfig,
844    exchange: &mut Exchange,
845    operation: ProducerOperation,
846    operation_name: &str,
847) -> Result<(), CamelError> {
848    let container_id = exchange
849        .input
850        .header(HEADER_CONTAINER_ID)
851        .and_then(|v| v.as_str().map(|s| s.to_string()))
852        .ok_or_else(|| {
853            CamelError::ProcessorError(format!(
854                "{} header is required for {} operation",
855                HEADER_CONTAINER_ID, operation_name
856            ))
857        })?;
858
859    match operation {
860        ProducerOperation::Start => {
861            docker
862                .start_container::<String>(&container_id, None)
863                .await
864                .map_err(|e| {
865                    CamelError::ProcessorError(format!("Failed to start container: {}", e))
866                })?;
867        }
868        ProducerOperation::Stop => {
869            docker
870                .stop_container(&container_id, None)
871                .await
872                .map_err(|e| {
873                    CamelError::ProcessorError(format!("Failed to stop container: {}", e))
874                })?;
875        }
876        ProducerOperation::Remove => {
877            docker
878                .remove_container(&container_id, None)
879                .await
880                .map_err(|e| {
881                    CamelError::ProcessorError(format!("Failed to remove container: {}", e))
882                })?;
883            untrack_container(&container_id);
884        }
885        _ => {}
886    }
887
888    exchange.input.set_header(
889        HEADER_ACTION_RESULT,
890        serde_json::Value::String("success".to_string()),
891    );
892    Ok(())
893}
894
895async fn handle_exec(
896    docker: Docker,
897    config: ContainerConfig,
898    exchange: &mut Exchange,
899) -> Result<(), CamelError> {
900    let container_id = exchange
901        .input
902        .header(HEADER_CONTAINER_ID)
903        .and_then(|v| v.as_str().map(|s| s.to_string()))
904        .or(config.container_id.clone())
905        .ok_or_else(|| {
906            CamelError::ProcessorError(format!(
907                "{} header or containerId param is required for exec operation",
908                HEADER_CONTAINER_ID
909            ))
910        })?;
911
912    let cmd = exchange
913        .input
914        .header(HEADER_CMD)
915        .and_then(|v| v.as_str().map(|s| s.to_string()))
916        .or(config.cmd.clone())
917        .ok_or_else(|| {
918            CamelError::ProcessorError(
919                "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
920            )
921        })?;
922
923    let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
924    let env_vars = config.parse_env();
925
926    let exec_config = bollard::exec::CreateExecOptions {
927        cmd: Some(cmd_parts),
928        env: env_vars,
929        user: config.user.clone(),
930        working_dir: config.workdir.clone(),
931        attach_stdout: Some(true),
932        attach_stderr: Some(true),
933        ..Default::default()
934    };
935
936    let create_result = docker
937        .create_exec(&container_id, exec_config)
938        .await
939        .map_err(|e| {
940            let err_str = e.to_string().to_lowercase();
941            if err_str.contains("404") || err_str.contains("no such") {
942                CamelError::ProcessorError(format!(
943                    "Container '{}' not found for exec",
944                    container_id
945                ))
946            } else {
947                CamelError::ProcessorError(format!("Failed to create exec: {}", e))
948            }
949        })?;
950
951    let exec_id = create_result.id;
952
953    if config.detach {
954        docker
955            .start_exec(
956                &exec_id,
957                Some(bollard::exec::StartExecOptions {
958                    detach: true,
959                    ..Default::default()
960                }),
961            )
962            .await
963            .map_err(|e| {
964                CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
965            })?;
966
967        exchange
968            .input
969            .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
970        exchange
971            .input
972            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
973    } else {
974        let start_result = docker
975            .start_exec(&exec_id, None)
976            .await
977            .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
978
979        let mut output = String::new();
980
981        match start_result {
982            bollard::exec::StartExecResults::Attached {
983                output: mut stream, ..
984            } => {
985                use futures::StreamExt;
986                while let Some(msg) = stream.next().await {
987                    match msg {
988                        Ok(bollard::container::LogOutput::StdOut { message }) => {
989                            output.push_str(&String::from_utf8_lossy(&message));
990                        }
991                        Ok(bollard::container::LogOutput::StdErr { message }) => {
992                            output.push_str(&String::from_utf8_lossy(&message));
993                        }
994                        Ok(_) => {}
995                        Err(e) => {
996                            output.push_str(&format!("[error reading stream: {}]", e));
997                        }
998                    }
999                }
1000            }
1001            bollard::exec::StartExecResults::Detached => {}
1002        }
1003
1004        let inspect = docker
1005            .inspect_exec(&exec_id)
1006            .await
1007            .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1008
1009        let exit_code: i64 = inspect.exit_code.unwrap_or(0);
1010
1011        let output = output.trim_end().to_string();
1012        exchange.input.body = Body::Text(output);
1013        exchange.input.set_header(
1014            HEADER_EXIT_CODE,
1015            serde_json::Value::Number(exit_code.into()),
1016        );
1017        exchange
1018            .input
1019            .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1020    }
1021
1022    exchange.input.set_header(
1023        HEADER_ACTION_RESULT,
1024        serde_json::Value::String("success".to_string()),
1025    );
1026    Ok(())
1027}
1028
1029async fn handle_network_create(
1030    docker: Docker,
1031    config: ContainerConfig,
1032    exchange: &mut Exchange,
1033) -> Result<(), CamelError> {
1034    let network_name = exchange
1035        .input
1036        .header(HEADER_CONTAINER_NAME)
1037        .and_then(|v| v.as_str().map(|s| s.to_string()))
1038        .or(config.name.clone())
1039        .ok_or_else(|| {
1040            CamelError::ProcessorError(
1041                "CamelContainerName header or name param is required for network-create"
1042                    .to_string(),
1043            )
1044        })?;
1045
1046    let driver = config.driver.as_deref().unwrap_or("bridge");
1047
1048    let options = bollard::network::CreateNetworkOptions {
1049        name: network_name.clone(),
1050        driver: driver.to_string(),
1051        ..Default::default()
1052    };
1053
1054    let result = docker.create_network(options).await.map_err(|e| {
1055        let err_str = e.to_string().to_lowercase();
1056        if err_str.contains("409") || err_str.contains("already exists") {
1057            CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1058        } else {
1059            CamelError::ProcessorError(format!("Failed to create network: {}", e))
1060        }
1061    })?;
1062
1063    let network_id = result.id.clone();
1064    let json_value = serde_json::to_value(&result).map_err(|e| {
1065        CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1066    })?;
1067
1068    exchange.input.body = Body::Json(json_value);
1069    exchange
1070        .input
1071        .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1072    exchange.input.set_header(
1073        HEADER_ACTION_RESULT,
1074        serde_json::Value::String("success".to_string()),
1075    );
1076    Ok(())
1077}
1078
1079async fn handle_network_connect(
1080    docker: Docker,
1081    config: ContainerConfig,
1082    exchange: &mut Exchange,
1083) -> Result<(), CamelError> {
1084    let network = exchange
1085        .input
1086        .header(HEADER_NETWORK)
1087        .and_then(|v| v.as_str().map(|s| s.to_string()))
1088        .or(config.network.clone())
1089        .ok_or_else(|| {
1090            CamelError::ProcessorError(
1091                "CamelContainerNetwork header or network param is required for network-connect"
1092                    .to_string(),
1093            )
1094        })?;
1095
1096    let container = exchange
1097        .input
1098        .header(HEADER_CONTAINER_ID)
1099        .and_then(|v| v.as_str().map(|s| s.to_string()))
1100        .or(config.container_id.clone())
1101        .ok_or_else(|| {
1102            CamelError::ProcessorError(
1103                "CamelContainerId header or container param is required for network-connect"
1104                    .to_string(),
1105            )
1106        })?;
1107
1108    docker
1109        .connect_network(
1110            &network,
1111            bollard::network::ConnectNetworkOptions {
1112                container,
1113                ..Default::default()
1114            },
1115        )
1116        .await
1117        .map_err(|e| {
1118            let err_str = e.to_string().to_lowercase();
1119            if err_str.contains("404") || err_str.contains("not found") {
1120                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1121            } else {
1122                CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1123            }
1124        })?;
1125
1126    exchange.input.set_header(
1127        HEADER_ACTION_RESULT,
1128        serde_json::Value::String("success".to_string()),
1129    );
1130    Ok(())
1131}
1132
1133async fn handle_network_disconnect(
1134    docker: Docker,
1135    config: ContainerConfig,
1136    exchange: &mut Exchange,
1137) -> Result<(), CamelError> {
1138    let network = exchange
1139        .input
1140        .header(HEADER_NETWORK)
1141        .and_then(|v| v.as_str().map(|s| s.to_string()))
1142        .or(config.network.clone())
1143        .ok_or_else(|| {
1144            CamelError::ProcessorError(
1145                "CamelContainerNetwork header or network param is required for network-disconnect"
1146                    .to_string(),
1147            )
1148        })?;
1149
1150    let container = exchange
1151        .input
1152        .header(HEADER_CONTAINER_ID)
1153        .and_then(|v| v.as_str().map(|s| s.to_string()))
1154        .or(config.container_id.clone())
1155        .ok_or_else(|| {
1156            CamelError::ProcessorError(
1157                "CamelContainerId header or container param is required for network-disconnect"
1158                    .to_string(),
1159            )
1160        })?;
1161
1162    docker
1163        .disconnect_network(
1164            &network,
1165            bollard::network::DisconnectNetworkOptions {
1166                container,
1167                force: config.force,
1168            },
1169        )
1170        .await
1171        .map_err(|e| {
1172            let err_str = e.to_string().to_lowercase();
1173            if err_str.contains("404") || err_str.contains("not found") {
1174                CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1175            } else {
1176                CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1177            }
1178        })?;
1179
1180    exchange.input.set_header(
1181        HEADER_ACTION_RESULT,
1182        serde_json::Value::String("success".to_string()),
1183    );
1184    Ok(())
1185}
1186
1187async fn handle_network_remove(
1188    docker: Docker,
1189    config: ContainerConfig,
1190    exchange: &mut Exchange,
1191) -> Result<(), CamelError> {
1192    let network = exchange
1193        .input
1194        .header(HEADER_NETWORK)
1195        .and_then(|v| v.as_str().map(|s| s.to_string()))
1196        .or(config.network.clone())
1197        .ok_or_else(|| {
1198            CamelError::ProcessorError(
1199                "CamelContainerNetwork header or network param is required for network-remove"
1200                    .to_string(),
1201            )
1202        })?;
1203
1204    docker.remove_network(&network).await.map_err(|e| {
1205        let err_str = e.to_string().to_lowercase();
1206        if err_str.contains("404") || err_str.contains("not found") {
1207            CamelError::ProcessorError(format!("Network '{}' not found", network))
1208        } else if err_str.contains("409") || err_str.contains("in use") {
1209            CamelError::ProcessorError(format!(
1210                "Network '{}' is in use and cannot be removed",
1211                network
1212            ))
1213        } else {
1214            CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1215        }
1216    })?;
1217
1218    exchange.input.set_header(
1219        HEADER_ACTION_RESULT,
1220        serde_json::Value::String("success".to_string()),
1221    );
1222    Ok(())
1223}
1224
1225async fn handle_network_list(
1226    docker: Docker,
1227    _config: ContainerConfig,
1228    exchange: &mut Exchange,
1229) -> Result<(), CamelError> {
1230    let networks = docker
1231        .list_networks::<String>(None)
1232        .await
1233        .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1234
1235    let json_value = serde_json::to_value(&networks)
1236        .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1237
1238    exchange.input.body = Body::Json(json_value);
1239    exchange.input.set_header(
1240        HEADER_ACTION_RESULT,
1241        serde_json::Value::String("success".to_string()),
1242    );
1243    Ok(())
1244}
1245
1246/// Producer for executing container operations.
1247///
1248/// This producer handles synchronous container operations like listing,
1249/// creating, starting, stopping, and removing containers.
1250#[derive(Clone)]
1251pub struct ContainerProducer {
1252    config: ContainerConfig,
1253    docker: Docker,
1254}
1255
1256impl Service<Exchange> for ContainerProducer {
1257    type Response = Exchange;
1258    type Error = CamelError;
1259    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1260
1261    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1262        Poll::Ready(Ok(()))
1263    }
1264
1265    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1266        let config = self.config.clone();
1267        let docker = self.docker.clone();
1268        Box::pin(async move {
1269            let operation_name = exchange
1270                .input
1271                .header(HEADER_ACTION)
1272                .and_then(|v| v.as_str().map(|s| s.to_string()))
1273                .unwrap_or_else(|| config.operation.clone());
1274
1275            let operation = parse_producer_operation(&operation_name)?;
1276
1277            match operation {
1278                ProducerOperation::List => {
1279                    handle_list(docker, config, &mut exchange).await?;
1280                }
1281                ProducerOperation::Run => {
1282                    handle_run(docker, config, &mut exchange).await?;
1283                }
1284                ProducerOperation::Start => {
1285                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1286                        .await?;
1287                }
1288                ProducerOperation::Stop => {
1289                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1290                        .await?;
1291                }
1292                ProducerOperation::Remove => {
1293                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1294                        .await?;
1295                }
1296                ProducerOperation::Exec => {
1297                    handle_exec(docker, config, &mut exchange).await?;
1298                }
1299                ProducerOperation::NetworkCreate => {
1300                    handle_network_create(docker, config, &mut exchange).await?;
1301                }
1302                ProducerOperation::NetworkConnect => {
1303                    handle_network_connect(docker, config, &mut exchange).await?;
1304                }
1305                ProducerOperation::NetworkDisconnect => {
1306                    handle_network_disconnect(docker, config, &mut exchange).await?;
1307                }
1308                ProducerOperation::NetworkRemove => {
1309                    handle_network_remove(docker, config, &mut exchange).await?;
1310                }
1311                ProducerOperation::NetworkList => {
1312                    handle_network_list(docker, config, &mut exchange).await?;
1313                }
1314            }
1315
1316            Ok(exchange)
1317        })
1318    }
1319}
1320
1321/// Consumer for receiving Docker container events or logs.
1322///
1323/// This consumer subscribes to Docker events or container logs and forwards them
1324/// to the route as exchanges. It implements automatic reconnection on connection failures.
1325pub struct ContainerConsumer {
1326    config: ContainerConfig,
1327}
1328
1329#[async_trait]
1330impl Consumer for ContainerConsumer {
1331    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1332        match self.config.operation.as_str() {
1333            "events" => self.start_events_consumer(context).await,
1334            "logs" => self.start_logs_consumer(context).await,
1335            _ => Err(CamelError::EndpointCreationFailed(format!(
1336                "Consumer only supports 'events' or 'logs' operations, got '{}'",
1337                self.config.operation
1338            ))),
1339        }
1340    }
1341
1342    async fn stop(&mut self) -> Result<(), CamelError> {
1343        Ok(())
1344    }
1345
1346    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1347        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1348    }
1349}
1350
1351impl ContainerConsumer {
1352    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1353        use futures::StreamExt;
1354
1355        loop {
1356            if context.is_cancelled() {
1357                tracing::info!("Container events consumer shutting down");
1358                return Ok(());
1359            }
1360
1361            let docker = match self.config.connect_docker().await {
1362                Ok(d) => d,
1363                Err(e) => {
1364                    tracing::error!(
1365                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
1366                        e
1367                    );
1368                    tokio::select! {
1369                        _ = context.cancelled() => {
1370                            tracing::info!("Container events consumer shutting down");
1371                            return Ok(());
1372                        }
1373                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1374                    }
1375                    continue;
1376                }
1377            };
1378
1379            let mut event_stream = docker.events::<String>(None);
1380
1381            loop {
1382                tokio::select! {
1383                    _ = context.cancelled() => {
1384                        tracing::info!("Container events consumer shutting down");
1385                        return Ok(());
1386                    }
1387
1388                    msg = event_stream.next() => {
1389                        match msg {
1390                            Some(Ok(event)) => {
1391                                let formatted = format_docker_event(&event);
1392                                let message = Message::new(Body::Text(formatted));
1393                                let exchange = Exchange::new(message);
1394
1395                                if let Err(e) = context.send(exchange).await {
1396                                    tracing::error!("Failed to send exchange: {:?}", e);
1397                                    break;
1398                                }
1399                            }
1400                            Some(Err(e)) => {
1401                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1402                                break;
1403                            }
1404                            None => {
1405                                tracing::info!("Docker event stream ended. Reconnecting...");
1406                                break;
1407                            }
1408                        }
1409                    }
1410                }
1411            }
1412
1413            tokio::select! {
1414                _ = context.cancelled() => {
1415                    tracing::info!("Container events consumer shutting down");
1416                    return Ok(());
1417                }
1418                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1419            }
1420        }
1421    }
1422
1423    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1424        use futures::StreamExt;
1425
1426        let container_id = self.config.container_id.clone().ok_or_else(|| {
1427            CamelError::EndpointCreationFailed(
1428                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1429                    .to_string(),
1430            )
1431        })?;
1432
1433        loop {
1434            if context.is_cancelled() {
1435                tracing::info!("Container logs consumer shutting down");
1436                return Ok(());
1437            }
1438
1439            let docker = match self.config.connect_docker().await {
1440                Ok(d) => d,
1441                Err(e) => {
1442                    tracing::error!(
1443                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
1444                        e
1445                    );
1446                    tokio::select! {
1447                        _ = context.cancelled() => {
1448                            tracing::info!("Container logs consumer shutting down");
1449                            return Ok(());
1450                        }
1451                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1452                    }
1453                    continue;
1454                }
1455            };
1456
1457            let tail = self
1458                .config
1459                .tail
1460                .clone()
1461                .unwrap_or_else(|| "all".to_string());
1462
1463            let options = bollard::container::LogsOptions::<String> {
1464                follow: self.config.follow,
1465                stdout: true,
1466                stderr: true,
1467                timestamps: self.config.timestamps,
1468                tail,
1469                ..Default::default()
1470            };
1471
1472            let mut log_stream = docker.logs(&container_id, Some(options));
1473            let container_id_header = container_id.clone();
1474
1475            loop {
1476                tokio::select! {
1477                    _ = context.cancelled() => {
1478                        tracing::info!("Container logs consumer shutting down");
1479                        return Ok(());
1480                    }
1481
1482                    msg = log_stream.next() => {
1483                        match msg {
1484                            Some(Ok(log_output)) => {
1485                                let (stream_type, content) = match log_output {
1486                                    bollard::container::LogOutput::StdOut { message } => {
1487                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
1488                                    }
1489                                    bollard::container::LogOutput::StdErr { message } => {
1490                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
1491                                    }
1492                                    bollard::container::LogOutput::Console { message } => {
1493                                        ("console", String::from_utf8_lossy(&message).into_owned())
1494                                    }
1495                                    bollard::container::LogOutput::StdIn { message } => {
1496                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
1497                                    }
1498                                };
1499
1500                                let content = content.trim_end();
1501                                if content.is_empty() {
1502                                    continue;
1503                                }
1504
1505                                let mut message = Message::new(Body::Text(content.to_string()));
1506                                message.set_header(
1507                                    HEADER_CONTAINER_ID,
1508                                    serde_json::Value::String(container_id_header.clone()),
1509                                );
1510                                message.set_header(
1511                                    HEADER_LOG_STREAM,
1512                                    serde_json::Value::String(stream_type.to_string()),
1513                                );
1514
1515                                if self.config.timestamps
1516                                    && let Some(ts) = extract_timestamp(content) {
1517                                        message.set_header(
1518                                            HEADER_LOG_TIMESTAMP,
1519                                            serde_json::Value::String(ts),
1520                                        );
1521                                    }
1522
1523                                let exchange = Exchange::new(message);
1524
1525                                if let Err(e) = context.send(exchange).await {
1526                                    tracing::error!("Failed to send log exchange: {:?}", e);
1527                                    break;
1528                                }
1529                            }
1530                            Some(Err(e)) => {
1531                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1532                                break;
1533                            }
1534                            None => {
1535                                if self.config.follow {
1536                                    tracing::info!("Docker log stream ended. Reconnecting...");
1537                                    break;
1538                                } else {
1539                                    tracing::info!("Container logs consumer finished (follow=false)");
1540                                    return Ok(());
1541                                }
1542                            }
1543                        }
1544                    }
1545                }
1546            }
1547
1548            tokio::select! {
1549                _ = context.cancelled() => {
1550                    tracing::info!("Container logs consumer shutting down");
1551                    return Ok(());
1552                }
1553                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1554            }
1555        }
1556    }
1557}
1558
1559fn extract_timestamp(log_line: &str) -> Option<String> {
1560    let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
1561    if parts.len() > 1 && parts[0].contains('T') {
1562        Some(parts[0].to_string())
1563    } else {
1564        None
1565    }
1566}
1567
1568/// Component for creating container endpoints.
1569///
1570/// This component handles URIs with the "container" scheme and creates
1571/// appropriate producer and consumer endpoints for Docker operations.
1572///
1573/// Containers created via `run` operation are tracked globally and can be
1574/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
1575pub struct ContainerComponent {
1576    config: Option<ContainerGlobalConfig>,
1577}
1578
1579impl ContainerComponent {
1580    /// Creates a new container component instance without global config.
1581    pub fn new() -> Self {
1582        Self { config: None }
1583    }
1584
1585    /// Creates a container component with the given global config.
1586    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1587        Self {
1588            config: Some(config),
1589        }
1590    }
1591
1592    /// Creates a container component with optional global config.
1593    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1594        Self { config }
1595    }
1596}
1597
1598impl Default for ContainerComponent {
1599    fn default() -> Self {
1600        Self::new()
1601    }
1602}
1603
1604impl Component for ContainerComponent {
1605    fn scheme(&self) -> &str {
1606        "container"
1607    }
1608
1609    fn create_endpoint(
1610        &self,
1611        uri: &str,
1612        _ctx: &dyn camel_component_api::ComponentContext,
1613    ) -> Result<Box<dyn Endpoint>, CamelError> {
1614        let mut config = ContainerConfig::from_uri(uri)?;
1615        // Apply global defaults if present and URI didn't set them
1616        if let Some(ref global) = self.config {
1617            config.apply_global_defaults(global);
1618        }
1619        Ok(Box::new(ContainerEndpoint {
1620            uri: uri.to_string(),
1621            config,
1622        }))
1623    }
1624}
1625
1626/// Endpoint for container operations.
1627///
1628/// This endpoint creates producers for executing container operations
1629/// and consumers for receiving container events.
1630pub struct ContainerEndpoint {
1631    uri: String,
1632    config: ContainerConfig,
1633}
1634
1635impl ContainerEndpoint {
1636    /// Returns the Docker host configured for this endpoint.
1637    /// Returns `None` if not set (for testing purposes).
1638    pub fn docker_host(&self) -> Option<&str> {
1639        self.config.host.as_deref()
1640    }
1641}
1642
1643impl Endpoint for ContainerEndpoint {
1644    fn uri(&self) -> &str {
1645        &self.uri
1646    }
1647
1648    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1649        Ok(Box::new(ContainerConsumer {
1650            config: self.config.clone(),
1651        }))
1652    }
1653
1654    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1655        let docker = self.config.connect_docker_client()?;
1656        Ok(BoxProcessor::new(ContainerProducer {
1657            config: self.config.clone(),
1658            docker,
1659        }))
1660    }
1661}
1662
1663#[cfg(test)]
1664mod tests {
1665    use super::*;
1666    use camel_component_api::NoOpComponentContext;
1667
1668    #[test]
1669    fn test_container_config() {
1670        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1671        assert_eq!(config.operation, "run");
1672        assert_eq!(config.image.as_deref(), Some("alpine"));
1673        // host is None by default; global config applies it later
1674        assert!(config.host.is_none());
1675    }
1676
1677    #[test]
1678    fn test_global_config_applied_to_endpoint() {
1679        // When global config is set and URI doesn't specify host,
1680        // apply_global_defaults should set host from global config.
1681        let global =
1682            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1683        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1684        assert!(
1685            config.host.is_none(),
1686            "URI without ?host= should leave host as None"
1687        );
1688        config.apply_global_defaults(&global);
1689        assert_eq!(
1690            config.host.as_deref(),
1691            Some("unix:///custom/docker.sock"),
1692            "global docker_host must be applied when URI did not set host"
1693        );
1694    }
1695
1696    #[test]
1697    fn test_uri_param_wins_over_global_config() {
1698        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1699        let global =
1700            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1701        let mut config =
1702            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1703                .unwrap();
1704        assert_eq!(
1705            config.host.as_deref(),
1706            Some("unix:///override.sock"),
1707            "URI-set host should be parsed correctly"
1708        );
1709        config.apply_global_defaults(&global);
1710        assert_eq!(
1711            config.host.as_deref(),
1712            Some("unix:///override.sock"),
1713            "global config must NOT override a host already set by URI"
1714        );
1715    }
1716
1717    #[test]
1718    fn test_container_config_parses_name() {
1719        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1720        assert_eq!(config.name.as_deref(), Some("my-container"));
1721    }
1722
1723    #[test]
1724    fn test_parse_producer_operation_known() {
1725        assert_eq!(
1726            parse_producer_operation("list").unwrap(),
1727            ProducerOperation::List
1728        );
1729        assert_eq!(
1730            parse_producer_operation("run").unwrap(),
1731            ProducerOperation::Run
1732        );
1733        assert_eq!(
1734            parse_producer_operation("start").unwrap(),
1735            ProducerOperation::Start
1736        );
1737        assert_eq!(
1738            parse_producer_operation("stop").unwrap(),
1739            ProducerOperation::Stop
1740        );
1741        assert_eq!(
1742            parse_producer_operation("remove").unwrap(),
1743            ProducerOperation::Remove
1744        );
1745    }
1746
1747    #[test]
1748    fn test_parse_producer_operation_unknown() {
1749        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1750        match err {
1751            CamelError::ProcessorError(msg) => {
1752                assert!(
1753                    msg.contains("Unknown container operation"),
1754                    "Unexpected error message: {}",
1755                    msg
1756                );
1757            }
1758            _ => panic!("Expected ProcessorError for unknown operation"),
1759        }
1760    }
1761
1762    #[test]
1763    fn test_parse_producer_operation_new_variants() {
1764        assert_eq!(
1765            parse_producer_operation("exec").unwrap(),
1766            ProducerOperation::Exec
1767        );
1768        assert_eq!(
1769            parse_producer_operation("network-create").unwrap(),
1770            ProducerOperation::NetworkCreate
1771        );
1772        assert_eq!(
1773            parse_producer_operation("network-connect").unwrap(),
1774            ProducerOperation::NetworkConnect
1775        );
1776        assert_eq!(
1777            parse_producer_operation("network-disconnect").unwrap(),
1778            ProducerOperation::NetworkDisconnect
1779        );
1780        assert_eq!(
1781            parse_producer_operation("network-remove").unwrap(),
1782            ProducerOperation::NetworkRemove
1783        );
1784        assert_eq!(
1785            parse_producer_operation("network-list").unwrap(),
1786            ProducerOperation::NetworkList
1787        );
1788    }
1789
1790    #[test]
1791    fn test_resolve_container_name_header_overrides_config() {
1792        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1793        let mut exchange = Exchange::new(Message::new(""));
1794        exchange.input.set_header(
1795            HEADER_CONTAINER_NAME,
1796            serde_json::Value::String("header-name".to_string()),
1797        );
1798
1799        let resolved = resolve_container_name(&exchange, &config);
1800        assert_eq!(resolved.as_deref(), Some("header-name"));
1801    }
1802
1803    #[test]
1804    fn test_container_config_rejects_tcp_host() {
1805        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1806        let err = config.connect_docker_client().unwrap_err();
1807        match err {
1808            CamelError::ProcessorError(msg) => {
1809                assert!(
1810                    msg.to_lowercase().contains("tcp"),
1811                    "Expected TCP scheme error, got: {}",
1812                    msg
1813                );
1814            }
1815            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1816        }
1817    }
1818
1819    #[tokio::test]
1820    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1821        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1822        let remove_called_clone = remove_called.clone();
1823
1824        let result = run_container_with_cleanup(
1825            || async { Ok("container-123".to_string()) },
1826            |_id| async move {
1827                Err(CamelError::ProcessorError(
1828                    "Failed to start container".to_string(),
1829                ))
1830            },
1831            move |_id| {
1832                let remove_called_inner = remove_called_clone.clone();
1833                async move {
1834                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1835                    Ok(())
1836                }
1837            },
1838        )
1839        .await;
1840
1841        assert!(result.is_err(), "Expected start failure to bubble up");
1842        assert!(
1843            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1844            "Expected cleanup to remove container"
1845        );
1846    }
1847
1848    #[test]
1849    fn test_container_component_creates_endpoint() {
1850        let component = ContainerComponent::new();
1851        assert_eq!(component.scheme(), "container");
1852        let ctx = NoOpComponentContext;
1853        let endpoint = component
1854            .create_endpoint("container:run?image=alpine", &ctx)
1855            .unwrap();
1856        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1857    }
1858
1859    #[test]
1860    fn test_container_config_parses_ports() {
1861        let config =
1862            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1863        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1864    }
1865
1866    #[test]
1867    fn test_container_config_parses_env() {
1868        let config =
1869            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1870        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1871    }
1872
1873    #[test]
1874    fn test_container_config_parses_logs_options() {
1875        let config = ContainerConfig::from_uri(
1876            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1877        )
1878        .unwrap();
1879        assert_eq!(config.operation, "logs");
1880        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1881        assert!(config.follow);
1882        assert!(config.timestamps);
1883        assert_eq!(config.tail.as_deref(), Some("100"));
1884    }
1885
1886    #[test]
1887    fn test_container_config_logs_defaults() {
1888        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1889        assert!(config.follow); // default: true
1890        assert!(!config.timestamps); // default: false
1891        assert!(config.tail.is_none()); // default: None (all)
1892    }
1893
1894    #[test]
1895    fn test_parse_ports_single() {
1896        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1897        let (exposed, bindings) = config.parse_ports().unwrap();
1898
1899        assert!(exposed.contains_key("80/tcp"));
1900        assert!(bindings.contains_key("80/tcp"));
1901
1902        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1903        assert_eq!(binding.len(), 1);
1904        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1905    }
1906
1907    #[test]
1908    fn test_parse_ports_multiple() {
1909        let config =
1910            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1911        let (exposed, bindings) = config.parse_ports().unwrap();
1912
1913        assert!(exposed.contains_key("80/tcp"));
1914        assert!(exposed.contains_key("443/tcp"));
1915        assert_eq!(bindings.len(), 2);
1916    }
1917
1918    #[test]
1919    fn test_parse_ports_with_protocol() {
1920        let config =
1921            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1922                .unwrap();
1923        let (exposed, _bindings) = config.parse_ports().unwrap();
1924
1925        assert!(exposed.contains_key("80/tcp"));
1926        assert!(exposed.contains_key("53/udp"));
1927    }
1928
1929    #[test]
1930    fn test_parse_ports_none() {
1931        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1932        assert!(config.parse_ports().is_none());
1933    }
1934
1935    #[test]
1936    fn test_parse_env_single() {
1937        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1938        let env = config.parse_env().unwrap();
1939
1940        assert_eq!(env.len(), 1);
1941        assert_eq!(env[0], "FOO=bar");
1942    }
1943
1944    #[test]
1945    fn test_parse_env_multiple() {
1946        let config =
1947            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1948                .unwrap();
1949        let env = config.parse_env().unwrap();
1950
1951        assert_eq!(env.len(), 3);
1952        assert!(env.contains(&"FOO=bar".to_string()));
1953        assert!(env.contains(&"BAZ=qux".to_string()));
1954        assert!(env.contains(&"NUM=123".to_string()));
1955    }
1956
1957    #[test]
1958    fn test_parse_env_none() {
1959        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1960        assert!(config.parse_env().is_none());
1961    }
1962
1963    use camel_component_api::Message;
1964    use std::sync::Arc;
1965
1966    #[tokio::test]
1967    async fn test_container_producer_resolves_operation_from_header() {
1968        // Try to connect to Docker - if fails, return early
1969        let docker = match Docker::connect_with_local_defaults() {
1970            Ok(d) => d,
1971            Err(_) => {
1972                eprintln!("Skipping test: Could not connect to Docker daemon");
1973                return;
1974            }
1975        };
1976
1977        if docker.ping().await.is_err() {
1978            eprintln!("Skipping test: Docker daemon not responding to ping");
1979            return;
1980        }
1981
1982        let component = ContainerComponent::new();
1983        let ctx = NoOpComponentContext;
1984        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1985
1986        let ctx = ProducerContext::new();
1987        let mut producer = endpoint.create_producer(&ctx).unwrap();
1988
1989        let mut exchange = Exchange::new(Message::new(""));
1990        exchange
1991            .input
1992            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1993
1994        use tower::ServiceExt;
1995        let result = producer
1996            .ready()
1997            .await
1998            .unwrap()
1999            .call(exchange)
2000            .await
2001            .unwrap();
2002
2003        // For list operation, the result header should be on the input message
2004        assert_eq!(
2005            result
2006                .input
2007                .header(HEADER_ACTION_RESULT)
2008                .map(|v| v.as_str().unwrap()),
2009            Some("success")
2010        );
2011    }
2012
2013    #[tokio::test]
2014    async fn test_container_producer_connection_error_on_invalid_host() {
2015        // Test that an invalid host (nonexistent socket) results in a connection error
2016        let component = ContainerComponent::new();
2017        let ctx = NoOpComponentContext;
2018        let endpoint = component
2019            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2020            .unwrap();
2021
2022        let ctx = ProducerContext::new();
2023        let result = endpoint.create_producer(&ctx);
2024
2025        // The producer should return an error because it cannot connect to the invalid socket
2026        assert!(
2027            result.is_err(),
2028            "Expected error when connecting to invalid host"
2029        );
2030        let err = result.unwrap_err();
2031        match &err {
2032            CamelError::ProcessorError(msg) => {
2033                assert!(
2034                    msg.to_lowercase().contains("connection")
2035                        || msg.to_lowercase().contains("connect")
2036                        || msg.to_lowercase().contains("socket")
2037                        || msg.contains("docker"),
2038                    "Error message should indicate connection failure, got: {}",
2039                    msg
2040                );
2041            }
2042            _ => panic!("Expected ProcessorError, got: {:?}", err),
2043        }
2044    }
2045
2046    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
2047    #[tokio::test]
2048    async fn test_container_producer_lifecycle_operations_missing_id() {
2049        // Try to connect to Docker - if fails, return early
2050        let docker = match Docker::connect_with_local_defaults() {
2051            Ok(d) => d,
2052            Err(_) => {
2053                eprintln!("Skipping test: Could not connect to Docker daemon");
2054                return;
2055            }
2056        };
2057
2058        if docker.ping().await.is_err() {
2059            eprintln!("Skipping test: Docker daemon not responding to ping");
2060            return;
2061        }
2062
2063        let component = ContainerComponent::new();
2064        let ctx = NoOpComponentContext;
2065        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2066        let ctx = ProducerContext::new();
2067        let mut producer = endpoint.create_producer(&ctx).unwrap();
2068
2069        // Test each lifecycle operation without CamelContainerId header
2070        for operation in ["start", "stop", "remove"] {
2071            let mut exchange = Exchange::new(Message::new(""));
2072            exchange.input.set_header(
2073                HEADER_ACTION,
2074                serde_json::Value::String(operation.to_string()),
2075            );
2076            // Deliberately NOT setting CamelContainerId header
2077
2078            use tower::ServiceExt;
2079            let result = producer.ready().await.unwrap().call(exchange).await;
2080
2081            assert!(
2082                result.is_err(),
2083                "Expected error for {} operation without CamelContainerId",
2084                operation
2085            );
2086            let err = result.unwrap_err();
2087            match &err {
2088                CamelError::ProcessorError(msg) => {
2089                    assert!(
2090                        msg.contains(HEADER_CONTAINER_ID),
2091                        "Error message should mention {}, got: {}",
2092                        HEADER_CONTAINER_ID,
2093                        msg
2094                    );
2095                }
2096                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2097            }
2098        }
2099    }
2100
2101    /// Test that stop operation returns an error for a nonexistent container.
2102    #[tokio::test]
2103    async fn test_container_producer_stop_nonexistent() {
2104        // Try to connect to Docker - if fails, return early
2105        let docker = match Docker::connect_with_local_defaults() {
2106            Ok(d) => d,
2107            Err(_) => {
2108                eprintln!("Skipping test: Could not connect to Docker daemon");
2109                return;
2110            }
2111        };
2112
2113        if docker.ping().await.is_err() {
2114            eprintln!("Skipping test: Docker daemon not responding to ping");
2115            return;
2116        }
2117
2118        let component = ContainerComponent::new();
2119        let ctx = NoOpComponentContext;
2120        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2121        let ctx = ProducerContext::new();
2122        let mut producer = endpoint.create_producer(&ctx).unwrap();
2123
2124        let mut exchange = Exchange::new(Message::new(""));
2125        exchange
2126            .input
2127            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2128        exchange.input.set_header(
2129            HEADER_CONTAINER_ID,
2130            serde_json::Value::String("nonexistent-container-123".into()),
2131        );
2132
2133        use tower::ServiceExt;
2134        let result = producer.ready().await.unwrap().call(exchange).await;
2135
2136        assert!(
2137            result.is_err(),
2138            "Expected error when stopping nonexistent container"
2139        );
2140        let err = result.unwrap_err();
2141        match &err {
2142            CamelError::ProcessorError(msg) => {
2143                // Docker API returns 404 with "No such container" message
2144                assert!(
2145                    msg.to_lowercase().contains("no such container")
2146                        || msg.to_lowercase().contains("not found")
2147                        || msg.contains("404"),
2148                    "Error message should indicate container not found, got: {}",
2149                    msg
2150                );
2151            }
2152            _ => panic!("Expected ProcessorError, got: {:?}", err),
2153        }
2154    }
2155
2156    /// Test that run operation returns an error when no image is provided.
2157    #[tokio::test]
2158    async fn test_container_producer_run_missing_image() {
2159        // Try to connect to Docker - if fails, return early
2160        let docker = match Docker::connect_with_local_defaults() {
2161            Ok(d) => d,
2162            Err(_) => {
2163                eprintln!("Skipping test: Could not connect to Docker daemon");
2164                return;
2165            }
2166        };
2167
2168        if docker.ping().await.is_err() {
2169            eprintln!("Skipping test: Docker daemon not responding to ping");
2170            return;
2171        }
2172
2173        // Create producer without an image in the URI
2174        let component = ContainerComponent::new();
2175        let ctx = NoOpComponentContext;
2176        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2177        let ctx = ProducerContext::new();
2178        let mut producer = endpoint.create_producer(&ctx).unwrap();
2179
2180        let mut exchange = Exchange::new(Message::new(""));
2181        exchange
2182            .input
2183            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2184        // Deliberately NOT setting CamelContainerImage header
2185
2186        use tower::ServiceExt;
2187        let result = producer.ready().await.unwrap().call(exchange).await;
2188
2189        assert!(
2190            result.is_err(),
2191            "Expected error for run operation without image"
2192        );
2193        let err = result.unwrap_err();
2194        match &err {
2195            CamelError::ProcessorError(msg) => {
2196                assert!(
2197                    msg.to_lowercase().contains("image"),
2198                    "Error message should mention 'image', got: {}",
2199                    msg
2200                );
2201            }
2202            _ => panic!("Expected ProcessorError, got: {:?}", err),
2203        }
2204    }
2205
2206    /// Test that run operation uses image from header.
2207    #[tokio::test]
2208    async fn test_container_producer_run_image_from_header() {
2209        // Try to connect to Docker - if fails, return early
2210        let docker = match Docker::connect_with_local_defaults() {
2211            Ok(d) => d,
2212            Err(_) => {
2213                eprintln!("Skipping test: Could not connect to Docker daemon");
2214                return;
2215            }
2216        };
2217
2218        if docker.ping().await.is_err() {
2219            eprintln!("Skipping test: Docker daemon not responding to ping");
2220            return;
2221        }
2222
2223        // Create producer without an image in the URI
2224        let component = ContainerComponent::new();
2225        let ctx = NoOpComponentContext;
2226        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2227        let ctx = ProducerContext::new();
2228        let mut producer = endpoint.create_producer(&ctx).unwrap();
2229
2230        let mut exchange = Exchange::new(Message::new(""));
2231        exchange
2232            .input
2233            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2234        // Set a non-existent image to test that the run operation attempts to use it
2235        exchange.input.set_header(
2236            HEADER_IMAGE,
2237            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2238        );
2239
2240        use tower::ServiceExt;
2241        let result = producer.ready().await.unwrap().call(exchange).await;
2242
2243        // The operation should fail because the image doesn't exist
2244        assert!(
2245            result.is_err(),
2246            "Expected error when running container with nonexistent image"
2247        );
2248        let err = result.unwrap_err();
2249        match &err {
2250            CamelError::ProcessorError(msg) => {
2251                // Docker API returns an error about the missing image
2252                assert!(
2253                    msg.to_lowercase().contains("no such image")
2254                        || msg.to_lowercase().contains("not found")
2255                        || msg.to_lowercase().contains("image")
2256                        || msg.to_lowercase().contains("pull")
2257                        || msg.contains("404"),
2258                    "Error message should indicate image issue, got: {}",
2259                    msg
2260                );
2261            }
2262            _ => panic!("Expected ProcessorError, got: {:?}", err),
2263        }
2264    }
2265
2266    /// Integration test: Actually run a container with alpine:latest.
2267    /// This test verifies the full flow: create → start → set headers.
2268    #[tokio::test]
2269    async fn test_container_producer_run_alpine_container() {
2270        let docker = match Docker::connect_with_local_defaults() {
2271            Ok(d) => d,
2272            Err(_) => {
2273                eprintln!("Skipping test: Could not connect to Docker daemon");
2274                return;
2275            }
2276        };
2277
2278        if docker.ping().await.is_err() {
2279            eprintln!("Skipping test: Docker daemon not responding to ping");
2280            return;
2281        }
2282
2283        // Pull alpine:latest if not present
2284        let images = docker.list_images::<&str>(None).await.unwrap();
2285        let has_alpine = images
2286            .iter()
2287            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2288
2289        if !has_alpine {
2290            eprintln!("Pulling alpine:latest image...");
2291            let mut stream = docker.create_image(
2292                Some(bollard::image::CreateImageOptions {
2293                    from_image: "alpine:latest",
2294                    ..Default::default()
2295                }),
2296                None,
2297                None,
2298            );
2299
2300            use futures::StreamExt;
2301            while let Some(_item) = stream.next().await {
2302                // Wait for pull to complete
2303            }
2304            eprintln!("Image pulled successfully");
2305        }
2306
2307        // Create producer
2308        let component = ContainerComponent::new();
2309        let ctx = NoOpComponentContext;
2310        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2311        let ctx = ProducerContext::new();
2312        let mut producer = endpoint.create_producer(&ctx).unwrap();
2313
2314        // Run container with unique name
2315        let timestamp = std::time::SystemTime::now()
2316            .duration_since(std::time::UNIX_EPOCH)
2317            .unwrap()
2318            .as_millis();
2319        let container_name = format!("test-rust-camel-{}", timestamp);
2320        let mut exchange = Exchange::new(Message::new(""));
2321        exchange.input.set_header(
2322            HEADER_IMAGE,
2323            serde_json::Value::String("alpine:latest".into()),
2324        );
2325        exchange.input.set_header(
2326            HEADER_CONTAINER_NAME,
2327            serde_json::Value::String(container_name.clone()),
2328        );
2329
2330        use tower::ServiceExt;
2331        let result = producer
2332            .ready()
2333            .await
2334            .unwrap()
2335            .call(exchange)
2336            .await
2337            .expect("Container run should succeed");
2338
2339        // Verify container ID was set
2340        let container_id = result
2341            .input
2342            .header(HEADER_CONTAINER_ID)
2343            .and_then(|v| v.as_str().map(|s| s.to_string()))
2344            .expect("Expected container ID header");
2345        assert!(!container_id.is_empty(), "Container ID should not be empty");
2346
2347        // Verify success header
2348        assert_eq!(
2349            result
2350                .input
2351                .header(HEADER_ACTION_RESULT)
2352                .and_then(|v| v.as_str()),
2353            Some("success")
2354        );
2355
2356        // Verify container exists in Docker
2357        let inspect = docker
2358            .inspect_container(&container_id, None)
2359            .await
2360            .expect("Container should exist");
2361        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2362
2363        // Cleanup: remove container
2364        docker
2365            .remove_container(
2366                &container_id,
2367                Some(bollard::container::RemoveContainerOptions {
2368                    force: true,
2369                    ..Default::default()
2370                }),
2371            )
2372            .await
2373            .ok();
2374
2375        eprintln!("✅ Container {} created and cleaned up", container_id);
2376    }
2377
2378    /// Test that consumer returns an error for unsupported operations.
2379    #[tokio::test]
2380    async fn test_container_consumer_unsupported_operation() {
2381        use tokio::sync::mpsc;
2382
2383        let component = ContainerComponent::new();
2384        let ctx = NoOpComponentContext;
2385        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2386        let mut consumer = endpoint.create_consumer().unwrap();
2387
2388        // Create a minimal ConsumerContext
2389        let (tx, _rx) = mpsc::channel(16);
2390        let cancel_token = tokio_util::sync::CancellationToken::new();
2391        let context = ConsumerContext::new(tx, cancel_token);
2392
2393        let result = consumer.start(context).await;
2394
2395        // Should return error because "run" is not a supported consumer operation
2396        assert!(
2397            result.is_err(),
2398            "Expected error for unsupported consumer operation"
2399        );
2400        let err = result.unwrap_err();
2401        match &err {
2402            CamelError::EndpointCreationFailed(msg) => {
2403                assert!(
2404                    msg.contains("Consumer only supports 'events' or 'logs'"),
2405                    "Error message should mention events or logs support, got: {}",
2406                    msg
2407                );
2408            }
2409            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2410        }
2411    }
2412
2413    #[test]
2414    fn test_container_consumer_concurrency_model_is_concurrent() {
2415        let consumer = ContainerConsumer {
2416            config: ContainerConfig::from_uri("container:events").unwrap(),
2417        };
2418
2419        assert_eq!(
2420            consumer.concurrency_model(),
2421            camel_component_api::ConcurrencyModel::Concurrent { max: None }
2422        );
2423    }
2424
2425    /// Test that consumer gracefully shuts down when cancellation is requested.
2426    /// This test requires a running Docker daemon. If Docker is not available, the test
2427    /// will be ignored.
2428    #[tokio::test]
2429    async fn test_container_consumer_cancellation() {
2430        use std::sync::atomic::{AtomicBool, Ordering};
2431        use tokio::sync::mpsc;
2432
2433        // Try to connect to Docker - if fails, return early
2434        let docker = match Docker::connect_with_local_defaults() {
2435            Ok(d) => d,
2436            Err(_) => {
2437                eprintln!("Skipping test: Could not connect to Docker daemon");
2438                return;
2439            }
2440        };
2441
2442        if docker.ping().await.is_err() {
2443            eprintln!("Skipping test: Docker daemon not responding to ping");
2444            return;
2445        }
2446
2447        let component = ContainerComponent::new();
2448        let ctx = NoOpComponentContext;
2449        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2450        let mut consumer = endpoint.create_consumer().unwrap();
2451
2452        // Create a ConsumerContext
2453        let (tx, _rx) = mpsc::channel(16);
2454        let cancel_token = tokio_util::sync::CancellationToken::new();
2455        let context = ConsumerContext::new(tx, cancel_token.clone());
2456
2457        // Track if the consumer task has completed
2458        let completed = Arc::new(AtomicBool::new(false));
2459        let completed_clone = completed.clone();
2460
2461        // Spawn consumer in background
2462        let handle = tokio::spawn(async move {
2463            let result = consumer.start(context).await;
2464            // Mark as completed when done
2465            completed_clone.store(true, Ordering::SeqCst);
2466            result
2467        });
2468
2469        // Wait a bit for the consumer to start
2470        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2471
2472        // Consumer should still be running (not completed)
2473        assert!(
2474            !completed.load(Ordering::SeqCst),
2475            "Consumer should still be running before cancellation"
2476        );
2477
2478        // Request cancellation
2479        cancel_token.cancel();
2480
2481        // Wait for the task to complete (with timeout)
2482        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2483
2484        // Task should have completed (not timed out)
2485        assert!(
2486            result.is_ok(),
2487            "Consumer should gracefully shut down after cancellation"
2488        );
2489
2490        // Verify the consumer completed
2491        assert!(
2492            completed.load(Ordering::SeqCst),
2493            "Consumer should have completed after cancellation"
2494        );
2495    }
2496
2497    /// Integration test for listing containers.
2498    /// This test requires a running Docker daemon. If Docker is not available, the test
2499    /// will return early and be effectively ignored.
2500    #[tokio::test]
2501    async fn test_container_producer_list_containers() {
2502        // Try to connect to Docker using default config
2503        // If ping fails, return early (effectively ignoring this test)
2504        let docker = match Docker::connect_with_local_defaults() {
2505            Ok(d) => d,
2506            Err(_) => {
2507                eprintln!("Skipping test: Could not connect to Docker daemon");
2508                return;
2509            }
2510        };
2511
2512        if docker.ping().await.is_err() {
2513            eprintln!("Skipping test: Docker daemon not responding to ping");
2514            return;
2515        }
2516
2517        // Create producer with list operation
2518        let component = ContainerComponent::new();
2519        let ctx = NoOpComponentContext;
2520        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2521
2522        let ctx = ProducerContext::new();
2523        let mut producer = endpoint.create_producer(&ctx).unwrap();
2524
2525        // Create exchange with list operation in header
2526        let mut exchange = Exchange::new(Message::new(""));
2527        exchange
2528            .input
2529            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2530
2531        // Call the producer
2532        use tower::ServiceExt;
2533        let result = producer
2534            .ready()
2535            .await
2536            .unwrap()
2537            .call(exchange)
2538            .await
2539            .expect("Producer should succeed when Docker is available");
2540
2541        // Assert that the input exchange body is a JSON array
2542        // (because list_containers should put the JSON result in the body)
2543        match &result.input.body {
2544            camel_component_api::Body::Json(json_value) => {
2545                assert!(
2546                    json_value.is_array(),
2547                    "Expected input body to be a JSON array, got: {:?}",
2548                    json_value
2549                );
2550            }
2551            other => panic!("Expected Body::Json with array, got: {:?}", other),
2552        }
2553    }
2554
2555    #[test]
2556    fn test_container_config_parses_volumes() {
2557        let config = ContainerConfig::from_uri(
2558            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2559        )
2560        .unwrap();
2561        assert_eq!(
2562            config.volumes.as_deref(),
2563            Some("./html:/usr/share/nginx/html:ro")
2564        );
2565    }
2566
2567    #[test]
2568    fn test_container_config_parses_exec_params() {
2569        let config = ContainerConfig::from_uri(
2570            "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2571        )
2572        .unwrap();
2573        assert_eq!(config.operation, "exec");
2574        assert_eq!(config.container_id.as_deref(), Some("my-app"));
2575        assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2576        assert_eq!(config.user.as_deref(), Some("root"));
2577        assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2578        assert!(config.detach);
2579    }
2580
2581    #[test]
2582    fn test_container_config_parses_network_create_params() {
2583        let config =
2584            ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2585                .unwrap();
2586        assert_eq!(config.operation, "network-create");
2587        assert_eq!(config.name.as_deref(), Some("my-net"));
2588        assert_eq!(config.driver.as_deref(), Some("bridge"));
2589    }
2590
2591    #[test]
2592    fn test_container_config_defaults_new_fields() {
2593        let config = ContainerConfig::from_uri("container:list").unwrap();
2594        assert!(config.volumes.is_none());
2595        assert!(config.user.is_none());
2596        assert!(config.workdir.is_none());
2597        assert!(!config.detach);
2598        assert!(config.driver.is_none());
2599        assert!(!config.force);
2600    }
2601
2602    #[test]
2603    fn test_parse_volumes_bind_mount() {
2604        let config = ContainerConfig::from_uri(
2605            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2606        )
2607        .unwrap();
2608        let (binds, anon) = config.parse_volumes().unwrap();
2609        assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2610        assert!(anon.is_empty());
2611    }
2612
2613    #[test]
2614    fn test_parse_volumes_named_volume() {
2615        let config =
2616            ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2617                .unwrap();
2618        let (binds, anon) = config.parse_volumes().unwrap();
2619        assert_eq!(binds, vec!["data:/var/lib/data"]);
2620        assert!(anon.is_empty());
2621    }
2622
2623    #[test]
2624    fn test_parse_volumes_anonymous() {
2625        let config =
2626            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2627        let (binds, anon) = config.parse_volumes().unwrap();
2628        assert!(binds.is_empty());
2629        assert!(anon.contains_key("/tmp/app-data"));
2630    }
2631
2632    #[test]
2633    fn test_parse_volumes_anonymous_with_mode() {
2634        let config =
2635            ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2636                .unwrap();
2637        let (binds, anon) = config.parse_volumes().unwrap();
2638        assert!(binds.is_empty());
2639        assert!(anon.contains_key("/tmp/app-data"));
2640    }
2641
2642    #[test]
2643    fn test_parse_volumes_multiple() {
2644        let config = ContainerConfig::from_uri(
2645            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2646        )
2647        .unwrap();
2648        let (binds, anon) = config.parse_volumes().unwrap();
2649        assert_eq!(binds.len(), 2);
2650        assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2651        assert!(binds.contains(&"data:/var/log/app".to_string()));
2652        assert!(anon.is_empty());
2653    }
2654
2655    #[test]
2656    fn test_parse_volumes_mixed() {
2657        let config = ContainerConfig::from_uri(
2658            "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2659        )
2660        .unwrap();
2661        let (binds, anon) = config.parse_volumes().unwrap();
2662        assert_eq!(binds.len(), 1);
2663        assert!(anon.contains_key("/tmp/cache"));
2664    }
2665
2666    #[test]
2667    fn test_parse_volumes_none() {
2668        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2669        assert!(config.parse_volumes().is_none());
2670    }
2671
2672    #[test]
2673    fn test_parse_volumes_empty_entry_skipped() {
2674        let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2675        assert!(config.parse_volumes().is_none());
2676    }
2677
2678    #[test]
2679    fn test_parse_volumes_rw_mode() {
2680        let config =
2681            ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2682                .unwrap();
2683        let (binds, _) = config.parse_volumes().unwrap();
2684        assert_eq!(binds, vec!["./data:/app/data:rw"]);
2685    }
2686
2687    #[tokio::test]
2688    async fn test_container_producer_network_lifecycle() {
2689        let docker = match Docker::connect_with_local_defaults() {
2690            Ok(d) => d,
2691            Err(_) => {
2692                eprintln!("Skipping test: Could not connect to Docker daemon");
2693                return;
2694            }
2695        };
2696        if docker.ping().await.is_err() {
2697            eprintln!("Skipping test: Docker daemon not responding to ping");
2698            return;
2699        }
2700
2701        let network_name = format!("camel-test-{}", std::process::id());
2702
2703        let component = ContainerComponent::new();
2704        let component_ctx = NoOpComponentContext;
2705
2706        // Create network
2707        let endpoint = component
2708            .create_endpoint(
2709                &format!("container:network-create?name={}", network_name),
2710                &component_ctx,
2711            )
2712            .unwrap();
2713        let ctx = ProducerContext::new();
2714        let mut producer = endpoint.create_producer(&ctx).unwrap();
2715
2716        let mut exchange = Exchange::new(Message::new(""));
2717        exchange.input.set_header(
2718            HEADER_ACTION,
2719            serde_json::Value::String("network-create".into()),
2720        );
2721
2722        use tower::ServiceExt;
2723        let result = producer
2724            .ready()
2725            .await
2726            .unwrap()
2727            .call(exchange)
2728            .await
2729            .expect("Network create should succeed");
2730
2731        let network_id = result
2732            .input
2733            .header(HEADER_NETWORK)
2734            .and_then(|v| v.as_str().map(|s| s.to_string()))
2735            .expect("Should have network ID");
2736
2737        assert!(!network_id.is_empty());
2738
2739        // List networks
2740        let endpoint = component
2741            .create_endpoint("container:network-list", &component_ctx)
2742            .unwrap();
2743        let mut producer = endpoint.create_producer(&ctx).unwrap();
2744
2745        let mut exchange = Exchange::new(Message::new(""));
2746        exchange.input.set_header(
2747            HEADER_ACTION,
2748            serde_json::Value::String("network-list".into()),
2749        );
2750
2751        let list_result = producer
2752            .ready()
2753            .await
2754            .unwrap()
2755            .call(exchange)
2756            .await
2757            .expect("Network list should succeed");
2758
2759        match &list_result.input.body {
2760            Body::Json(json_value) => {
2761                assert!(json_value.is_array(), "Expected JSON array");
2762            }
2763            other => panic!("Expected Body::Json, got: {:?}", other),
2764        }
2765
2766        // Remove network
2767        let endpoint = component
2768            .create_endpoint(
2769                &format!("container:network-remove?network={}", network_name),
2770                &component_ctx,
2771            )
2772            .unwrap();
2773        let mut producer = endpoint.create_producer(&ctx).unwrap();
2774
2775        let mut exchange = Exchange::new(Message::new(""));
2776        exchange.input.set_header(
2777            HEADER_ACTION,
2778            serde_json::Value::String("network-remove".into()),
2779        );
2780
2781        let remove_result = producer
2782            .ready()
2783            .await
2784            .unwrap()
2785            .call(exchange)
2786            .await
2787            .expect("Network remove should succeed");
2788
2789        let action_result = remove_result
2790            .input
2791            .header(HEADER_ACTION_RESULT)
2792            .and_then(|v| v.as_str());
2793        assert_eq!(action_result, Some("success"));
2794    }
2795}