Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::service::{HostConfig, PortBinding};
19use camel_component_api::parse_uri;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tower::Service;
23
24/// Global tracker for containers created by this component.
25/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
26static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
27    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
28
29/// Registers a container ID for tracking (will be cleaned up on shutdown).
30fn track_container(id: String) {
31    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
32        tracker.insert(id);
33    }
34}
35
36/// Removes a container ID from tracking (when it's been removed naturally).
37fn untrack_container(id: &str) {
38    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
39        tracker.remove(id);
40    }
41}
42
43/// Cleans up all tracked containers. Call this on application shutdown.
44pub async fn cleanup_tracked_containers() {
45    let ids: Vec<String> = {
46        match CONTAINER_TRACKER.lock() {
47            Ok(tracker) => tracker.iter().cloned().collect(),
48            Err(_) => return,
49        }
50    };
51
52    if ids.is_empty() {
53        return;
54    }
55
56    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
57
58    let docker = match Docker::connect_with_local_defaults() {
59        Ok(d) => d,
60        Err(e) => {
61            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
62            return;
63        }
64    };
65
66    for id in ids {
67        match docker
68            .remove_container(
69                &id,
70                Some(bollard::container::RemoveContainerOptions {
71                    force: true,
72                    ..Default::default()
73                }),
74            )
75            .await
76        {
77            Ok(_) => {
78                tracing::debug!("Cleaned up container {}", id);
79                untrack_container(&id);
80            }
81            Err(e) => {
82                tracing::warn!("Failed to cleanup container {}: {}", id, e);
83            }
84        }
85    }
86}
87
88// Header constants for container operations
89
90/// Timeout (seconds) for connecting to the Docker daemon.
91const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
92
93/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
94pub const HEADER_ACTION: &str = "CamelContainerAction";
95
96/// Header key for specifying the container image to use for "run" operations.
97pub const HEADER_IMAGE: &str = "CamelContainerImage";
98
99/// Header key for specifying or receiving the container ID.
100pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
101
102/// Header key for the log stream type (stdout or stderr).
103pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
104
105/// Header key for the log timestamp.
106pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
107
108/// Header key for specifying the container name for "run" operations.
109pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
110
111/// Header key for the result status of a container operation (e.g., "success").
112pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
113
114/// Header key for specifying the command to execute in a container.
115pub const HEADER_CMD: &str = "CamelContainerCmd";
116
117/// Header key for specifying the network name for network operations.
118pub const HEADER_NETWORK: &str = "CamelContainerNetwork";
119
120/// Header key for the exit code of an exec operation.
121pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";
122
123/// Header key for specifying volume mounts.
124pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";
125
126/// Header key for the exec instance ID.
127pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
128
129// ---------------------------------------------------------------------------
130// ContainerGlobalConfig
131// ---------------------------------------------------------------------------
132
133/// Global configuration for Container component.
134/// Supports serde deserialization with defaults and builder methods.
135/// These are the fallback defaults when URI params are not set.
136#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
137#[serde(default)]
138pub struct ContainerGlobalConfig {
139    /// The Docker host URL (default: "unix:///var/run/docker.sock").
140    pub docker_host: String,
141}
142
143impl Default for ContainerGlobalConfig {
144    fn default() -> Self {
145        Self {
146            docker_host: "unix:///var/run/docker.sock".to_string(),
147        }
148    }
149}
150
151impl ContainerGlobalConfig {
152    pub fn new() -> Self {
153        Self::default()
154    }
155
156    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
157        self.docker_host = v.into();
158        self
159    }
160}
161
162// ---------------------------------------------------------------------------
163// ContainerConfig (endpoint configuration)
164// ---------------------------------------------------------------------------
165
/// Per-endpoint settings of the container component.
///
/// Holds everything parsed from an endpoint URI: the operation named by the
/// URI path plus the optional query parameters that tune it.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// Operation name taken from the URI path
    /// (e.g., "list", "run", "start", "stop", "remove", "events").
    pub operation: String,
    /// Image for "run" operations; a message header can override it.
    pub image: Option<String>,
    /// Container name for "run" operations; a message header can override it.
    pub name: Option<String>,
    /// Docker host URL. `None` until a URI parameter or the global config supplies one.
    pub host: Option<String>,
    /// Command line to run in the container (e.g., "sleep 30").
    pub cmd: Option<String>,
    /// Comma-separated port mappings, "hostPort:containerPort" (e.g., "8080:80,8443:443").
    pub ports: Option<String>,
    /// Comma-separated environment variables, "KEY=value,KEY2=value2".
    pub env: Option<String>,
    /// Network mode (e.g., "bridge", "host", "none"); left to Docker when unset.
    pub network: Option<String>,
    /// Container ID or name targeted by the logs consumer.
    pub container_id: Option<String>,
    /// Keep following log output (URI default: true).
    pub follow: bool,
    /// Include timestamps in log lines (URI default: false).
    pub timestamps: bool,
    /// How many trailing log lines to show; everything when unset.
    pub tail: Option<String>,
    /// Pull the image automatically when it is missing locally (URI default: true).
    pub auto_pull: bool,
    /// Ask Docker to remove the container once it exits (URI default: true).
    pub auto_remove: bool,
    /// Comma-separated volume mounts, "host:container:ro"
    /// (e.g., "./html:/usr/share/nginx/html:ro").
    pub volumes: Option<String>,
    /// User the container or exec command runs as (e.g., "root").
    pub user: Option<String>,
    /// Working directory inside the container.
    pub workdir: Option<String>,
    /// Detach from the exec process instead of waiting for it (URI default: false).
    pub detach: bool,
    /// Driver used by network-create (e.g., "bridge", "overlay").
    pub driver: Option<String>,
    /// Force the operation (URI default: false).
    pub force: bool,
}
213
214impl ContainerConfig {
215    /// Parses a container URI into a `ContainerConfig`.
216    ///
217    /// # Arguments
218    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
219    ///
220    /// # Errors
221    /// Returns an error if the URI scheme is not "container".
222    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
223        let parts = parse_uri(uri)?;
224        if parts.scheme != "container" {
225            return Err(CamelError::InvalidUri(format!(
226                "expected scheme 'container', got '{}'",
227                parts.scheme
228            )));
229        }
230
231        let image = parts.params.get("image").cloned();
232        let name = parts.params.get("name").cloned();
233        let cmd = parts.params.get("cmd").cloned();
234        let ports = parts.params.get("ports").cloned();
235        let env = parts.params.get("env").cloned();
236        let network = parts.params.get("network").cloned();
237        let container_id = parts.params.get("containerId").cloned();
238        let follow = parts
239            .params
240            .get("follow")
241            .map(|v| v.eq_ignore_ascii_case("true"))
242            .unwrap_or(true);
243        let timestamps = parts
244            .params
245            .get("timestamps")
246            .map(|v| v.eq_ignore_ascii_case("true"))
247            .unwrap_or(false);
248        let tail = parts.params.get("tail").cloned();
249        let auto_pull = parts
250            .params
251            .get("autoPull")
252            .map(|v| v.eq_ignore_ascii_case("true"))
253            .unwrap_or(true);
254        let auto_remove = parts
255            .params
256            .get("autoRemove")
257            .map(|v| v.eq_ignore_ascii_case("true"))
258            .unwrap_or(true);
259        // host is only set from URI param; global config defaults are applied later
260        let host = parts.params.get("host").cloned();
261        let volumes = parts.params.get("volumes").cloned();
262        let user = parts.params.get("user").cloned();
263        let workdir = parts.params.get("workdir").cloned();
264        let detach = parts
265            .params
266            .get("detach")
267            .map(|v| v.eq_ignore_ascii_case("true"))
268            .unwrap_or(false);
269        let driver = parts.params.get("driver").cloned();
270        let force = parts
271            .params
272            .get("force")
273            .map(|v| v.eq_ignore_ascii_case("true"))
274            .unwrap_or(false);
275
276        Ok(Self {
277            operation: parts.path,
278            image,
279            name,
280            host,
281            cmd,
282            ports,
283            env,
284            network,
285            container_id,
286            follow,
287            timestamps,
288            tail,
289            auto_pull,
290            auto_remove,
291            volumes,
292            user,
293            workdir,
294            detach,
295            driver,
296            force,
297        })
298    }
299
300    /// Apply global config defaults to this endpoint config.
301    /// Only sets values that are currently `None`.
302    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
303        if self.host.is_none() {
304            self.host = Some(global.docker_host.clone());
305        }
306    }
307
308    fn docker_socket_path(&self) -> Result<&str, CamelError> {
309        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
310            "npipe:////./pipe/docker_engine"
311        } else {
312            "unix:///var/run/docker.sock"
313        });
314
315        if host.starts_with("unix://") || host.starts_with("npipe://") {
316            return Ok(host);
317        }
318
319        if host.contains("://") {
320            return Err(CamelError::ProcessorError(format!(
321                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
322                host
323            )));
324        }
325
326        Ok(host)
327    }
328
329    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
330        let socket_path = self.docker_socket_path()?;
331        Docker::connect_with_socket(
332            socket_path,
333            DOCKER_CONNECT_TIMEOUT_SECS,
334            bollard::API_DEFAULT_VERSION,
335        )
336        .map_err(|e| {
337            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
338        })
339    }
340
341    /// Connects to the Docker daemon using the configured host.
342    ///
343    /// This method establishes a Unix socket connection to Docker and verifies
344    /// the connection by sending a ping request.
345    ///
346    /// # Errors
347    /// Returns an error if the connection fails or the ping request fails.
348    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
349        let docker = self.connect_docker_client()?;
350        docker
351            .ping()
352            .await
353            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
354        Ok(docker)
355    }
356
357    #[allow(clippy::type_complexity)]
358    fn parse_ports(
359        &self,
360    ) -> Option<(
361        HashMap<String, HashMap<(), ()>>,
362        HashMap<String, Option<Vec<PortBinding>>>,
363    )> {
364        let ports_str = self.ports.as_ref()?;
365
366        let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
367        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
368
369        for mapping in ports_str.split(',') {
370            let mapping = mapping.trim();
371            if mapping.is_empty() {
372                continue;
373            }
374
375            let (host_port, container_spec) = mapping.split_once(':')?;
376
377            let (container_port, protocol) = if container_spec.contains('/') {
378                let parts: Vec<&str> = container_spec.split('/').collect();
379                (parts[0], parts[1])
380            } else {
381                (container_spec, "tcp")
382            };
383
384            let container_key = format!("{}/{}", container_port, protocol);
385
386            exposed_ports.insert(container_key.clone(), HashMap::new());
387
388            port_bindings.insert(
389                container_key,
390                Some(vec![PortBinding {
391                    host_ip: None,
392                    host_port: Some(host_port.to_string()),
393                }]),
394            );
395        }
396
397        if exposed_ports.is_empty() {
398            None
399        } else {
400            Some((exposed_ports, port_bindings))
401        }
402    }
403
404    fn parse_env(&self) -> Option<Vec<String>> {
405        let env_str = self.env.as_ref()?;
406
407        let env_vars: Vec<String> = env_str
408            .split(',')
409            .map(|s| s.trim().to_string())
410            .filter(|s| !s.is_empty())
411            .collect();
412
413        if env_vars.is_empty() {
414            None
415        } else {
416            Some(env_vars)
417        }
418    }
419
420    #[cfg(test)]
421    #[allow(clippy::type_complexity)]
422    fn parse_volumes(&self) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
423        self.volumes.as_deref().and_then(parse_volume_str)
424    }
425}
426
/// Splits a comma-separated volume specification into bind mounts and
/// anonymous volumes.
///
/// Each entry is classified by its `:`-separated fields:
///
/// * `source:target:ro|rw` — bind mount, kept verbatim;
/// * `a:b` where `b` is not a mode — bind mount `a:b` (covers named volumes);
/// * `path:ro|rw` — anonymous volume at `path` (the mode is not recorded);
/// * `path` — anonymous volume at `path`.
///
/// Blank entries, three-field entries with any other mode, and entries with
/// more than three fields are ignored. Returns `None` when nothing usable is
/// left.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
    let mut bind_mounts: Vec<String> = Vec::new();
    let mut anonymous: HashMap<String, HashMap<(), ()>> = HashMap::new();

    let entries = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        let fields: Vec<&str> = entry.split(':').collect();

        match fields.as_slice() {
            [source, target, mode @ ("ro" | "rw")] => {
                bind_mounts.push(format!("{}:{}:{}", source, target, mode));
            }
            [path, "ro" | "rw"] | [path] => {
                anonymous.insert((*path).to_string(), HashMap::new());
            }
            [source, target] => {
                bind_mounts.push(format!("{}:{}", source, target));
            }
            // Unknown mode or too many fields: skip the entry.
            _ => {}
        }
    }

    if bind_mounts.is_empty() && anonymous.is_empty() {
        return None;
    }
    Some((bind_mounts, anonymous))
}
476
/// Operations a producer endpoint can perform, as parsed from the URI path by
/// `parse_producer_operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// `list`
    List,
    /// `run`
    Run,
    /// `start`
    Start,
    /// `stop`
    Stop,
    /// `remove`
    Remove,
    /// `exec`
    Exec,
    /// `network-create`
    NetworkCreate,
    /// `network-connect`
    NetworkConnect,
    /// `network-disconnect`
    NetworkDisconnect,
    /// `network-remove`
    NetworkRemove,
    /// `network-list`
    NetworkList,
}
491
492fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
493    match operation {
494        "list" => Ok(ProducerOperation::List),
495        "run" => Ok(ProducerOperation::Run),
496        "start" => Ok(ProducerOperation::Start),
497        "stop" => Ok(ProducerOperation::Stop),
498        "remove" => Ok(ProducerOperation::Remove),
499        "exec" => Ok(ProducerOperation::Exec),
500        "network-create" => Ok(ProducerOperation::NetworkCreate),
501        "network-connect" => Ok(ProducerOperation::NetworkConnect),
502        "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
503        "network-remove" => Ok(ProducerOperation::NetworkRemove),
504        "network-list" => Ok(ProducerOperation::NetworkList),
505        _ => Err(CamelError::ProcessorError(format!(
506            "Unknown container operation: {}",
507            operation
508        ))),
509    }
510}
511
512fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
513    exchange
514        .input
515        .header(HEADER_CONTAINER_NAME)
516        .and_then(|v| v.as_str().map(|s| s.to_string()))
517        .or_else(|| config.name.clone())
518}
519
520async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
521    let images = docker
522        .list_images::<&str>(None)
523        .await
524        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
525
526    Ok(images.iter().any(|img| {
527        img.repo_tags
528            .iter()
529            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
530    }))
531}
532
533async fn pull_image_with_progress(
534    docker: &Docker,
535    image: &str,
536    timeout_secs: u64,
537) -> Result<(), CamelError> {
538    use futures::StreamExt;
539
540    tracing::info!("Pulling image: {}", image);
541
542    let mut stream = docker.create_image(
543        Some(bollard::image::CreateImageOptions {
544            from_image: image,
545            ..Default::default()
546        }),
547        None,
548        None,
549    );
550
551    let start = std::time::Instant::now();
552    let mut last_progress = std::time::Instant::now();
553
554    while let Some(item) = stream.next().await {
555        if start.elapsed().as_secs() > timeout_secs {
556            return Err(CamelError::ProcessorError(format!(
557                "Image pull timeout after {}s. Try manually: docker pull {}",
558                timeout_secs, image
559            )));
560        }
561
562        match item {
563            Ok(update) => {
564                // Log progress every 2 seconds
565                if last_progress.elapsed().as_secs() >= 2 {
566                    if let Some(status) = update.status {
567                        tracing::debug!("Pull progress: {}", status);
568                    }
569                    last_progress = std::time::Instant::now();
570                }
571            }
572            Err(e) => {
573                let err_str = e.to_string().to_lowercase();
574                if err_str.contains("unauthorized") || err_str.contains("401") {
575                    return Err(CamelError::ProcessorError(format!(
576                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
577                        image
578                    )));
579                }
580                if err_str.contains("not found") || err_str.contains("404") {
581                    return Err(CamelError::ProcessorError(format!(
582                        "Image '{}' not found in registry. Check the image name and tag",
583                        image
584                    )));
585                }
586                return Err(CamelError::ProcessorError(format!(
587                    "Failed to pull image '{}': {}",
588                    image, e
589                )));
590            }
591        }
592    }
593
594    tracing::info!("Successfully pulled image: {}", image);
595    Ok(())
596}
597
598async fn ensure_image_available(
599    docker: &Docker,
600    image: &str,
601    auto_pull: bool,
602    timeout_secs: u64,
603) -> Result<(), CamelError> {
604    if image_exists_locally(docker, image).await? {
605        tracing::debug!("Image '{}' already available locally", image);
606        return Ok(());
607    }
608
609    if !auto_pull {
610        return Err(CamelError::ProcessorError(format!(
611            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
612            image, image
613        )));
614    }
615
616    pull_image_with_progress(docker, image, timeout_secs).await
617}
618
619fn format_docker_event(event: &bollard::models::EventMessage) -> String {
620    let action = event.action.as_deref().unwrap_or("unknown");
621    let actor = event.actor.as_ref();
622
623    let container_name = actor
624        .and_then(|a| a.attributes.as_ref())
625        .and_then(|attrs| attrs.get("name"))
626        .map(|s| s.as_str())
627        .unwrap_or("unknown");
628
629    let image = actor
630        .and_then(|a| a.attributes.as_ref())
631        .and_then(|attrs| attrs.get("image"))
632        .map(|s| s.as_str())
633        .unwrap_or("");
634
635    let exit_code = actor
636        .and_then(|a| a.attributes.as_ref())
637        .and_then(|attrs| attrs.get("exitCode"))
638        .map(|s| s.as_str());
639
640    match action {
641        "create" => {
642            if image.is_empty() {
643                format!("[CREATE] Container {}", container_name)
644            } else {
645                format!("[CREATE] Container {} ({})", container_name, image)
646            }
647        }
648        "start" => format!("[START]  Container {}", container_name),
649        "die" => {
650            if let Some(code) = exit_code {
651                format!("[DIE]    Container {} (exit: {})", container_name, code)
652            } else {
653                format!("[DIE]    Container {}", container_name)
654            }
655        }
656        "destroy" => format!("[DESTROY] Container {}", container_name),
657        "stop" => format!("[STOP]   Container {}", container_name),
658        "pause" => format!("[PAUSE]  Container {}", container_name),
659        "unpause" => format!("[UNPAUSE] Container {}", container_name),
660        "restart" => format!("[RESTART] Container {}", container_name),
661        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
662    }
663}
664
665async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
666    create: CreateFn,
667    start: StartFn,
668    remove: RemoveFn,
669) -> Result<String, CamelError>
670where
671    CreateFn: FnOnce() -> CreateFut,
672    CreateFut: Future<Output = Result<String, CamelError>>,
673    StartFn: FnOnce(String) -> StartFut,
674    StartFut: Future<Output = Result<(), CamelError>>,
675    RemoveFn: FnOnce(String) -> RemoveFut,
676    RemoveFut: Future<Output = Result<(), CamelError>>,
677{
678    let container_id = create().await?;
679    if let Err(start_err) = start(container_id.clone()).await {
680        if let Err(remove_err) = remove(container_id.clone()).await {
681            return Err(CamelError::ProcessorError(format!(
682                "Failed to start container: {}. Cleanup failed: {}",
683                start_err, remove_err
684            )));
685        }
686        return Err(start_err);
687    }
688
689    Ok(container_id)
690}
691
692async fn handle_list(
693    docker: Docker,
694    _config: ContainerConfig,
695    exchange: &mut Exchange,
696) -> Result<(), CamelError> {
697    let containers = docker.list_containers::<String>(None).await.map_err(|e| {
698        CamelError::ProcessorError(format!("Failed to list containers: {}", e))
699    })?;
700
701    let json_value = serde_json::to_value(&containers).map_err(|e| {
702        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
703    })?;
704
705    exchange.input.body = Body::Json(json_value);
706    exchange.input.set_header(
707        HEADER_ACTION_RESULT,
708        serde_json::Value::String("success".to_string()),
709    );
710    Ok(())
711}
712
713async fn handle_run(
714    docker: Docker,
715    config: ContainerConfig,
716    exchange: &mut Exchange,
717) -> Result<(), CamelError> {
718    let image = exchange
719        .input
720        .header(HEADER_IMAGE)
721        .and_then(|v| v.as_str().map(|s| s.to_string()))
722        .or(config.image.clone())
723        .ok_or_else(|| {
724            CamelError::ProcessorError(
725                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
726            )
727        })?;
728
729    let pull_timeout = 300;
730    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
731        .await
732        .map_err(|e| {
733            CamelError::ProcessorError(format!(
734                "Image '{}' not available: {}",
735                image, e
736            ))
737        })?;
738
739    let container_name = resolve_container_name(exchange, &config);
740    let container_name_ref = container_name.as_deref().unwrap_or("");
741    let cmd_parts: Option<Vec<String>> = config
742        .cmd
743        .as_ref()
744        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
745    let auto_remove = config.auto_remove;
746    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
747    let env_vars = config.parse_env();
748    let network_mode = config.network.clone();
749
750    let volumes_str = exchange
751        .input
752        .header(HEADER_VOLUMES)
753        .and_then(|v| v.as_str().map(|s| s.to_string()))
754        .or(config.volumes.clone());
755    let (binds, anon_volumes) = volumes_str
756        .as_deref()
757        .and_then(parse_volume_str)
758        .unwrap_or_default();
759
760    let docker_create = docker.clone();
761    let docker_start = docker.clone();
762    let docker_remove = docker.clone();
763
764    let container_id = run_container_with_cleanup(
765        move || async move {
766            let create_options = bollard::container::CreateContainerOptions {
767                name: container_name_ref,
768                ..Default::default()
769            };
770            let container_config = bollard::container::Config::<String> {
771                image: Some(image.clone()),
772                cmd: cmd_parts,
773                env: env_vars,
774                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
775                volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
776                host_config: Some(HostConfig {
777                    auto_remove: Some(auto_remove),
778                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
779                    network_mode,
780                    binds: if binds.is_empty() { None } else { Some(binds) },
781                    ..Default::default()
782                }),
783                ..Default::default()
784            };
785
786            let create_response = docker_create
787                .create_container(Some(create_options), container_config)
788                .await
789                .map_err(|e| {
790                    let err_str = e.to_string().to_lowercase();
791                    if err_str.contains("409") || err_str.contains("conflict") {
792                        CamelError::ProcessorError(format!(
793                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
794                            container_name_ref
795                        ))
796                    } else {
797                        CamelError::ProcessorError(format!(
798                            "Failed to create container: {}",
799                            e
800                        ))
801                    }
802                })?;
803
804            Ok(create_response.id)
805        },
806        move |container_id| async move {
807            docker_start
808                .start_container::<String>(&container_id, None)
809                .await
810                .map_err(|e| {
811                    CamelError::ProcessorError(format!(
812                        "Failed to start container: {}",
813                        e
814                    ))
815                })
816        },
817        move |container_id| async move {
818            docker_remove
819                .remove_container(&container_id, None)
820                .await
821                .map_err(|e| {
822                    CamelError::ProcessorError(format!(
823                        "Failed to remove container after start failure: {}",
824                        e
825                    ))
826                })
827        },
828    )
829    .await?;
830
831    track_container(container_id.clone());
832
833    exchange
834        .input
835        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
836    exchange.input.set_header(
837        HEADER_ACTION_RESULT,
838        serde_json::Value::String("success".to_string()),
839    );
840    Ok(())
841}
842
843async fn handle_lifecycle(
844    docker: Docker,
845    _config: ContainerConfig,
846    exchange: &mut Exchange,
847    operation: ProducerOperation,
848    operation_name: &str,
849) -> Result<(), CamelError> {
850    let container_id = exchange
851        .input
852        .header(HEADER_CONTAINER_ID)
853        .and_then(|v| v.as_str().map(|s| s.to_string()))
854        .ok_or_else(|| {
855            CamelError::ProcessorError(format!(
856                "{} header is required for {} operation",
857                HEADER_CONTAINER_ID, operation_name
858            ))
859        })?;
860
861    match operation {
862        ProducerOperation::Start => {
863            docker
864                .start_container::<String>(&container_id, None)
865                .await
866                .map_err(|e| {
867                    CamelError::ProcessorError(format!(
868                        "Failed to start container: {}",
869                        e
870                    ))
871                })?;
872        }
873        ProducerOperation::Stop => {
874            docker
875                .stop_container(&container_id, None)
876                .await
877                .map_err(|e| {
878                    CamelError::ProcessorError(format!(
879                        "Failed to stop container: {}",
880                        e
881                    ))
882                })?;
883        }
884        ProducerOperation::Remove => {
885            docker
886                .remove_container(&container_id, None)
887                .await
888                .map_err(|e| {
889                    CamelError::ProcessorError(format!(
890                        "Failed to remove container: {}",
891                        e
892                    ))
893                })?;
894            untrack_container(&container_id);
895        }
896        _ => {}
897    }
898
899    exchange.input.set_header(
900        HEADER_ACTION_RESULT,
901        serde_json::Value::String("success".to_string()),
902    );
903    Ok(())
904}
905
906async fn handle_exec(
907    docker: Docker,
908    config: ContainerConfig,
909    exchange: &mut Exchange,
910) -> Result<(), CamelError> {
911    let container_id = exchange
912        .input
913        .header(HEADER_CONTAINER_ID)
914        .and_then(|v| v.as_str().map(|s| s.to_string()))
915        .or(config.container_id.clone())
916        .ok_or_else(|| {
917            CamelError::ProcessorError(format!(
918                "{} header or containerId param is required for exec operation",
919                HEADER_CONTAINER_ID
920            ))
921        })?;
922
923    let cmd = exchange
924        .input
925        .header(HEADER_CMD)
926        .and_then(|v| v.as_str().map(|s| s.to_string()))
927        .or(config.cmd.clone())
928        .ok_or_else(|| {
929            CamelError::ProcessorError(
930                "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
931            )
932        })?;
933
934    let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
935    let env_vars = config.parse_env();
936
937    let exec_config = bollard::exec::CreateExecOptions {
938        cmd: Some(cmd_parts),
939        env: env_vars,
940        user: config.user.clone(),
941        working_dir: config.workdir.clone(),
942        attach_stdout: Some(true),
943        attach_stderr: Some(true),
944        ..Default::default()
945    };
946
947    let create_result = docker
948        .create_exec(&container_id, exec_config)
949        .await
950        .map_err(|e| {
951            let err_str = e.to_string().to_lowercase();
952            if err_str.contains("404") || err_str.contains("no such") {
953                CamelError::ProcessorError(format!(
954                    "Container '{}' not found for exec",
955                    container_id
956                ))
957            } else {
958                CamelError::ProcessorError(format!("Failed to create exec: {}", e))
959            }
960        })?;
961
962    let exec_id = create_result.id;
963
964    if config.detach {
965        docker
966            .start_exec(
967                &exec_id,
968                Some(bollard::exec::StartExecOptions {
969                    detach: true,
970                    ..Default::default()
971                }),
972            )
973            .await
974            .map_err(|e| {
975                CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
976            })?;
977
978        exchange.input.set_header(
979            HEADER_EXEC_ID,
980            serde_json::Value::String(exec_id),
981        );
982        exchange.input.set_header(
983            HEADER_CONTAINER_ID,
984            serde_json::Value::String(container_id),
985        );
986    } else {
987        let start_result = docker
988            .start_exec(&exec_id, None)
989            .await
990            .map_err(|e| {
991                CamelError::ProcessorError(format!("Failed to start exec: {}", e))
992            })?;
993
994        let mut output = String::new();
995
996        match start_result {
997            bollard::exec::StartExecResults::Attached { output: mut stream, .. } => {
998                use futures::StreamExt;
999                while let Some(msg) = stream.next().await {
1000                    match msg {
1001                        Ok(bollard::container::LogOutput::StdOut { message }) => {
1002                            output.push_str(&String::from_utf8_lossy(&message));
1003                        }
1004                        Ok(bollard::container::LogOutput::StdErr { message }) => {
1005                            output.push_str(&String::from_utf8_lossy(&message));
1006                        }
1007                        Ok(_) => {}
1008                        Err(e) => {
1009                            output.push_str(&format!("[error reading stream: {}]", e));
1010                        }
1011                    }
1012                }
1013            }
1014            bollard::exec::StartExecResults::Detached => {}
1015        }
1016
1017        let inspect = docker.inspect_exec(&exec_id).await.map_err(|e| {
1018            CamelError::ProcessorError(format!("Failed to inspect exec: {}", e))
1019        })?;
1020
1021        let exit_code: i64 = inspect.exit_code.unwrap_or(0);
1022
1023        let output = output.trim_end().to_string();
1024        exchange.input.body = Body::Text(output);
1025        exchange.input.set_header(
1026            HEADER_EXIT_CODE,
1027            serde_json::Value::Number(exit_code.into()),
1028        );
1029        exchange.input.set_header(
1030            HEADER_CONTAINER_ID,
1031            serde_json::Value::String(container_id),
1032        );
1033    }
1034
1035    exchange.input.set_header(
1036        HEADER_ACTION_RESULT,
1037        serde_json::Value::String("success".to_string()),
1038    );
1039    Ok(())
1040}
1041
1042async fn handle_network_create(
1043    docker: Docker,
1044    config: ContainerConfig,
1045    exchange: &mut Exchange,
1046) -> Result<(), CamelError> {
1047    let network_name = exchange
1048        .input
1049        .header(HEADER_CONTAINER_NAME)
1050        .and_then(|v| v.as_str().map(|s| s.to_string()))
1051        .or(config.name.clone())
1052        .ok_or_else(|| {
1053            CamelError::ProcessorError(
1054                "CamelContainerName header or name param is required for network-create".to_string(),
1055            )
1056        })?;
1057
1058    let driver = config.driver.as_deref().unwrap_or("bridge");
1059
1060    let options = bollard::network::CreateNetworkOptions {
1061        name: network_name.clone(),
1062        driver: driver.to_string(),
1063        ..Default::default()
1064    };
1065
1066    let result = docker.create_network(options).await.map_err(|e| {
1067        let err_str = e.to_string().to_lowercase();
1068        if err_str.contains("409") || err_str.contains("already exists") {
1069            CamelError::ProcessorError(format!(
1070                "Network '{}' already exists",
1071                network_name
1072            ))
1073        } else {
1074            CamelError::ProcessorError(format!("Failed to create network: {}", e))
1075        }
1076    })?;
1077
1078    let network_id = result.id.clone();
1079    let json_value = serde_json::to_value(&result).map_err(|e| {
1080        CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1081    })?;
1082
1083    exchange.input.body = Body::Json(json_value);
1084    exchange.input.set_header(
1085        HEADER_NETWORK,
1086        serde_json::Value::String(network_id),
1087    );
1088    exchange.input.set_header(
1089        HEADER_ACTION_RESULT,
1090        serde_json::Value::String("success".to_string()),
1091    );
1092    Ok(())
1093}
1094
1095async fn handle_network_connect(
1096    docker: Docker,
1097    config: ContainerConfig,
1098    exchange: &mut Exchange,
1099) -> Result<(), CamelError> {
1100    let network = exchange
1101        .input
1102        .header(HEADER_NETWORK)
1103        .and_then(|v| v.as_str().map(|s| s.to_string()))
1104        .or(config.network.clone())
1105        .ok_or_else(|| {
1106            CamelError::ProcessorError(
1107                "CamelContainerNetwork header or network param is required for network-connect"
1108                    .to_string(),
1109            )
1110        })?;
1111
1112    let container = exchange
1113        .input
1114        .header(HEADER_CONTAINER_ID)
1115        .and_then(|v| v.as_str().map(|s| s.to_string()))
1116        .or(config.container_id.clone())
1117        .ok_or_else(|| {
1118            CamelError::ProcessorError(
1119                "CamelContainerId header or container param is required for network-connect"
1120                    .to_string(),
1121            )
1122        })?;
1123
1124    docker
1125        .connect_network(
1126            &network,
1127            bollard::network::ConnectNetworkOptions {
1128                container,
1129                ..Default::default()
1130            },
1131        )
1132        .await
1133        .map_err(|e| {
1134            let err_str = e.to_string().to_lowercase();
1135            if err_str.contains("404") || err_str.contains("not found") {
1136                CamelError::ProcessorError(format!(
1137                    "Network '{}' or container not found",
1138                    network
1139                ))
1140            } else {
1141                CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1142            }
1143        })?;
1144
1145    exchange.input.set_header(
1146        HEADER_ACTION_RESULT,
1147        serde_json::Value::String("success".to_string()),
1148    );
1149    Ok(())
1150}
1151
1152async fn handle_network_disconnect(
1153    docker: Docker,
1154    config: ContainerConfig,
1155    exchange: &mut Exchange,
1156) -> Result<(), CamelError> {
1157    let network = exchange
1158        .input
1159        .header(HEADER_NETWORK)
1160        .and_then(|v| v.as_str().map(|s| s.to_string()))
1161        .or(config.network.clone())
1162        .ok_or_else(|| {
1163            CamelError::ProcessorError(
1164                "CamelContainerNetwork header or network param is required for network-disconnect"
1165                    .to_string(),
1166            )
1167        })?;
1168
1169    let container = exchange
1170        .input
1171        .header(HEADER_CONTAINER_ID)
1172        .and_then(|v| v.as_str().map(|s| s.to_string()))
1173        .or(config.container_id.clone())
1174        .ok_or_else(|| {
1175            CamelError::ProcessorError(
1176                "CamelContainerId header or container param is required for network-disconnect"
1177                    .to_string(),
1178            )
1179        })?;
1180
1181    docker
1182        .disconnect_network(
1183            &network,
1184            bollard::network::DisconnectNetworkOptions {
1185                container,
1186                force: config.force,
1187            },
1188        )
1189        .await
1190        .map_err(|e| {
1191            let err_str = e.to_string().to_lowercase();
1192            if err_str.contains("404") || err_str.contains("not found") {
1193                CamelError::ProcessorError(format!(
1194                    "Network '{}' or container not found",
1195                    network
1196                ))
1197            } else {
1198                CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1199            }
1200        })?;
1201
1202    exchange.input.set_header(
1203        HEADER_ACTION_RESULT,
1204        serde_json::Value::String("success".to_string()),
1205    );
1206    Ok(())
1207}
1208
1209async fn handle_network_remove(
1210    docker: Docker,
1211    config: ContainerConfig,
1212    exchange: &mut Exchange,
1213) -> Result<(), CamelError> {
1214    let network = exchange
1215        .input
1216        .header(HEADER_NETWORK)
1217        .and_then(|v| v.as_str().map(|s| s.to_string()))
1218        .or(config.network.clone())
1219        .ok_or_else(|| {
1220            CamelError::ProcessorError(
1221                "CamelContainerNetwork header or network param is required for network-remove"
1222                    .to_string(),
1223            )
1224        })?;
1225
1226    docker
1227        .remove_network(&network)
1228        .await
1229        .map_err(|e| {
1230            let err_str = e.to_string().to_lowercase();
1231            if err_str.contains("404") || err_str.contains("not found") {
1232                CamelError::ProcessorError(format!("Network '{}' not found", network))
1233            } else if err_str.contains("409") || err_str.contains("in use") {
1234                CamelError::ProcessorError(format!(
1235                    "Network '{}' is in use and cannot be removed",
1236                    network
1237                ))
1238            } else {
1239                CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1240            }
1241        })?;
1242
1243    exchange.input.set_header(
1244        HEADER_ACTION_RESULT,
1245        serde_json::Value::String("success".to_string()),
1246    );
1247    Ok(())
1248}
1249
1250async fn handle_network_list(
1251    docker: Docker,
1252    _config: ContainerConfig,
1253    exchange: &mut Exchange,
1254) -> Result<(), CamelError> {
1255    let networks = docker.list_networks::<String>(None).await.map_err(|e| {
1256        CamelError::ProcessorError(format!("Failed to list networks: {}", e))
1257    })?;
1258
1259    let json_value = serde_json::to_value(&networks).map_err(|e| {
1260        CamelError::ProcessorError(format!("Failed to serialize networks: {}", e))
1261    })?;
1262
1263    exchange.input.body = Body::Json(json_value);
1264    exchange.input.set_header(
1265        HEADER_ACTION_RESULT,
1266        serde_json::Value::String("success".to_string()),
1267    );
1268    Ok(())
1269}
1270
/// Producer for executing container operations.
///
/// This producer handles request/response container operations: listing, running,
/// starting, stopping and removing containers, executing commands inside them, and
/// managing networks. It is `Clone`; its `Service::call` implementation clones both
/// fields into the future that processes each exchange.
#[derive(Clone)]
pub struct ContainerProducer {
    // Endpoint configuration; supplies the default operation and per-operation parameters.
    config: ContainerConfig,
    // Docker client used for every operation issued by this producer.
    docker: Docker,
}
1280
1281impl Service<Exchange> for ContainerProducer {
1282    type Response = Exchange;
1283    type Error = CamelError;
1284    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1285
1286    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1287        Poll::Ready(Ok(()))
1288    }
1289
1290    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1291        let config = self.config.clone();
1292        let docker = self.docker.clone();
1293        Box::pin(async move {
1294            let operation_name = exchange
1295                .input
1296                .header(HEADER_ACTION)
1297                .and_then(|v| v.as_str().map(|s| s.to_string()))
1298                .unwrap_or_else(|| config.operation.clone());
1299
1300            let operation = parse_producer_operation(&operation_name)?;
1301
1302            match operation {
1303                ProducerOperation::List => {
1304                    handle_list(docker, config, &mut exchange).await?;
1305                }
1306                ProducerOperation::Run => {
1307                    handle_run(docker, config, &mut exchange).await?;
1308                }
1309                ProducerOperation::Start => {
1310                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1311                        .await?;
1312                }
1313                ProducerOperation::Stop => {
1314                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1315                        .await?;
1316                }
1317                ProducerOperation::Remove => {
1318                    handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1319                        .await?;
1320                }
1321                ProducerOperation::Exec => {
1322                    handle_exec(docker, config, &mut exchange).await?;
1323                }
1324                ProducerOperation::NetworkCreate => {
1325                    handle_network_create(docker, config, &mut exchange).await?;
1326                }
1327                ProducerOperation::NetworkConnect => {
1328                    handle_network_connect(docker, config, &mut exchange).await?;
1329                }
1330                ProducerOperation::NetworkDisconnect => {
1331                    handle_network_disconnect(docker, config, &mut exchange).await?;
1332                }
1333                ProducerOperation::NetworkRemove => {
1334                    handle_network_remove(docker, config, &mut exchange).await?;
1335                }
1336                ProducerOperation::NetworkList => {
1337                    handle_network_list(docker, config, &mut exchange).await?;
1338                }
1339            }
1340
1341            Ok(exchange)
1342        })
1343    }
1344}
1345
/// Consumer for receiving Docker container events or logs.
///
/// This consumer subscribes to Docker events or container logs and forwards them
/// to the route as exchanges. It implements automatic reconnection on connection failures.
/// Which of the two streams is consumed is selected by `config.operation`
/// (`"events"` or `"logs"`).
pub struct ContainerConsumer {
    // Endpoint configuration: selects the operation and supplies the log options
    // (container ID, follow, timestamps, tail) and the Docker connection settings.
    config: ContainerConfig,
}
1353
1354#[async_trait]
1355impl Consumer for ContainerConsumer {
1356    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1357        match self.config.operation.as_str() {
1358            "events" => self.start_events_consumer(context).await,
1359            "logs" => self.start_logs_consumer(context).await,
1360            _ => Err(CamelError::EndpointCreationFailed(format!(
1361                "Consumer only supports 'events' or 'logs' operations, got '{}'",
1362                self.config.operation
1363            ))),
1364        }
1365    }
1366
1367    async fn stop(&mut self) -> Result<(), CamelError> {
1368        Ok(())
1369    }
1370
1371    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1372        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1373    }
1374}
1375
impl ContainerConsumer {
    /// Streams Docker events and forwards each one to the route as a text-body exchange.
    ///
    /// Runs until the consumer context is cancelled, at which point it returns `Ok(())`.
    /// Connection failures are retried after 5 seconds. A stream error, a stream end or a
    /// failed send causes a reconnect after 1 second. As written, this method never
    /// returns `Err`.
    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        // Outer loop: one iteration per (re)connection attempt.
        loop {
            if context.is_cancelled() {
                tracing::info!("Container events consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Wait before retrying, but wake immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container events consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // No options are passed, so the event stream is requested without filters.
            let mut event_stream = docker.events::<String>(None);

            // Inner loop: pump events until cancellation or until the stream must be reopened.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container events consumer shutting down");
                        return Ok(());
                    }

                    msg = event_stream.next() => {
                        match msg {
                            Some(Ok(event)) => {
                                let formatted = format_docker_event(&event);
                                let message = Message::new(Body::Text(formatted));
                                let exchange = Exchange::new(message);

                                // A failed send leaves the inner loop and triggers a reconnect.
                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                tracing::info!("Docker event stream ended. Reconnecting...");
                                break;
                            }
                        }
                    }
                }
            }

            // Short pause between reconnects; still responsive to cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container events consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }

    /// Streams the logs of the configured container and forwards each non-empty chunk to
    /// the route as a text-body exchange.
    ///
    /// Each exchange carries `HEADER_CONTAINER_ID` and `HEADER_LOG_STREAM` (`"stdout"`,
    /// `"stderr"`, `"console"` or `"stdin"`), plus `HEADER_LOG_TIMESTAMP` when timestamps
    /// are enabled and a timestamp can be extracted from the line.
    ///
    /// Returns `Ok(())` on cancellation, or when the stream ends and `follow` is disabled.
    /// Returns `CamelError::EndpointCreationFailed` when no `containerId` is configured.
    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        let container_id = self.config.container_id.clone().ok_or_else(|| {
            CamelError::EndpointCreationFailed(
                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
                    .to_string(),
            )
        })?;

        // Outer loop: one iteration per (re)connection attempt.
        loop {
            if context.is_cancelled() {
                tracing::info!("Container logs consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Wait before retrying, but wake immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container logs consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // Without an explicit `tail`, the full log history is requested.
            let tail = self
                .config
                .tail
                .clone()
                .unwrap_or_else(|| "all".to_string());

            // NOTE(review): these options are rebuilt identically on every reconnect
            // (same `tail`, no `since`), so lines already delivered may be sent again
            // after a reconnect — confirm whether that is intended.
            let options = bollard::container::LogsOptions::<String> {
                follow: self.config.follow,
                stdout: true,
                stderr: true,
                timestamps: self.config.timestamps,
                tail,
                ..Default::default()
            };

            let mut log_stream = docker.logs(&container_id, Some(options));
            let container_id_header = container_id.clone();

            // Inner loop: pump log chunks until cancellation or until the stream must be reopened.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container logs consumer shutting down");
                        return Ok(());
                    }

                    msg = log_stream.next() => {
                        match msg {
                            Some(Ok(log_output)) => {
                                // Label the chunk by its origin and decode it lossily as UTF-8.
                                let (stream_type, content) = match log_output {
                                    bollard::container::LogOutput::StdOut { message } => {
                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdErr { message } => {
                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::Console { message } => {
                                        ("console", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdIn { message } => {
                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
                                    }
                                };

                                // Drop trailing whitespace and skip chunks that are empty afterwards.
                                let content = content.trim_end();
                                if content.is_empty() {
                                    continue;
                                }

                                let mut message = Message::new(Body::Text(content.to_string()));
                                message.set_header(
                                    HEADER_CONTAINER_ID,
                                    serde_json::Value::String(container_id_header.clone()),
                                );
                                message.set_header(
                                    HEADER_LOG_STREAM,
                                    serde_json::Value::String(stream_type.to_string()),
                                );

                                // The timestamp header is set only when timestamps were
                                // requested and one could be extracted from the line.
                                if self.config.timestamps
                                    && let Some(ts) = extract_timestamp(content) {
                                        message.set_header(
                                            HEADER_LOG_TIMESTAMP,
                                            serde_json::Value::String(ts),
                                        );
                                    }

                                let exchange = Exchange::new(message);

                                // A failed send leaves the inner loop and triggers a reconnect.
                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send log exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                // End of stream: reconnect when following, otherwise finish.
                                if self.config.follow {
                                    tracing::info!("Docker log stream ended. Reconnecting...");
                                    break;
                                } else {
                                    tracing::info!("Container logs consumer finished (follow=false)");
                                    return Ok(());
                                }
                            }
                        }
                    }
                }
            }

            // Short pause between reconnects; still responsive to cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container logs consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }
}
1583
/// Returns the leading token of `log_line` when it looks like a timestamp.
///
/// The line must contain at least one space, and the text before the first space must
/// contain a `'T'`. That leading text is returned as an owned string; in every other case
/// the result is `None`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    log_line
        .split_once(' ')
        .filter(|(leading, _)| leading.contains('T'))
        .map(|(leading, _)| leading.to_owned())
}
1592
/// Component for creating container endpoints.
///
/// This component handles URIs with the "container" scheme and creates
/// appropriate producer and consumer endpoints for Docker operations.
///
/// Containers created via `run` operation are tracked globally and can be
/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
pub struct ContainerComponent {
    // Optional global defaults, applied to every endpoint configuration after its URI
    // has been parsed; `None` means endpoints use only what their URI specifies.
    config: Option<ContainerGlobalConfig>,
}
1603
1604impl ContainerComponent {
1605    /// Creates a new container component instance without global config.
1606    pub fn new() -> Self {
1607        Self { config: None }
1608    }
1609
1610    /// Creates a container component with the given global config.
1611    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1612        Self {
1613            config: Some(config),
1614        }
1615    }
1616
1617    /// Creates a container component with optional global config.
1618    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1619        Self { config }
1620    }
1621}
1622
1623impl Default for ContainerComponent {
1624    fn default() -> Self {
1625        Self::new()
1626    }
1627}
1628
1629impl Component for ContainerComponent {
1630    fn scheme(&self) -> &str {
1631        "container"
1632    }
1633
1634    fn create_endpoint(
1635        &self,
1636        uri: &str,
1637        _ctx: &dyn camel_component_api::ComponentContext,
1638    ) -> Result<Box<dyn Endpoint>, CamelError> {
1639        let mut config = ContainerConfig::from_uri(uri)?;
1640        // Apply global defaults if present and URI didn't set them
1641        if let Some(ref global) = self.config {
1642            config.apply_global_defaults(global);
1643        }
1644        Ok(Box::new(ContainerEndpoint {
1645            uri: uri.to_string(),
1646            config,
1647        }))
1648    }
1649}
1650
/// Endpoint for container operations.
///
/// This endpoint creates producers for executing container operations
/// and consumers for receiving container events or logs.
pub struct ContainerEndpoint {
    // The URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    // Parsed endpoint configuration, cloned into every producer and consumer.
    config: ContainerConfig,
}
1659
1660impl ContainerEndpoint {
1661    /// Returns the Docker host configured for this endpoint.
1662    /// Returns `None` if not set (for testing purposes).
1663    pub fn docker_host(&self) -> Option<&str> {
1664        self.config.host.as_deref()
1665    }
1666}
1667
1668impl Endpoint for ContainerEndpoint {
1669    fn uri(&self) -> &str {
1670        &self.uri
1671    }
1672
1673    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1674        Ok(Box::new(ContainerConsumer {
1675            config: self.config.clone(),
1676        }))
1677    }
1678
1679    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1680        let docker = self.config.connect_docker_client()?;
1681        Ok(BoxProcessor::new(ContainerProducer {
1682            config: self.config.clone(),
1683            docker,
1684        }))
1685    }
1686}
1687
1688#[cfg(test)]
1689mod tests {
1690    use super::*;
1691    use camel_component_api::NoOpComponentContext;
1692
1693    #[test]
1694    fn test_container_config() {
1695        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1696        assert_eq!(config.operation, "run");
1697        assert_eq!(config.image.as_deref(), Some("alpine"));
1698        // host is None by default; global config applies it later
1699        assert!(config.host.is_none());
1700    }
1701
1702    #[test]
1703    fn test_global_config_applied_to_endpoint() {
1704        // When global config is set and URI doesn't specify host,
1705        // apply_global_defaults should set host from global config.
1706        let global =
1707            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1708        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1709        assert!(
1710            config.host.is_none(),
1711            "URI without ?host= should leave host as None"
1712        );
1713        config.apply_global_defaults(&global);
1714        assert_eq!(
1715            config.host.as_deref(),
1716            Some("unix:///custom/docker.sock"),
1717            "global docker_host must be applied when URI did not set host"
1718        );
1719    }
1720
1721    #[test]
1722    fn test_uri_param_wins_over_global_config() {
1723        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1724        let global =
1725            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1726        let mut config =
1727            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1728                .unwrap();
1729        assert_eq!(
1730            config.host.as_deref(),
1731            Some("unix:///override.sock"),
1732            "URI-set host should be parsed correctly"
1733        );
1734        config.apply_global_defaults(&global);
1735        assert_eq!(
1736            config.host.as_deref(),
1737            Some("unix:///override.sock"),
1738            "global config must NOT override a host already set by URI"
1739        );
1740    }
1741
1742    #[test]
1743    fn test_container_config_parses_name() {
1744        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1745        assert_eq!(config.name.as_deref(), Some("my-container"));
1746    }
1747
1748    #[test]
1749    fn test_parse_producer_operation_known() {
1750        assert_eq!(
1751            parse_producer_operation("list").unwrap(),
1752            ProducerOperation::List
1753        );
1754        assert_eq!(
1755            parse_producer_operation("run").unwrap(),
1756            ProducerOperation::Run
1757        );
1758        assert_eq!(
1759            parse_producer_operation("start").unwrap(),
1760            ProducerOperation::Start
1761        );
1762        assert_eq!(
1763            parse_producer_operation("stop").unwrap(),
1764            ProducerOperation::Stop
1765        );
1766        assert_eq!(
1767            parse_producer_operation("remove").unwrap(),
1768            ProducerOperation::Remove
1769        );
1770    }
1771
1772    #[test]
1773    fn test_parse_producer_operation_unknown() {
1774        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1775        match err {
1776            CamelError::ProcessorError(msg) => {
1777                assert!(
1778                    msg.contains("Unknown container operation"),
1779                    "Unexpected error message: {}",
1780                    msg
1781                );
1782            }
1783            _ => panic!("Expected ProcessorError for unknown operation"),
1784        }
1785    }
1786
1787    #[test]
1788    fn test_parse_producer_operation_new_variants() {
1789        assert_eq!(
1790            parse_producer_operation("exec").unwrap(),
1791            ProducerOperation::Exec
1792        );
1793        assert_eq!(
1794            parse_producer_operation("network-create").unwrap(),
1795            ProducerOperation::NetworkCreate
1796        );
1797        assert_eq!(
1798            parse_producer_operation("network-connect").unwrap(),
1799            ProducerOperation::NetworkConnect
1800        );
1801        assert_eq!(
1802            parse_producer_operation("network-disconnect").unwrap(),
1803            ProducerOperation::NetworkDisconnect
1804        );
1805        assert_eq!(
1806            parse_producer_operation("network-remove").unwrap(),
1807            ProducerOperation::NetworkRemove
1808        );
1809        assert_eq!(
1810            parse_producer_operation("network-list").unwrap(),
1811            ProducerOperation::NetworkList
1812        );
1813    }
1814
1815    #[test]
1816    fn test_resolve_container_name_header_overrides_config() {
1817        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1818        let mut exchange = Exchange::new(Message::new(""));
1819        exchange.input.set_header(
1820            HEADER_CONTAINER_NAME,
1821            serde_json::Value::String("header-name".to_string()),
1822        );
1823
1824        let resolved = resolve_container_name(&exchange, &config);
1825        assert_eq!(resolved.as_deref(), Some("header-name"));
1826    }
1827
1828    #[test]
1829    fn test_container_config_rejects_tcp_host() {
1830        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1831        let err = config.connect_docker_client().unwrap_err();
1832        match err {
1833            CamelError::ProcessorError(msg) => {
1834                assert!(
1835                    msg.to_lowercase().contains("tcp"),
1836                    "Expected TCP scheme error, got: {}",
1837                    msg
1838                );
1839            }
1840            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1841        }
1842    }
1843
1844    #[tokio::test]
1845    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1846        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1847        let remove_called_clone = remove_called.clone();
1848
1849        let result = run_container_with_cleanup(
1850            || async { Ok("container-123".to_string()) },
1851            |_id| async move {
1852                Err(CamelError::ProcessorError(
1853                    "Failed to start container".to_string(),
1854                ))
1855            },
1856            move |_id| {
1857                let remove_called_inner = remove_called_clone.clone();
1858                async move {
1859                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1860                    Ok(())
1861                }
1862            },
1863        )
1864        .await;
1865
1866        assert!(result.is_err(), "Expected start failure to bubble up");
1867        assert!(
1868            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1869            "Expected cleanup to remove container"
1870        );
1871    }
1872
1873    #[test]
1874    fn test_container_component_creates_endpoint() {
1875        let component = ContainerComponent::new();
1876        assert_eq!(component.scheme(), "container");
1877        let ctx = NoOpComponentContext;
1878        let endpoint = component
1879            .create_endpoint("container:run?image=alpine", &ctx)
1880            .unwrap();
1881        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1882    }
1883
1884    #[test]
1885    fn test_container_config_parses_ports() {
1886        let config =
1887            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1888        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1889    }
1890
1891    #[test]
1892    fn test_container_config_parses_env() {
1893        let config =
1894            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1895        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1896    }
1897
1898    #[test]
1899    fn test_container_config_parses_logs_options() {
1900        let config = ContainerConfig::from_uri(
1901            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1902        )
1903        .unwrap();
1904        assert_eq!(config.operation, "logs");
1905        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1906        assert!(config.follow);
1907        assert!(config.timestamps);
1908        assert_eq!(config.tail.as_deref(), Some("100"));
1909    }
1910
1911    #[test]
1912    fn test_container_config_logs_defaults() {
1913        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1914        assert!(config.follow); // default: true
1915        assert!(!config.timestamps); // default: false
1916        assert!(config.tail.is_none()); // default: None (all)
1917    }
1918
1919    #[test]
1920    fn test_parse_ports_single() {
1921        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1922        let (exposed, bindings) = config.parse_ports().unwrap();
1923
1924        assert!(exposed.contains_key("80/tcp"));
1925        assert!(bindings.contains_key("80/tcp"));
1926
1927        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1928        assert_eq!(binding.len(), 1);
1929        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1930    }
1931
1932    #[test]
1933    fn test_parse_ports_multiple() {
1934        let config =
1935            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1936        let (exposed, bindings) = config.parse_ports().unwrap();
1937
1938        assert!(exposed.contains_key("80/tcp"));
1939        assert!(exposed.contains_key("443/tcp"));
1940        assert_eq!(bindings.len(), 2);
1941    }
1942
1943    #[test]
1944    fn test_parse_ports_with_protocol() {
1945        let config =
1946            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1947                .unwrap();
1948        let (exposed, _bindings) = config.parse_ports().unwrap();
1949
1950        assert!(exposed.contains_key("80/tcp"));
1951        assert!(exposed.contains_key("53/udp"));
1952    }
1953
1954    #[test]
1955    fn test_parse_ports_none() {
1956        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1957        assert!(config.parse_ports().is_none());
1958    }
1959
1960    #[test]
1961    fn test_parse_env_single() {
1962        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1963        let env = config.parse_env().unwrap();
1964
1965        assert_eq!(env.len(), 1);
1966        assert_eq!(env[0], "FOO=bar");
1967    }
1968
1969    #[test]
1970    fn test_parse_env_multiple() {
1971        let config =
1972            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1973                .unwrap();
1974        let env = config.parse_env().unwrap();
1975
1976        assert_eq!(env.len(), 3);
1977        assert!(env.contains(&"FOO=bar".to_string()));
1978        assert!(env.contains(&"BAZ=qux".to_string()));
1979        assert!(env.contains(&"NUM=123".to_string()));
1980    }
1981
1982    #[test]
1983    fn test_parse_env_none() {
1984        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1985        assert!(config.parse_env().is_none());
1986    }
1987
1988    use camel_component_api::Message;
1989    use std::sync::Arc;
1990
1991    #[tokio::test]
1992    async fn test_container_producer_resolves_operation_from_header() {
1993        // Try to connect to Docker - if fails, return early
1994        let docker = match Docker::connect_with_local_defaults() {
1995            Ok(d) => d,
1996            Err(_) => {
1997                eprintln!("Skipping test: Could not connect to Docker daemon");
1998                return;
1999            }
2000        };
2001
2002        if docker.ping().await.is_err() {
2003            eprintln!("Skipping test: Docker daemon not responding to ping");
2004            return;
2005        }
2006
2007        let component = ContainerComponent::new();
2008        let ctx = NoOpComponentContext;
2009        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2010
2011        let ctx = ProducerContext::new();
2012        let mut producer = endpoint.create_producer(&ctx).unwrap();
2013
2014        let mut exchange = Exchange::new(Message::new(""));
2015        exchange
2016            .input
2017            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2018
2019        use tower::ServiceExt;
2020        let result = producer
2021            .ready()
2022            .await
2023            .unwrap()
2024            .call(exchange)
2025            .await
2026            .unwrap();
2027
2028        // For list operation, the result header should be on the input message
2029        assert_eq!(
2030            result
2031                .input
2032                .header(HEADER_ACTION_RESULT)
2033                .map(|v| v.as_str().unwrap()),
2034            Some("success")
2035        );
2036    }
2037
2038    #[tokio::test]
2039    async fn test_container_producer_connection_error_on_invalid_host() {
2040        // Test that an invalid host (nonexistent socket) results in a connection error
2041        let component = ContainerComponent::new();
2042        let ctx = NoOpComponentContext;
2043        let endpoint = component
2044            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2045            .unwrap();
2046
2047        let ctx = ProducerContext::new();
2048        let result = endpoint.create_producer(&ctx);
2049
2050        // The producer should return an error because it cannot connect to the invalid socket
2051        assert!(
2052            result.is_err(),
2053            "Expected error when connecting to invalid host"
2054        );
2055        let err = result.unwrap_err();
2056        match &err {
2057            CamelError::ProcessorError(msg) => {
2058                assert!(
2059                    msg.to_lowercase().contains("connection")
2060                        || msg.to_lowercase().contains("connect")
2061                        || msg.to_lowercase().contains("socket")
2062                        || msg.contains("docker"),
2063                    "Error message should indicate connection failure, got: {}",
2064                    msg
2065                );
2066            }
2067            _ => panic!("Expected ProcessorError, got: {:?}", err),
2068        }
2069    }
2070
2071    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
2072    #[tokio::test]
2073    async fn test_container_producer_lifecycle_operations_missing_id() {
2074        // Try to connect to Docker - if fails, return early
2075        let docker = match Docker::connect_with_local_defaults() {
2076            Ok(d) => d,
2077            Err(_) => {
2078                eprintln!("Skipping test: Could not connect to Docker daemon");
2079                return;
2080            }
2081        };
2082
2083        if docker.ping().await.is_err() {
2084            eprintln!("Skipping test: Docker daemon not responding to ping");
2085            return;
2086        }
2087
2088        let component = ContainerComponent::new();
2089        let ctx = NoOpComponentContext;
2090        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2091        let ctx = ProducerContext::new();
2092        let mut producer = endpoint.create_producer(&ctx).unwrap();
2093
2094        // Test each lifecycle operation without CamelContainerId header
2095        for operation in ["start", "stop", "remove"] {
2096            let mut exchange = Exchange::new(Message::new(""));
2097            exchange.input.set_header(
2098                HEADER_ACTION,
2099                serde_json::Value::String(operation.to_string()),
2100            );
2101            // Deliberately NOT setting CamelContainerId header
2102
2103            use tower::ServiceExt;
2104            let result = producer.ready().await.unwrap().call(exchange).await;
2105
2106            assert!(
2107                result.is_err(),
2108                "Expected error for {} operation without CamelContainerId",
2109                operation
2110            );
2111            let err = result.unwrap_err();
2112            match &err {
2113                CamelError::ProcessorError(msg) => {
2114                    assert!(
2115                        msg.contains(HEADER_CONTAINER_ID),
2116                        "Error message should mention {}, got: {}",
2117                        HEADER_CONTAINER_ID,
2118                        msg
2119                    );
2120                }
2121                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2122            }
2123        }
2124    }
2125
2126    /// Test that stop operation returns an error for a nonexistent container.
2127    #[tokio::test]
2128    async fn test_container_producer_stop_nonexistent() {
2129        // Try to connect to Docker - if fails, return early
2130        let docker = match Docker::connect_with_local_defaults() {
2131            Ok(d) => d,
2132            Err(_) => {
2133                eprintln!("Skipping test: Could not connect to Docker daemon");
2134                return;
2135            }
2136        };
2137
2138        if docker.ping().await.is_err() {
2139            eprintln!("Skipping test: Docker daemon not responding to ping");
2140            return;
2141        }
2142
2143        let component = ContainerComponent::new();
2144        let ctx = NoOpComponentContext;
2145        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2146        let ctx = ProducerContext::new();
2147        let mut producer = endpoint.create_producer(&ctx).unwrap();
2148
2149        let mut exchange = Exchange::new(Message::new(""));
2150        exchange
2151            .input
2152            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2153        exchange.input.set_header(
2154            HEADER_CONTAINER_ID,
2155            serde_json::Value::String("nonexistent-container-123".into()),
2156        );
2157
2158        use tower::ServiceExt;
2159        let result = producer.ready().await.unwrap().call(exchange).await;
2160
2161        assert!(
2162            result.is_err(),
2163            "Expected error when stopping nonexistent container"
2164        );
2165        let err = result.unwrap_err();
2166        match &err {
2167            CamelError::ProcessorError(msg) => {
2168                // Docker API returns 404 with "No such container" message
2169                assert!(
2170                    msg.to_lowercase().contains("no such container")
2171                        || msg.to_lowercase().contains("not found")
2172                        || msg.contains("404"),
2173                    "Error message should indicate container not found, got: {}",
2174                    msg
2175                );
2176            }
2177            _ => panic!("Expected ProcessorError, got: {:?}", err),
2178        }
2179    }
2180
2181    /// Test that run operation returns an error when no image is provided.
2182    #[tokio::test]
2183    async fn test_container_producer_run_missing_image() {
2184        // Try to connect to Docker - if fails, return early
2185        let docker = match Docker::connect_with_local_defaults() {
2186            Ok(d) => d,
2187            Err(_) => {
2188                eprintln!("Skipping test: Could not connect to Docker daemon");
2189                return;
2190            }
2191        };
2192
2193        if docker.ping().await.is_err() {
2194            eprintln!("Skipping test: Docker daemon not responding to ping");
2195            return;
2196        }
2197
2198        // Create producer without an image in the URI
2199        let component = ContainerComponent::new();
2200        let ctx = NoOpComponentContext;
2201        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2202        let ctx = ProducerContext::new();
2203        let mut producer = endpoint.create_producer(&ctx).unwrap();
2204
2205        let mut exchange = Exchange::new(Message::new(""));
2206        exchange
2207            .input
2208            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2209        // Deliberately NOT setting CamelContainerImage header
2210
2211        use tower::ServiceExt;
2212        let result = producer.ready().await.unwrap().call(exchange).await;
2213
2214        assert!(
2215            result.is_err(),
2216            "Expected error for run operation without image"
2217        );
2218        let err = result.unwrap_err();
2219        match &err {
2220            CamelError::ProcessorError(msg) => {
2221                assert!(
2222                    msg.to_lowercase().contains("image"),
2223                    "Error message should mention 'image', got: {}",
2224                    msg
2225                );
2226            }
2227            _ => panic!("Expected ProcessorError, got: {:?}", err),
2228        }
2229    }
2230
2231    /// Test that run operation uses image from header.
2232    #[tokio::test]
2233    async fn test_container_producer_run_image_from_header() {
2234        // Try to connect to Docker - if fails, return early
2235        let docker = match Docker::connect_with_local_defaults() {
2236            Ok(d) => d,
2237            Err(_) => {
2238                eprintln!("Skipping test: Could not connect to Docker daemon");
2239                return;
2240            }
2241        };
2242
2243        if docker.ping().await.is_err() {
2244            eprintln!("Skipping test: Docker daemon not responding to ping");
2245            return;
2246        }
2247
2248        // Create producer without an image in the URI
2249        let component = ContainerComponent::new();
2250        let ctx = NoOpComponentContext;
2251        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2252        let ctx = ProducerContext::new();
2253        let mut producer = endpoint.create_producer(&ctx).unwrap();
2254
2255        let mut exchange = Exchange::new(Message::new(""));
2256        exchange
2257            .input
2258            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2259        // Set a non-existent image to test that the run operation attempts to use it
2260        exchange.input.set_header(
2261            HEADER_IMAGE,
2262            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2263        );
2264
2265        use tower::ServiceExt;
2266        let result = producer.ready().await.unwrap().call(exchange).await;
2267
2268        // The operation should fail because the image doesn't exist
2269        assert!(
2270            result.is_err(),
2271            "Expected error when running container with nonexistent image"
2272        );
2273        let err = result.unwrap_err();
2274        match &err {
2275            CamelError::ProcessorError(msg) => {
2276                // Docker API returns an error about the missing image
2277                assert!(
2278                    msg.to_lowercase().contains("no such image")
2279                        || msg.to_lowercase().contains("not found")
2280                        || msg.to_lowercase().contains("image")
2281                        || msg.to_lowercase().contains("pull")
2282                        || msg.contains("404"),
2283                    "Error message should indicate image issue, got: {}",
2284                    msg
2285                );
2286            }
2287            _ => panic!("Expected ProcessorError, got: {:?}", err),
2288        }
2289    }
2290
2291    /// Integration test: Actually run a container with alpine:latest.
2292    /// This test verifies the full flow: create → start → set headers.
2293    #[tokio::test]
2294    async fn test_container_producer_run_alpine_container() {
2295        let docker = match Docker::connect_with_local_defaults() {
2296            Ok(d) => d,
2297            Err(_) => {
2298                eprintln!("Skipping test: Could not connect to Docker daemon");
2299                return;
2300            }
2301        };
2302
2303        if docker.ping().await.is_err() {
2304            eprintln!("Skipping test: Docker daemon not responding to ping");
2305            return;
2306        }
2307
2308        // Pull alpine:latest if not present
2309        let images = docker.list_images::<&str>(None).await.unwrap();
2310        let has_alpine = images
2311            .iter()
2312            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2313
2314        if !has_alpine {
2315            eprintln!("Pulling alpine:latest image...");
2316            let mut stream = docker.create_image(
2317                Some(bollard::image::CreateImageOptions {
2318                    from_image: "alpine:latest",
2319                    ..Default::default()
2320                }),
2321                None,
2322                None,
2323            );
2324
2325            use futures::StreamExt;
2326            while let Some(_item) = stream.next().await {
2327                // Wait for pull to complete
2328            }
2329            eprintln!("Image pulled successfully");
2330        }
2331
2332        // Create producer
2333        let component = ContainerComponent::new();
2334        let ctx = NoOpComponentContext;
2335        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2336        let ctx = ProducerContext::new();
2337        let mut producer = endpoint.create_producer(&ctx).unwrap();
2338
2339        // Run container with unique name
2340        let timestamp = std::time::SystemTime::now()
2341            .duration_since(std::time::UNIX_EPOCH)
2342            .unwrap()
2343            .as_millis();
2344        let container_name = format!("test-rust-camel-{}", timestamp);
2345        let mut exchange = Exchange::new(Message::new(""));
2346        exchange.input.set_header(
2347            HEADER_IMAGE,
2348            serde_json::Value::String("alpine:latest".into()),
2349        );
2350        exchange.input.set_header(
2351            HEADER_CONTAINER_NAME,
2352            serde_json::Value::String(container_name.clone()),
2353        );
2354
2355        use tower::ServiceExt;
2356        let result = producer
2357            .ready()
2358            .await
2359            .unwrap()
2360            .call(exchange)
2361            .await
2362            .expect("Container run should succeed");
2363
2364        // Verify container ID was set
2365        let container_id = result
2366            .input
2367            .header(HEADER_CONTAINER_ID)
2368            .and_then(|v| v.as_str().map(|s| s.to_string()))
2369            .expect("Expected container ID header");
2370        assert!(!container_id.is_empty(), "Container ID should not be empty");
2371
2372        // Verify success header
2373        assert_eq!(
2374            result
2375                .input
2376                .header(HEADER_ACTION_RESULT)
2377                .and_then(|v| v.as_str()),
2378            Some("success")
2379        );
2380
2381        // Verify container exists in Docker
2382        let inspect = docker
2383            .inspect_container(&container_id, None)
2384            .await
2385            .expect("Container should exist");
2386        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2387
2388        // Cleanup: remove container
2389        docker
2390            .remove_container(
2391                &container_id,
2392                Some(bollard::container::RemoveContainerOptions {
2393                    force: true,
2394                    ..Default::default()
2395                }),
2396            )
2397            .await
2398            .ok();
2399
2400        eprintln!("✅ Container {} created and cleaned up", container_id);
2401    }
2402
2403    /// Test that consumer returns an error for unsupported operations.
2404    #[tokio::test]
2405    async fn test_container_consumer_unsupported_operation() {
2406        use tokio::sync::mpsc;
2407
2408        let component = ContainerComponent::new();
2409        let ctx = NoOpComponentContext;
2410        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2411        let mut consumer = endpoint.create_consumer().unwrap();
2412
2413        // Create a minimal ConsumerContext
2414        let (tx, _rx) = mpsc::channel(16);
2415        let cancel_token = tokio_util::sync::CancellationToken::new();
2416        let context = ConsumerContext::new(tx, cancel_token);
2417
2418        let result = consumer.start(context).await;
2419
2420        // Should return error because "run" is not a supported consumer operation
2421        assert!(
2422            result.is_err(),
2423            "Expected error for unsupported consumer operation"
2424        );
2425        let err = result.unwrap_err();
2426        match &err {
2427            CamelError::EndpointCreationFailed(msg) => {
2428                assert!(
2429                    msg.contains("Consumer only supports 'events' or 'logs'"),
2430                    "Error message should mention events or logs support, got: {}",
2431                    msg
2432                );
2433            }
2434            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2435        }
2436    }
2437
2438    #[test]
2439    fn test_container_consumer_concurrency_model_is_concurrent() {
2440        let consumer = ContainerConsumer {
2441            config: ContainerConfig::from_uri("container:events").unwrap(),
2442        };
2443
2444        assert_eq!(
2445            consumer.concurrency_model(),
2446            camel_component_api::ConcurrencyModel::Concurrent { max: None }
2447        );
2448    }
2449
2450    /// Test that consumer gracefully shuts down when cancellation is requested.
2451    /// This test requires a running Docker daemon. If Docker is not available, the test
2452    /// will be ignored.
2453    #[tokio::test]
2454    async fn test_container_consumer_cancellation() {
2455        use std::sync::atomic::{AtomicBool, Ordering};
2456        use tokio::sync::mpsc;
2457
2458        // Try to connect to Docker - if fails, return early
2459        let docker = match Docker::connect_with_local_defaults() {
2460            Ok(d) => d,
2461            Err(_) => {
2462                eprintln!("Skipping test: Could not connect to Docker daemon");
2463                return;
2464            }
2465        };
2466
2467        if docker.ping().await.is_err() {
2468            eprintln!("Skipping test: Docker daemon not responding to ping");
2469            return;
2470        }
2471
2472        let component = ContainerComponent::new();
2473        let ctx = NoOpComponentContext;
2474        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2475        let mut consumer = endpoint.create_consumer().unwrap();
2476
2477        // Create a ConsumerContext
2478        let (tx, _rx) = mpsc::channel(16);
2479        let cancel_token = tokio_util::sync::CancellationToken::new();
2480        let context = ConsumerContext::new(tx, cancel_token.clone());
2481
2482        // Track if the consumer task has completed
2483        let completed = Arc::new(AtomicBool::new(false));
2484        let completed_clone = completed.clone();
2485
2486        // Spawn consumer in background
2487        let handle = tokio::spawn(async move {
2488            let result = consumer.start(context).await;
2489            // Mark as completed when done
2490            completed_clone.store(true, Ordering::SeqCst);
2491            result
2492        });
2493
2494        // Wait a bit for the consumer to start
2495        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2496
2497        // Consumer should still be running (not completed)
2498        assert!(
2499            !completed.load(Ordering::SeqCst),
2500            "Consumer should still be running before cancellation"
2501        );
2502
2503        // Request cancellation
2504        cancel_token.cancel();
2505
2506        // Wait for the task to complete (with timeout)
2507        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2508
2509        // Task should have completed (not timed out)
2510        assert!(
2511            result.is_ok(),
2512            "Consumer should gracefully shut down after cancellation"
2513        );
2514
2515        // Verify the consumer completed
2516        assert!(
2517            completed.load(Ordering::SeqCst),
2518            "Consumer should have completed after cancellation"
2519        );
2520    }
2521
2522    /// Integration test for listing containers.
2523    /// This test requires a running Docker daemon. If Docker is not available, the test
2524    /// will return early and be effectively ignored.
2525    #[tokio::test]
2526    async fn test_container_producer_list_containers() {
2527        // Try to connect to Docker using default config
2528        // If ping fails, return early (effectively ignoring this test)
2529        let docker = match Docker::connect_with_local_defaults() {
2530            Ok(d) => d,
2531            Err(_) => {
2532                eprintln!("Skipping test: Could not connect to Docker daemon");
2533                return;
2534            }
2535        };
2536
2537        if docker.ping().await.is_err() {
2538            eprintln!("Skipping test: Docker daemon not responding to ping");
2539            return;
2540        }
2541
2542        // Create producer with list operation
2543        let component = ContainerComponent::new();
2544        let ctx = NoOpComponentContext;
2545        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2546
2547        let ctx = ProducerContext::new();
2548        let mut producer = endpoint.create_producer(&ctx).unwrap();
2549
2550        // Create exchange with list operation in header
2551        let mut exchange = Exchange::new(Message::new(""));
2552        exchange
2553            .input
2554            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2555
2556        // Call the producer
2557        use tower::ServiceExt;
2558        let result = producer
2559            .ready()
2560            .await
2561            .unwrap()
2562            .call(exchange)
2563            .await
2564            .expect("Producer should succeed when Docker is available");
2565
2566        // Assert that the input exchange body is a JSON array
2567        // (because list_containers should put the JSON result in the body)
2568        match &result.input.body {
2569            camel_component_api::Body::Json(json_value) => {
2570                assert!(
2571                    json_value.is_array(),
2572                    "Expected input body to be a JSON array, got: {:?}",
2573                    json_value
2574                );
2575            }
2576            other => panic!("Expected Body::Json with array, got: {:?}", other),
2577        }
2578    }
2579
/// The `volumes` query parameter is stored verbatim on the parsed config.
#[test]
fn test_container_config_parses_volumes() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let expected = Some("./html:/usr/share/nginx/html:ro");
    assert_eq!(parsed.volumes.as_deref(), expected);
}
2591
/// An `exec` URI populates the operation, container id, command, user,
/// working directory and detach flag.
#[test]
fn test_container_config_parses_exec_params() {
    let uri = "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    assert_eq!(parsed.operation, "exec");

    // Each optional string field paired with the value given in the URI.
    let string_fields = [
        (parsed.container_id.as_deref(), "my-app"),
        (parsed.cmd.as_deref(), "ls /app"),
        (parsed.user.as_deref(), "root"),
        (parsed.workdir.as_deref(), "/tmp"),
    ];
    for (actual, expected) in string_fields {
        assert_eq!(actual, Some(expected));
    }

    assert!(parsed.detach);
}
2605
/// A `network-create` URI populates the operation, network name and driver.
#[test]
fn test_container_config_parses_network_create_params() {
    let uri = "container:network-create?name=my-net&driver=bridge";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    assert_eq!(parsed.operation, "network-create");

    let name = parsed.name.as_deref();
    assert_eq!(name, Some("my-net"));

    let driver = parsed.driver.as_deref();
    assert_eq!(driver, Some("bridge"));
}
2614
/// With no query parameters, the optional fields are unset and the boolean
/// flags are off.
#[test]
fn test_container_config_defaults_new_fields() {
    let parsed = ContainerConfig::from_uri("container:list").unwrap();

    // Optional settings default to `None`.
    assert!(parsed.volumes.is_none());
    assert!(parsed.user.is_none());
    assert!(parsed.workdir.is_none());
    assert!(parsed.driver.is_none());

    // Boolean switches default to `false`.
    assert!(!parsed.detach);
    assert!(!parsed.force);
}
2625
/// A `host:container:mode` entry is reported as a bind, with no anonymous volumes.
#[test]
fn test_parse_volumes_bind_mount() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = parsed.parse_volumes().unwrap();

    let expected = vec!["./html:/usr/share/nginx/html:ro"];
    assert_eq!(bind_mounts, expected);
    assert!(anonymous.is_empty());
}
2636
/// A `name:/path` entry is reported as a bind, with no anonymous volumes.
#[test]
fn test_parse_volumes_named_volume() {
    let uri = "container:run?image=postgres&volumes=data:/var/lib/data";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = parsed.parse_volumes().unwrap();

    let expected = vec!["data:/var/lib/data"];
    assert_eq!(bind_mounts, expected);
    assert!(anonymous.is_empty());
}
2646
/// A lone container path yields no binds and is keyed by that path in the
/// anonymous-volume map.
#[test]
fn test_parse_volumes_anonymous() {
    let uri = "container:run?image=alpine&volumes=/tmp/app-data";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = parsed.parse_volumes().unwrap();

    assert!(bind_mounts.is_empty());
    assert!(anonymous.contains_key("/tmp/app-data"));
}
2655
/// A lone container path followed by `:ro` still yields no binds and is
/// keyed by the bare path in the anonymous-volume map.
#[test]
fn test_parse_volumes_anonymous_with_mode() {
    let uri = "container:run?image=alpine&volumes=/tmp/app-data:ro";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = parsed.parse_volumes().unwrap();

    assert!(bind_mounts.is_empty());
    assert!(anonymous.contains_key("/tmp/app-data"));
}
2665
/// Two comma-separated entries are both reported as binds.
#[test]
fn test_parse_volumes_multiple() {
    let uri =
        "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = parsed.parse_volumes().unwrap();

    assert_eq!(bind_mounts.len(), 2);
    for expected in ["./html:/usr/share/nginx/html:ro", "data:/var/log/app"] {
        assert!(bind_mounts.contains(&String::from(expected)));
    }
    assert!(anonymous.is_empty());
}
2678
/// A bind entry and a lone container path in the same list are split into
/// one bind and one anonymous volume.
#[test]
fn test_parse_volumes_mixed() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = parsed.parse_volumes().unwrap();

    assert_eq!(bind_mounts.len(), 1);
    assert!(anonymous.contains_key("/tmp/cache"));
}
2689
/// Without a `volumes` parameter, `parse_volumes` returns `None`.
#[test]
fn test_parse_volumes_none() {
    let parsed = ContainerConfig::from_uri("container:run?image=nginx").unwrap();

    let volumes = parsed.parse_volumes();
    assert!(volumes.is_none());
}
2695
/// A `volumes` value made only of commas produces `None`.
#[test]
fn test_parse_volumes_empty_entry_skipped() {
    let uri = "container:run?image=nginx&volumes=,,";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let volumes = parsed.parse_volumes();
    assert!(volumes.is_none());
}
2701
/// A bind entry with an explicit `:rw` mode is passed through unchanged.
#[test]
fn test_parse_volumes_rw_mode() {
    let uri = "container:run?image=nginx&volumes=./data:/app/data:rw";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, _) = parsed.parse_volumes().unwrap();

    let expected = vec!["./data:/app/data:rw"];
    assert_eq!(bind_mounts, expected);
}
2711
2712    #[tokio::test]
2713    async fn test_container_producer_run_with_volumes() {
2714        let docker = match Docker::connect_with_local_defaults() {
2715            Ok(d) => d,
2716            Err(_) => {
2717                eprintln!("Skipping test: Could not connect to Docker daemon");
2718                return;
2719            }
2720        };
2721        if docker.ping().await.is_err() {
2722            eprintln!("Skipping test: Docker daemon not responding to ping");
2723            return;
2724        }
2725
2726        let cargo_toml = std::fs::canonicalize("./Cargo.toml")
2727            .expect("Cargo.toml should exist");
2728        let volume_path = cargo_toml.to_str().unwrap();
2729
2730        let component = ContainerComponent::new();
2731        let ctx = NoOpComponentContext;
2732        let uri = format!(
2733            "container:run?image=alpine&cmd=cat /mnt/test.txt&volumes={}:/mnt/test.txt:ro&autoRemove=true",
2734            volume_path
2735        );
2736        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
2737        let ctx = ProducerContext::new();
2738        let mut producer = endpoint.create_producer(&ctx).unwrap();
2739
2740        let mut exchange = Exchange::new(Message::new(""));
2741        exchange
2742            .input
2743            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2744
2745        use tower::ServiceExt;
2746        let result = producer
2747            .ready()
2748            .await
2749            .unwrap()
2750            .call(exchange)
2751            .await
2752            .expect("Run with volumes should succeed");
2753
2754        let container_id = result
2755            .input
2756            .header(HEADER_CONTAINER_ID)
2757            .and_then(|v| v.as_str().map(|s| s.to_string()))
2758            .expect("Should have container ID");
2759
2760        let _ = docker
2761            .remove_container(
2762                &container_id,
2763                Some(bollard::container::RemoveContainerOptions {
2764                    force: true,
2765                    ..Default::default()
2766                }),
2767            )
2768            .await;
2769    }
2770
2771    #[tokio::test]
2772    async fn test_container_producer_exec() {
2773        let docker = match Docker::connect_with_local_defaults() {
2774            Ok(d) => d,
2775            Err(_) => {
2776                eprintln!("Skipping test: Could not connect to Docker daemon");
2777                return;
2778            }
2779        };
2780        if docker.ping().await.is_err() {
2781            eprintln!("Skipping test: Docker daemon not responding to ping");
2782            return;
2783        }
2784
2785        let component = ContainerComponent::new();
2786        let ctx = NoOpComponentContext;
2787        let endpoint = component
2788            .create_endpoint("container:run?image=alpine&cmd=sleep 30&autoRemove=true", &ctx)
2789            .unwrap();
2790        let ctx = ProducerContext::new();
2791        let mut producer = endpoint.create_producer(&ctx).unwrap();
2792
2793        let mut exchange = Exchange::new(Message::new(""));
2794        exchange
2795            .input
2796            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2797
2798        use tower::ServiceExt;
2799        let result = producer
2800            .ready()
2801            .await
2802            .unwrap()
2803            .call(exchange)
2804            .await
2805            .expect("Run should succeed");
2806
2807        let container_id = result
2808            .input
2809            .header(HEADER_CONTAINER_ID)
2810            .and_then(|v| v.as_str().map(|s| s.to_string()))
2811            .expect("Should have container ID");
2812
2813        let mut exec_exchange = Exchange::new(Message::new(""));
2814        exec_exchange.input.set_header(
2815            HEADER_ACTION,
2816            serde_json::Value::String("exec".into()),
2817        );
2818        exec_exchange.input.set_header(
2819            HEADER_CONTAINER_ID,
2820            serde_json::Value::String(container_id.clone()),
2821        );
2822        exec_exchange.input.set_header(
2823            HEADER_CMD,
2824            serde_json::Value::String("echo hello".into()),
2825        );
2826
2827        let exec_result = producer
2828            .ready()
2829            .await
2830            .unwrap()
2831            .call(exec_exchange)
2832            .await
2833            .expect("Exec should succeed");
2834
2835        match &exec_result.input.body {
2836            Body::Text(text) => {
2837                assert!(text.contains("hello"), "Exec output should contain 'hello', got: {}", text);
2838            }
2839            other => panic!("Expected Body::Text, got: {:?}", other),
2840        }
2841
2842        let exit_code = exec_result
2843            .input
2844            .header(HEADER_EXIT_CODE)
2845            .and_then(|v| v.as_i64());
2846        assert_eq!(exit_code, Some(0));
2847
2848        let action_result = exec_result
2849            .input
2850            .header(HEADER_ACTION_RESULT)
2851            .and_then(|v| v.as_str());
2852        assert_eq!(action_result, Some("success"));
2853
2854        let _ = docker
2855            .remove_container(
2856                &container_id,
2857                Some(bollard::container::RemoveContainerOptions {
2858                    force: true,
2859                    ..Default::default()
2860                }),
2861            )
2862            .await;
2863    }
2864
2865    #[tokio::test]
2866    async fn test_container_producer_network_lifecycle() {
2867        let docker = match Docker::connect_with_local_defaults() {
2868            Ok(d) => d,
2869            Err(_) => {
2870                eprintln!("Skipping test: Could not connect to Docker daemon");
2871                return;
2872            }
2873        };
2874        if docker.ping().await.is_err() {
2875            eprintln!("Skipping test: Docker daemon not responding to ping");
2876            return;
2877        }
2878
2879        let network_name = format!("camel-test-{}", std::process::id());
2880
2881        let component = ContainerComponent::new();
2882        let component_ctx = NoOpComponentContext;
2883
2884        // Create network
2885        let endpoint = component
2886            .create_endpoint(
2887                &format!("container:network-create?name={}", network_name),
2888                &component_ctx,
2889            )
2890            .unwrap();
2891        let ctx = ProducerContext::new();
2892        let mut producer = endpoint.create_producer(&ctx).unwrap();
2893
2894        let mut exchange = Exchange::new(Message::new(""));
2895        exchange.input.set_header(
2896            HEADER_ACTION,
2897            serde_json::Value::String("network-create".into()),
2898        );
2899
2900        use tower::ServiceExt;
2901        let result = producer
2902            .ready()
2903            .await
2904            .unwrap()
2905            .call(exchange)
2906            .await
2907            .expect("Network create should succeed");
2908
2909        let network_id = result
2910            .input
2911            .header(HEADER_NETWORK)
2912            .and_then(|v| v.as_str().map(|s| s.to_string()))
2913            .expect("Should have network ID");
2914
2915        assert!(!network_id.is_empty());
2916
2917        // List networks
2918        let endpoint = component
2919            .create_endpoint("container:network-list", &component_ctx)
2920            .unwrap();
2921        let mut producer = endpoint.create_producer(&ctx).unwrap();
2922
2923        let mut exchange = Exchange::new(Message::new(""));
2924        exchange.input.set_header(
2925            HEADER_ACTION,
2926            serde_json::Value::String("network-list".into()),
2927        );
2928
2929        let list_result = producer
2930            .ready()
2931            .await
2932            .unwrap()
2933            .call(exchange)
2934            .await
2935            .expect("Network list should succeed");
2936
2937        match &list_result.input.body {
2938            Body::Json(json_value) => {
2939                assert!(json_value.is_array(), "Expected JSON array");
2940            }
2941            other => panic!("Expected Body::Json, got: {:?}", other),
2942        }
2943
2944        // Remove network
2945        let endpoint = component
2946            .create_endpoint(
2947                &format!("container:network-remove?network={}", network_name),
2948                &component_ctx,
2949            )
2950            .unwrap();
2951        let mut producer = endpoint.create_producer(&ctx).unwrap();
2952
2953        let mut exchange = Exchange::new(Message::new(""));
2954        exchange.input.set_header(
2955            HEADER_ACTION,
2956            serde_json::Value::String("network-remove".into()),
2957        );
2958
2959        let remove_result = producer
2960            .ready()
2961            .await
2962            .unwrap()
2963            .call(exchange)
2964            .await
2965            .expect("Network remove should succeed");
2966
2967        let action_result = remove_result
2968            .input
2969            .header(HEADER_ACTION_RESULT)
2970            .and_then(|v| v.as_str());
2971        assert_eq!(action_result, Some("success"));
2972    }
2973}