Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::service::{HostConfig, PortBinding};
19use camel_component_api::parse_uri;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tower::Service;
23
24/// Global tracker for containers created by this component.
25/// Used for cleanup on shutdown (especially important for hot-reload scenarios).
26static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
27    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
28
29/// Registers a container ID for tracking (will be cleaned up on shutdown).
30fn track_container(id: String) {
31    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
32        tracker.insert(id);
33    }
34}
35
36/// Removes a container ID from tracking (when it's been removed naturally).
37fn untrack_container(id: &str) {
38    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
39        tracker.remove(id);
40    }
41}
42
43/// Cleans up all tracked containers. Call this on application shutdown.
44pub async fn cleanup_tracked_containers() {
45    let ids: Vec<String> = {
46        match CONTAINER_TRACKER.lock() {
47            Ok(tracker) => tracker.iter().cloned().collect(),
48            Err(_) => return,
49        }
50    };
51
52    if ids.is_empty() {
53        return;
54    }
55
56    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
57
58    let docker = match Docker::connect_with_local_defaults() {
59        Ok(d) => d,
60        Err(e) => {
61            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
62            return;
63        }
64    };
65
66    for id in ids {
67        match docker
68            .remove_container(
69                &id,
70                Some(bollard::container::RemoveContainerOptions {
71                    force: true,
72                    ..Default::default()
73                }),
74            )
75            .await
76        {
77            Ok(_) => {
78                tracing::debug!("Cleaned up container {}", id);
79                untrack_container(&id);
80            }
81            Err(e) => {
82                tracing::warn!("Failed to cleanup container {}: {}", id, e);
83            }
84        }
85    }
86}
87
88// Header constants for container operations
89
90/// Timeout (seconds) for connecting to the Docker daemon.
91const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
92
93/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
94pub const HEADER_ACTION: &str = "CamelContainerAction";
95
96/// Header key for specifying the container image to use for "run" operations.
97pub const HEADER_IMAGE: &str = "CamelContainerImage";
98
99/// Header key for specifying or receiving the container ID.
100pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
101
102/// Header key for the log stream type (stdout or stderr).
103pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
104
105/// Header key for the log timestamp.
106pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
107
108/// Header key for specifying the container name for "run" operations.
109pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
110
111/// Header key for the result status of a container operation (e.g., "success").
112pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
113
114// ---------------------------------------------------------------------------
115// ContainerGlobalConfig
116// ---------------------------------------------------------------------------
117
118/// Global configuration for Container component.
119/// Supports serde deserialization with defaults and builder methods.
120/// These are the fallback defaults when URI params are not set.
121#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
122#[serde(default)]
123pub struct ContainerGlobalConfig {
124    /// The Docker host URL (default: "unix:///var/run/docker.sock").
125    pub docker_host: String,
126}
127
128impl Default for ContainerGlobalConfig {
129    fn default() -> Self {
130        Self {
131            docker_host: "unix:///var/run/docker.sock".to_string(),
132        }
133    }
134}
135
136impl ContainerGlobalConfig {
137    pub fn new() -> Self {
138        Self::default()
139    }
140
141    pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
142        self.docker_host = v.into();
143        self
144    }
145}
146
147// ---------------------------------------------------------------------------
148// ContainerConfig (endpoint configuration)
149// ---------------------------------------------------------------------------
150
151/// Configuration for the container component endpoint.
152///
153/// This struct holds the parsed URI configuration including the operation type,
154/// optional container image, and Docker host connection details.
155#[derive(Debug, Clone)]
156pub struct ContainerConfig {
157    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
158    pub operation: String,
159    /// The container image to use for "run" operations (can be overridden via header).
160    pub image: Option<String>,
161    /// The container name to use for "run" operations (can be overridden via header).
162    pub name: Option<String>,
163    /// The Docker host URL (defaults to "unix:///var/run/docker.sock").
164    pub host: Option<String>,
165    /// Command to run in the container (e.g., "sleep 30").
166    pub cmd: Option<String>,
167    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
168    pub ports: Option<String>,
169    /// Environment variables in format "KEY=value,KEY2=value2".
170    pub env: Option<String>,
171    /// Network mode (e.g., "bridge", "host", "none"). Default: "bridge".
172    pub network: Option<String>,
173    /// Container ID or name for logs consumer.
174    pub container_id: Option<String>,
175    /// Follow log output (default: true for consumer).
176    pub follow: bool,
177    /// Include timestamps in logs (default: false).
178    pub timestamps: bool,
179    /// Number of lines to show from the end of logs (default: all).
180    pub tail: Option<String>,
181    /// Automatically pull the image if not present (default: true).
182    pub auto_pull: bool,
183    /// Automatically remove the container when it exits (default: true).
184    pub auto_remove: bool,
185}
186
187impl ContainerConfig {
188    /// Parses a container URI into a `ContainerConfig`.
189    ///
190    /// # Arguments
191    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
192    ///
193    /// # Errors
194    /// Returns an error if the URI scheme is not "container".
195    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
196        let parts = parse_uri(uri)?;
197        if parts.scheme != "container" {
198            return Err(CamelError::InvalidUri(format!(
199                "expected scheme 'container', got '{}'",
200                parts.scheme
201            )));
202        }
203
204        let image = parts.params.get("image").cloned();
205        let name = parts.params.get("name").cloned();
206        let cmd = parts.params.get("cmd").cloned();
207        let ports = parts.params.get("ports").cloned();
208        let env = parts.params.get("env").cloned();
209        let network = parts.params.get("network").cloned();
210        let container_id = parts.params.get("containerId").cloned();
211        let follow = parts
212            .params
213            .get("follow")
214            .map(|v| v.eq_ignore_ascii_case("true"))
215            .unwrap_or(true);
216        let timestamps = parts
217            .params
218            .get("timestamps")
219            .map(|v| v.eq_ignore_ascii_case("true"))
220            .unwrap_or(false);
221        let tail = parts.params.get("tail").cloned();
222        let auto_pull = parts
223            .params
224            .get("autoPull")
225            .map(|v| v.eq_ignore_ascii_case("true"))
226            .unwrap_or(true);
227        let auto_remove = parts
228            .params
229            .get("autoRemove")
230            .map(|v| v.eq_ignore_ascii_case("true"))
231            .unwrap_or(true);
232        // host is only set from URI param; global config defaults are applied later
233        let host = parts.params.get("host").cloned();
234
235        Ok(Self {
236            operation: parts.path,
237            image,
238            name,
239            host,
240            cmd,
241            ports,
242            env,
243            network,
244            container_id,
245            follow,
246            timestamps,
247            tail,
248            auto_pull,
249            auto_remove,
250        })
251    }
252
253    /// Apply global config defaults to this endpoint config.
254    /// Only sets values that are currently `None`.
255    fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
256        if self.host.is_none() {
257            self.host = Some(global.docker_host.clone());
258        }
259    }
260
261    fn docker_socket_path(&self) -> Result<&str, CamelError> {
262        let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
263            "npipe:////./pipe/docker_engine"
264        } else {
265            "unix:///var/run/docker.sock"
266        });
267
268        if host.starts_with("unix://") || host.starts_with("npipe://") {
269            return Ok(host);
270        }
271
272        if host.contains("://") {
273            return Err(CamelError::ProcessorError(format!(
274                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
275                host
276            )));
277        }
278
279        Ok(host)
280    }
281
282    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
283        let socket_path = self.docker_socket_path()?;
284        Docker::connect_with_socket(
285            socket_path,
286            DOCKER_CONNECT_TIMEOUT_SECS,
287            bollard::API_DEFAULT_VERSION,
288        )
289        .map_err(|e| {
290            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
291        })
292    }
293
294    /// Connects to the Docker daemon using the configured host.
295    ///
296    /// This method establishes a Unix socket connection to Docker and verifies
297    /// the connection by sending a ping request.
298    ///
299    /// # Errors
300    /// Returns an error if the connection fails or the ping request fails.
301    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
302        let docker = self.connect_docker_client()?;
303        docker
304            .ping()
305            .await
306            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
307        Ok(docker)
308    }
309
310    #[allow(clippy::type_complexity)]
311    fn parse_ports(
312        &self,
313    ) -> Option<(
314        HashMap<String, HashMap<(), ()>>,
315        HashMap<String, Option<Vec<PortBinding>>>,
316    )> {
317        let ports_str = self.ports.as_ref()?;
318
319        let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
320        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
321
322        for mapping in ports_str.split(',') {
323            let mapping = mapping.trim();
324            if mapping.is_empty() {
325                continue;
326            }
327
328            let (host_port, container_spec) = mapping.split_once(':')?;
329
330            let (container_port, protocol) = if container_spec.contains('/') {
331                let parts: Vec<&str> = container_spec.split('/').collect();
332                (parts[0], parts[1])
333            } else {
334                (container_spec, "tcp")
335            };
336
337            let container_key = format!("{}/{}", container_port, protocol);
338
339            exposed_ports.insert(container_key.clone(), HashMap::new());
340
341            port_bindings.insert(
342                container_key,
343                Some(vec![PortBinding {
344                    host_ip: None,
345                    host_port: Some(host_port.to_string()),
346                }]),
347            );
348        }
349
350        if exposed_ports.is_empty() {
351            None
352        } else {
353            Some((exposed_ports, port_bindings))
354        }
355    }
356
357    fn parse_env(&self) -> Option<Vec<String>> {
358        let env_str = self.env.as_ref()?;
359
360        let env_vars: Vec<String> = env_str
361            .split(',')
362            .map(|s| s.trim().to_string())
363            .filter(|s| !s.is_empty())
364            .collect();
365
366        if env_vars.is_empty() {
367            None
368        } else {
369            Some(env_vars)
370        }
371    }
372}
373
374#[derive(Debug, Clone, Copy, PartialEq, Eq)]
375enum ProducerOperation {
376    List,
377    Run,
378    Start,
379    Stop,
380    Remove,
381}
382
383fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
384    match operation {
385        "list" => Ok(ProducerOperation::List),
386        "run" => Ok(ProducerOperation::Run),
387        "start" => Ok(ProducerOperation::Start),
388        "stop" => Ok(ProducerOperation::Stop),
389        "remove" => Ok(ProducerOperation::Remove),
390        _ => Err(CamelError::ProcessorError(format!(
391            "Unknown container operation: {}",
392            operation
393        ))),
394    }
395}
396
397fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
398    exchange
399        .input
400        .header(HEADER_CONTAINER_NAME)
401        .and_then(|v| v.as_str().map(|s| s.to_string()))
402        .or_else(|| config.name.clone())
403}
404
405async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
406    let images = docker
407        .list_images::<&str>(None)
408        .await
409        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
410
411    Ok(images.iter().any(|img| {
412        img.repo_tags
413            .iter()
414            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
415    }))
416}
417
418async fn pull_image_with_progress(
419    docker: &Docker,
420    image: &str,
421    timeout_secs: u64,
422) -> Result<(), CamelError> {
423    use futures::StreamExt;
424
425    tracing::info!("Pulling image: {}", image);
426
427    let mut stream = docker.create_image(
428        Some(bollard::image::CreateImageOptions {
429            from_image: image,
430            ..Default::default()
431        }),
432        None,
433        None,
434    );
435
436    let start = std::time::Instant::now();
437    let mut last_progress = std::time::Instant::now();
438
439    while let Some(item) = stream.next().await {
440        if start.elapsed().as_secs() > timeout_secs {
441            return Err(CamelError::ProcessorError(format!(
442                "Image pull timeout after {}s. Try manually: docker pull {}",
443                timeout_secs, image
444            )));
445        }
446
447        match item {
448            Ok(update) => {
449                // Log progress every 2 seconds
450                if last_progress.elapsed().as_secs() >= 2 {
451                    if let Some(status) = update.status {
452                        tracing::debug!("Pull progress: {}", status);
453                    }
454                    last_progress = std::time::Instant::now();
455                }
456            }
457            Err(e) => {
458                let err_str = e.to_string().to_lowercase();
459                if err_str.contains("unauthorized") || err_str.contains("401") {
460                    return Err(CamelError::ProcessorError(format!(
461                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
462                        image
463                    )));
464                }
465                if err_str.contains("not found") || err_str.contains("404") {
466                    return Err(CamelError::ProcessorError(format!(
467                        "Image '{}' not found in registry. Check the image name and tag",
468                        image
469                    )));
470                }
471                return Err(CamelError::ProcessorError(format!(
472                    "Failed to pull image '{}': {}",
473                    image, e
474                )));
475            }
476        }
477    }
478
479    tracing::info!("Successfully pulled image: {}", image);
480    Ok(())
481}
482
483async fn ensure_image_available(
484    docker: &Docker,
485    image: &str,
486    auto_pull: bool,
487    timeout_secs: u64,
488) -> Result<(), CamelError> {
489    if image_exists_locally(docker, image).await? {
490        tracing::debug!("Image '{}' already available locally", image);
491        return Ok(());
492    }
493
494    if !auto_pull {
495        return Err(CamelError::ProcessorError(format!(
496            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
497            image, image
498        )));
499    }
500
501    pull_image_with_progress(docker, image, timeout_secs).await
502}
503
504fn format_docker_event(event: &bollard::models::EventMessage) -> String {
505    let action = event.action.as_deref().unwrap_or("unknown");
506    let actor = event.actor.as_ref();
507
508    let container_name = actor
509        .and_then(|a| a.attributes.as_ref())
510        .and_then(|attrs| attrs.get("name"))
511        .map(|s| s.as_str())
512        .unwrap_or("unknown");
513
514    let image = actor
515        .and_then(|a| a.attributes.as_ref())
516        .and_then(|attrs| attrs.get("image"))
517        .map(|s| s.as_str())
518        .unwrap_or("");
519
520    let exit_code = actor
521        .and_then(|a| a.attributes.as_ref())
522        .and_then(|attrs| attrs.get("exitCode"))
523        .map(|s| s.as_str());
524
525    match action {
526        "create" => {
527            if image.is_empty() {
528                format!("[CREATE] Container {}", container_name)
529            } else {
530                format!("[CREATE] Container {} ({})", container_name, image)
531            }
532        }
533        "start" => format!("[START]  Container {}", container_name),
534        "die" => {
535            if let Some(code) = exit_code {
536                format!("[DIE]    Container {} (exit: {})", container_name, code)
537            } else {
538                format!("[DIE]    Container {}", container_name)
539            }
540        }
541        "destroy" => format!("[DESTROY] Container {}", container_name),
542        "stop" => format!("[STOP]   Container {}", container_name),
543        "pause" => format!("[PAUSE]  Container {}", container_name),
544        "unpause" => format!("[UNPAUSE] Container {}", container_name),
545        "restart" => format!("[RESTART] Container {}", container_name),
546        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
547    }
548}
549
550async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
551    create: CreateFn,
552    start: StartFn,
553    remove: RemoveFn,
554) -> Result<String, CamelError>
555where
556    CreateFn: FnOnce() -> CreateFut,
557    CreateFut: Future<Output = Result<String, CamelError>>,
558    StartFn: FnOnce(String) -> StartFut,
559    StartFut: Future<Output = Result<(), CamelError>>,
560    RemoveFn: FnOnce(String) -> RemoveFut,
561    RemoveFut: Future<Output = Result<(), CamelError>>,
562{
563    let container_id = create().await?;
564    if let Err(start_err) = start(container_id.clone()).await {
565        if let Err(remove_err) = remove(container_id.clone()).await {
566            return Err(CamelError::ProcessorError(format!(
567                "Failed to start container: {}. Cleanup failed: {}",
568                start_err, remove_err
569            )));
570        }
571        return Err(start_err);
572    }
573
574    Ok(container_id)
575}
576
577/// Producer for executing container operations.
578///
579/// This producer handles synchronous container operations like listing,
580/// creating, starting, stopping, and removing containers.
581#[derive(Clone)]
582pub struct ContainerProducer {
583    config: ContainerConfig,
584    docker: Docker,
585}
586
587impl Service<Exchange> for ContainerProducer {
588    type Response = Exchange;
589    type Error = CamelError;
590    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
591
592    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
593        Poll::Ready(Ok(()))
594    }
595
596    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
597        let config = self.config.clone();
598        let docker = self.docker.clone();
599        Box::pin(async move {
600            // Extract operation from header or use config
601            let operation_name = exchange
602                .input
603                .header(HEADER_ACTION)
604                .and_then(|v| v.as_str().map(|s| s.to_string()))
605                .unwrap_or_else(|| config.operation.clone());
606
607            let operation = parse_producer_operation(&operation_name)?;
608
609            // Execute operation
610            match operation {
611                ProducerOperation::List => {
612                    let containers = docker.list_containers::<String>(None).await.map_err(|e| {
613                        CamelError::ProcessorError(format!("Failed to list containers: {}", e))
614                    })?;
615
616                    let json_value = serde_json::to_value(&containers).map_err(|e| {
617                        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
618                    })?;
619
620                    exchange.input.body = Body::Json(json_value);
621                    exchange.input.set_header(
622                        HEADER_ACTION_RESULT,
623                        serde_json::Value::String("success".to_string()),
624                    );
625                }
626                ProducerOperation::Run => {
627                    // Run operation: create and start a container from an image
628                    let image = exchange
629                        .input
630                        .header(HEADER_IMAGE)
631                        .and_then(|v| v.as_str().map(|s| s.to_string()))
632                        .or(config.image.clone())
633                        .ok_or_else(|| {
634                            CamelError::ProcessorError(
635                                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
636                            )
637                        })?;
638
639                    // Ensure image is available (auto-pull if needed)
640                    let pull_timeout = 300; // 5 minutes default
641                    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
642                        .await
643                        .map_err(|e| {
644                            CamelError::ProcessorError(format!(
645                                "Image '{}' not available: {}",
646                                image, e
647                            ))
648                        })?;
649
650                    let container_name = resolve_container_name(&exchange, &config);
651                    let container_name_ref = container_name.as_deref().unwrap_or("");
652                    let cmd_parts: Option<Vec<String>> = config
653                        .cmd
654                        .as_ref()
655                        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
656                    let auto_remove = config.auto_remove;
657                    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
658                    let env_vars = config.parse_env();
659                    let network_mode = config.network.clone();
660
661                    let docker_create = docker.clone();
662                    let docker_start = docker.clone();
663                    let docker_remove = docker.clone();
664
665                    let container_id = run_container_with_cleanup(
666                        move || async move {
667                            let create_options = bollard::container::CreateContainerOptions {
668                                name: container_name_ref,
669                                ..Default::default()
670                            };
671                            let container_config = bollard::container::Config::<String> {
672                                image: Some(image.clone()),
673                                cmd: cmd_parts,
674                                env: env_vars,
675                                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
676                                host_config: Some(HostConfig {
677                                    auto_remove: Some(auto_remove),
678                                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
679                                    network_mode,
680                                    ..Default::default()
681                                }),
682                                ..Default::default()
683                            };
684
685                            let create_response = docker_create
686                                .create_container(Some(create_options), container_config)
687                                .await
688                                .map_err(|e| {
689                                    let err_str = e.to_string().to_lowercase();
690                                    if err_str.contains("409") || err_str.contains("conflict") {
691                                        CamelError::ProcessorError(format!(
692                                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
693                                            container_name_ref
694                                        ))
695                                    } else {
696                                        CamelError::ProcessorError(format!(
697                                            "Failed to create container: {}",
698                                            e
699                                        ))
700                                    }
701                                })?;
702
703                            Ok(create_response.id)
704                        },
705                        move |container_id| async move {
706                            docker_start
707                                .start_container::<String>(&container_id, None)
708                                .await
709                                .map_err(|e| {
710                                    CamelError::ProcessorError(format!(
711                                        "Failed to start container: {}",
712                                        e
713                                    ))
714                                })
715                        },
716                        move |container_id| async move {
717                            docker_remove
718                                .remove_container(&container_id, None)
719                                .await
720                                .map_err(|e| {
721                                    CamelError::ProcessorError(format!(
722                                        "Failed to remove container after start failure: {}",
723                                        e
724                                    ))
725                                })
726                        },
727                    )
728                    .await?;
729
730                    track_container(container_id.clone());
731
732                    exchange
733                        .input
734                        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
735                    exchange.input.set_header(
736                        HEADER_ACTION_RESULT,
737                        serde_json::Value::String("success".to_string()),
738                    );
739                }
740                ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
741                    // Lifecycle operations require a container ID
742                    let container_id = exchange
743                        .input
744                        .header(HEADER_CONTAINER_ID)
745                        .and_then(|v| v.as_str().map(|s| s.to_string()))
746                        .ok_or_else(|| {
747                            CamelError::ProcessorError(format!(
748                                "{} header is required for {} operation",
749                                HEADER_CONTAINER_ID, operation_name
750                            ))
751                        })?;
752
753                    match operation {
754                        ProducerOperation::Start => {
755                            docker
756                                .start_container::<String>(&container_id, None)
757                                .await
758                                .map_err(|e| {
759                                    CamelError::ProcessorError(format!(
760                                        "Failed to start container: {}",
761                                        e
762                                    ))
763                                })?;
764                        }
765                        ProducerOperation::Stop => {
766                            docker
767                                .stop_container(&container_id, None)
768                                .await
769                                .map_err(|e| {
770                                    CamelError::ProcessorError(format!(
771                                        "Failed to stop container: {}",
772                                        e
773                                    ))
774                                })?;
775                        }
776                        ProducerOperation::Remove => {
777                            docker
778                                .remove_container(&container_id, None)
779                                .await
780                                .map_err(|e| {
781                                    CamelError::ProcessorError(format!(
782                                        "Failed to remove container: {}",
783                                        e
784                                    ))
785                                })?;
786                            untrack_container(&container_id);
787                        }
788                        _ => {}
789                    }
790
791                    // Set success result
792                    exchange.input.set_header(
793                        HEADER_ACTION_RESULT,
794                        serde_json::Value::String("success".to_string()),
795                    );
796                }
797            }
798
799            Ok(exchange)
800        })
801    }
802}
803
804/// Consumer for receiving Docker container events or logs.
805///
806/// This consumer subscribes to Docker events or container logs and forwards them
807/// to the route as exchanges. It implements automatic reconnection on connection failures.
808pub struct ContainerConsumer {
809    config: ContainerConfig,
810}
811
812#[async_trait]
813impl Consumer for ContainerConsumer {
814    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
815        match self.config.operation.as_str() {
816            "events" => self.start_events_consumer(context).await,
817            "logs" => self.start_logs_consumer(context).await,
818            _ => Err(CamelError::EndpointCreationFailed(format!(
819                "Consumer only supports 'events' or 'logs' operations, got '{}'",
820                self.config.operation
821            ))),
822        }
823    }
824
825    async fn stop(&mut self) -> Result<(), CamelError> {
826        Ok(())
827    }
828
829    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
830        camel_component_api::ConcurrencyModel::Concurrent { max: None }
831    }
832}
833
834impl ContainerConsumer {
835    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
836        use futures::StreamExt;
837
838        loop {
839            if context.is_cancelled() {
840                tracing::info!("Container events consumer shutting down");
841                return Ok(());
842            }
843
844            let docker = match self.config.connect_docker().await {
845                Ok(d) => d,
846                Err(e) => {
847                    tracing::error!(
848                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
849                        e
850                    );
851                    tokio::select! {
852                        _ = context.cancelled() => {
853                            tracing::info!("Container events consumer shutting down");
854                            return Ok(());
855                        }
856                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
857                    }
858                    continue;
859                }
860            };
861
862            let mut event_stream = docker.events::<String>(None);
863
864            loop {
865                tokio::select! {
866                    _ = context.cancelled() => {
867                        tracing::info!("Container events consumer shutting down");
868                        return Ok(());
869                    }
870
871                    msg = event_stream.next() => {
872                        match msg {
873                            Some(Ok(event)) => {
874                                let formatted = format_docker_event(&event);
875                                let message = Message::new(Body::Text(formatted));
876                                let exchange = Exchange::new(message);
877
878                                if let Err(e) = context.send(exchange).await {
879                                    tracing::error!("Failed to send exchange: {:?}", e);
880                                    break;
881                                }
882                            }
883                            Some(Err(e)) => {
884                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
885                                break;
886                            }
887                            None => {
888                                tracing::info!("Docker event stream ended. Reconnecting...");
889                                break;
890                            }
891                        }
892                    }
893                }
894            }
895
896            tokio::select! {
897                _ = context.cancelled() => {
898                    tracing::info!("Container events consumer shutting down");
899                    return Ok(());
900                }
901                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
902            }
903        }
904    }
905
906    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
907        use futures::StreamExt;
908
909        let container_id = self.config.container_id.clone().ok_or_else(|| {
910            CamelError::EndpointCreationFailed(
911                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
912                    .to_string(),
913            )
914        })?;
915
916        loop {
917            if context.is_cancelled() {
918                tracing::info!("Container logs consumer shutting down");
919                return Ok(());
920            }
921
922            let docker = match self.config.connect_docker().await {
923                Ok(d) => d,
924                Err(e) => {
925                    tracing::error!(
926                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
927                        e
928                    );
929                    tokio::select! {
930                        _ = context.cancelled() => {
931                            tracing::info!("Container logs consumer shutting down");
932                            return Ok(());
933                        }
934                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
935                    }
936                    continue;
937                }
938            };
939
940            let tail = self
941                .config
942                .tail
943                .clone()
944                .unwrap_or_else(|| "all".to_string());
945
946            let options = bollard::container::LogsOptions::<String> {
947                follow: self.config.follow,
948                stdout: true,
949                stderr: true,
950                timestamps: self.config.timestamps,
951                tail,
952                ..Default::default()
953            };
954
955            let mut log_stream = docker.logs(&container_id, Some(options));
956            let container_id_header = container_id.clone();
957
958            loop {
959                tokio::select! {
960                    _ = context.cancelled() => {
961                        tracing::info!("Container logs consumer shutting down");
962                        return Ok(());
963                    }
964
965                    msg = log_stream.next() => {
966                        match msg {
967                            Some(Ok(log_output)) => {
968                                let (stream_type, content) = match log_output {
969                                    bollard::container::LogOutput::StdOut { message } => {
970                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
971                                    }
972                                    bollard::container::LogOutput::StdErr { message } => {
973                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
974                                    }
975                                    bollard::container::LogOutput::Console { message } => {
976                                        ("console", String::from_utf8_lossy(&message).into_owned())
977                                    }
978                                    bollard::container::LogOutput::StdIn { message } => {
979                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
980                                    }
981                                };
982
983                                let content = content.trim_end();
984                                if content.is_empty() {
985                                    continue;
986                                }
987
988                                let mut message = Message::new(Body::Text(content.to_string()));
989                                message.set_header(
990                                    HEADER_CONTAINER_ID,
991                                    serde_json::Value::String(container_id_header.clone()),
992                                );
993                                message.set_header(
994                                    HEADER_LOG_STREAM,
995                                    serde_json::Value::String(stream_type.to_string()),
996                                );
997
998                                if self.config.timestamps
999                                    && let Some(ts) = extract_timestamp(content) {
1000                                        message.set_header(
1001                                            HEADER_LOG_TIMESTAMP,
1002                                            serde_json::Value::String(ts),
1003                                        );
1004                                    }
1005
1006                                let exchange = Exchange::new(message);
1007
1008                                if let Err(e) = context.send(exchange).await {
1009                                    tracing::error!("Failed to send log exchange: {:?}", e);
1010                                    break;
1011                                }
1012                            }
1013                            Some(Err(e)) => {
1014                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1015                                break;
1016                            }
1017                            None => {
1018                                if self.config.follow {
1019                                    tracing::info!("Docker log stream ended. Reconnecting...");
1020                                    break;
1021                                } else {
1022                                    tracing::info!("Container logs consumer finished (follow=false)");
1023                                    return Ok(());
1024                                }
1025                            }
1026                        }
1027                    }
1028                }
1029            }
1030
1031            tokio::select! {
1032                _ = context.cancelled() => {
1033                    tracing::info!("Container logs consumer shutting down");
1034                    return Ok(());
1035                }
1036                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1037            }
1038        }
1039    }
1040}
1041
1042fn extract_timestamp(log_line: &str) -> Option<String> {
1043    let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
1044    if parts.len() > 1 && parts[0].contains('T') {
1045        Some(parts[0].to_string())
1046    } else {
1047        None
1048    }
1049}
1050
1051/// Component for creating container endpoints.
1052///
1053/// This component handles URIs with the "container" scheme and creates
1054/// appropriate producer and consumer endpoints for Docker operations.
1055///
1056/// Containers created via `run` operation are tracked globally and can be
1057/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
1058pub struct ContainerComponent {
1059    config: Option<ContainerGlobalConfig>,
1060}
1061
1062impl ContainerComponent {
1063    /// Creates a new container component instance without global config.
1064    pub fn new() -> Self {
1065        Self { config: None }
1066    }
1067
1068    /// Creates a container component with the given global config.
1069    pub fn with_config(config: ContainerGlobalConfig) -> Self {
1070        Self {
1071            config: Some(config),
1072        }
1073    }
1074
1075    /// Creates a container component with optional global config.
1076    pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1077        Self { config }
1078    }
1079}
1080
1081impl Default for ContainerComponent {
1082    fn default() -> Self {
1083        Self::new()
1084    }
1085}
1086
1087impl Component for ContainerComponent {
1088    fn scheme(&self) -> &str {
1089        "container"
1090    }
1091
1092    fn create_endpoint(
1093        &self,
1094        uri: &str,
1095        _ctx: &dyn camel_component_api::ComponentContext,
1096    ) -> Result<Box<dyn Endpoint>, CamelError> {
1097        let mut config = ContainerConfig::from_uri(uri)?;
1098        // Apply global defaults if present and URI didn't set them
1099        if let Some(ref global) = self.config {
1100            config.apply_global_defaults(global);
1101        }
1102        Ok(Box::new(ContainerEndpoint {
1103            uri: uri.to_string(),
1104            config,
1105        }))
1106    }
1107}
1108
1109/// Endpoint for container operations.
1110///
1111/// This endpoint creates producers for executing container operations
1112/// and consumers for receiving container events.
1113pub struct ContainerEndpoint {
1114    uri: String,
1115    config: ContainerConfig,
1116}
1117
1118impl ContainerEndpoint {
1119    /// Returns the Docker host configured for this endpoint.
1120    /// Returns `None` if not set (for testing purposes).
1121    pub fn docker_host(&self) -> Option<&str> {
1122        self.config.host.as_deref()
1123    }
1124}
1125
1126impl Endpoint for ContainerEndpoint {
1127    fn uri(&self) -> &str {
1128        &self.uri
1129    }
1130
1131    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1132        Ok(Box::new(ContainerConsumer {
1133            config: self.config.clone(),
1134        }))
1135    }
1136
1137    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1138        let docker = self.config.connect_docker_client()?;
1139        Ok(BoxProcessor::new(ContainerProducer {
1140            config: self.config.clone(),
1141            docker,
1142        }))
1143    }
1144}
1145
1146#[cfg(test)]
1147mod tests {
1148    use super::*;
1149    use camel_component_api::NoOpComponentContext;
1150
1151    #[test]
1152    fn test_container_config() {
1153        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1154        assert_eq!(config.operation, "run");
1155        assert_eq!(config.image.as_deref(), Some("alpine"));
1156        // host is None by default; global config applies it later
1157        assert!(config.host.is_none());
1158    }
1159
1160    #[test]
1161    fn test_global_config_applied_to_endpoint() {
1162        // When global config is set and URI doesn't specify host,
1163        // apply_global_defaults should set host from global config.
1164        let global =
1165            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1166        let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1167        assert!(
1168            config.host.is_none(),
1169            "URI without ?host= should leave host as None"
1170        );
1171        config.apply_global_defaults(&global);
1172        assert_eq!(
1173            config.host.as_deref(),
1174            Some("unix:///custom/docker.sock"),
1175            "global docker_host must be applied when URI did not set host"
1176        );
1177    }
1178
1179    #[test]
1180    fn test_uri_param_wins_over_global_config() {
1181        // When URI explicitly sets host param, apply_global_defaults must NOT override it.
1182        let global =
1183            ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1184        let mut config =
1185            ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1186                .unwrap();
1187        assert_eq!(
1188            config.host.as_deref(),
1189            Some("unix:///override.sock"),
1190            "URI-set host should be parsed correctly"
1191        );
1192        config.apply_global_defaults(&global);
1193        assert_eq!(
1194            config.host.as_deref(),
1195            Some("unix:///override.sock"),
1196            "global config must NOT override a host already set by URI"
1197        );
1198    }
1199
1200    #[test]
1201    fn test_container_config_parses_name() {
1202        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1203        assert_eq!(config.name.as_deref(), Some("my-container"));
1204    }
1205
1206    #[test]
1207    fn test_parse_producer_operation_known() {
1208        assert_eq!(
1209            parse_producer_operation("list").unwrap(),
1210            ProducerOperation::List
1211        );
1212        assert_eq!(
1213            parse_producer_operation("run").unwrap(),
1214            ProducerOperation::Run
1215        );
1216        assert_eq!(
1217            parse_producer_operation("start").unwrap(),
1218            ProducerOperation::Start
1219        );
1220        assert_eq!(
1221            parse_producer_operation("stop").unwrap(),
1222            ProducerOperation::Stop
1223        );
1224        assert_eq!(
1225            parse_producer_operation("remove").unwrap(),
1226            ProducerOperation::Remove
1227        );
1228    }
1229
1230    #[test]
1231    fn test_parse_producer_operation_unknown() {
1232        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1233        match err {
1234            CamelError::ProcessorError(msg) => {
1235                assert!(
1236                    msg.contains("Unknown container operation"),
1237                    "Unexpected error message: {}",
1238                    msg
1239                );
1240            }
1241            _ => panic!("Expected ProcessorError for unknown operation"),
1242        }
1243    }
1244
1245    #[test]
1246    fn test_resolve_container_name_header_overrides_config() {
1247        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1248        let mut exchange = Exchange::new(Message::new(""));
1249        exchange.input.set_header(
1250            HEADER_CONTAINER_NAME,
1251            serde_json::Value::String("header-name".to_string()),
1252        );
1253
1254        let resolved = resolve_container_name(&exchange, &config);
1255        assert_eq!(resolved.as_deref(), Some("header-name"));
1256    }
1257
1258    #[test]
1259    fn test_container_config_rejects_tcp_host() {
1260        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1261        let err = config.connect_docker_client().unwrap_err();
1262        match err {
1263            CamelError::ProcessorError(msg) => {
1264                assert!(
1265                    msg.to_lowercase().contains("tcp"),
1266                    "Expected TCP scheme error, got: {}",
1267                    msg
1268                );
1269            }
1270            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1271        }
1272    }
1273
1274    #[tokio::test]
1275    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1276        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1277        let remove_called_clone = remove_called.clone();
1278
1279        let result = run_container_with_cleanup(
1280            || async { Ok("container-123".to_string()) },
1281            |_id| async move {
1282                Err(CamelError::ProcessorError(
1283                    "Failed to start container".to_string(),
1284                ))
1285            },
1286            move |_id| {
1287                let remove_called_inner = remove_called_clone.clone();
1288                async move {
1289                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1290                    Ok(())
1291                }
1292            },
1293        )
1294        .await;
1295
1296        assert!(result.is_err(), "Expected start failure to bubble up");
1297        assert!(
1298            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1299            "Expected cleanup to remove container"
1300        );
1301    }
1302
1303    #[test]
1304    fn test_container_component_creates_endpoint() {
1305        let component = ContainerComponent::new();
1306        assert_eq!(component.scheme(), "container");
1307        let ctx = NoOpComponentContext;
1308        let endpoint = component
1309            .create_endpoint("container:run?image=alpine", &ctx)
1310            .unwrap();
1311        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1312    }
1313
1314    #[test]
1315    fn test_container_config_parses_ports() {
1316        let config =
1317            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1318        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1319    }
1320
1321    #[test]
1322    fn test_container_config_parses_env() {
1323        let config =
1324            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1325        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1326    }
1327
1328    #[test]
1329    fn test_container_config_parses_logs_options() {
1330        let config = ContainerConfig::from_uri(
1331            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1332        )
1333        .unwrap();
1334        assert_eq!(config.operation, "logs");
1335        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1336        assert!(config.follow);
1337        assert!(config.timestamps);
1338        assert_eq!(config.tail.as_deref(), Some("100"));
1339    }
1340
1341    #[test]
1342    fn test_container_config_logs_defaults() {
1343        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1344        assert!(config.follow); // default: true
1345        assert!(!config.timestamps); // default: false
1346        assert!(config.tail.is_none()); // default: None (all)
1347    }
1348
1349    #[test]
1350    fn test_parse_ports_single() {
1351        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1352        let (exposed, bindings) = config.parse_ports().unwrap();
1353
1354        assert!(exposed.contains_key("80/tcp"));
1355        assert!(bindings.contains_key("80/tcp"));
1356
1357        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1358        assert_eq!(binding.len(), 1);
1359        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1360    }
1361
1362    #[test]
1363    fn test_parse_ports_multiple() {
1364        let config =
1365            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1366        let (exposed, bindings) = config.parse_ports().unwrap();
1367
1368        assert!(exposed.contains_key("80/tcp"));
1369        assert!(exposed.contains_key("443/tcp"));
1370        assert_eq!(bindings.len(), 2);
1371    }
1372
1373    #[test]
1374    fn test_parse_ports_with_protocol() {
1375        let config =
1376            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1377                .unwrap();
1378        let (exposed, _bindings) = config.parse_ports().unwrap();
1379
1380        assert!(exposed.contains_key("80/tcp"));
1381        assert!(exposed.contains_key("53/udp"));
1382    }
1383
1384    #[test]
1385    fn test_parse_ports_none() {
1386        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1387        assert!(config.parse_ports().is_none());
1388    }
1389
1390    #[test]
1391    fn test_parse_env_single() {
1392        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1393        let env = config.parse_env().unwrap();
1394
1395        assert_eq!(env.len(), 1);
1396        assert_eq!(env[0], "FOO=bar");
1397    }
1398
1399    #[test]
1400    fn test_parse_env_multiple() {
1401        let config =
1402            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1403                .unwrap();
1404        let env = config.parse_env().unwrap();
1405
1406        assert_eq!(env.len(), 3);
1407        assert!(env.contains(&"FOO=bar".to_string()));
1408        assert!(env.contains(&"BAZ=qux".to_string()));
1409        assert!(env.contains(&"NUM=123".to_string()));
1410    }
1411
1412    #[test]
1413    fn test_parse_env_none() {
1414        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1415        assert!(config.parse_env().is_none());
1416    }
1417
1418    use camel_component_api::Message;
1419    use std::sync::Arc;
1420
1421    #[tokio::test]
1422    async fn test_container_producer_resolves_operation_from_header() {
1423        // Try to connect to Docker - if fails, return early
1424        let docker = match Docker::connect_with_local_defaults() {
1425            Ok(d) => d,
1426            Err(_) => {
1427                eprintln!("Skipping test: Could not connect to Docker daemon");
1428                return;
1429            }
1430        };
1431
1432        if docker.ping().await.is_err() {
1433            eprintln!("Skipping test: Docker daemon not responding to ping");
1434            return;
1435        }
1436
1437        let component = ContainerComponent::new();
1438        let ctx = NoOpComponentContext;
1439        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1440
1441        let ctx = ProducerContext::new();
1442        let mut producer = endpoint.create_producer(&ctx).unwrap();
1443
1444        let mut exchange = Exchange::new(Message::new(""));
1445        exchange
1446            .input
1447            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1448
1449        use tower::ServiceExt;
1450        let result = producer
1451            .ready()
1452            .await
1453            .unwrap()
1454            .call(exchange)
1455            .await
1456            .unwrap();
1457
1458        // For list operation, the result header should be on the input message
1459        assert_eq!(
1460            result
1461                .input
1462                .header(HEADER_ACTION_RESULT)
1463                .map(|v| v.as_str().unwrap()),
1464            Some("success")
1465        );
1466    }
1467
1468    #[tokio::test]
1469    async fn test_container_producer_connection_error_on_invalid_host() {
1470        // Test that an invalid host (nonexistent socket) results in a connection error
1471        let component = ContainerComponent::new();
1472        let ctx = NoOpComponentContext;
1473        let endpoint = component
1474            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
1475            .unwrap();
1476
1477        let ctx = ProducerContext::new();
1478        let result = endpoint.create_producer(&ctx);
1479
1480        // The producer should return an error because it cannot connect to the invalid socket
1481        assert!(
1482            result.is_err(),
1483            "Expected error when connecting to invalid host"
1484        );
1485        let err = result.unwrap_err();
1486        match &err {
1487            CamelError::ProcessorError(msg) => {
1488                assert!(
1489                    msg.to_lowercase().contains("connection")
1490                        || msg.to_lowercase().contains("connect")
1491                        || msg.to_lowercase().contains("socket")
1492                        || msg.contains("docker"),
1493                    "Error message should indicate connection failure, got: {}",
1494                    msg
1495                );
1496            }
1497            _ => panic!("Expected ProcessorError, got: {:?}", err),
1498        }
1499    }
1500
1501    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
1502    #[tokio::test]
1503    async fn test_container_producer_lifecycle_operations_missing_id() {
1504        // Try to connect to Docker - if fails, return early
1505        let docker = match Docker::connect_with_local_defaults() {
1506            Ok(d) => d,
1507            Err(_) => {
1508                eprintln!("Skipping test: Could not connect to Docker daemon");
1509                return;
1510            }
1511        };
1512
1513        if docker.ping().await.is_err() {
1514            eprintln!("Skipping test: Docker daemon not responding to ping");
1515            return;
1516        }
1517
1518        let component = ContainerComponent::new();
1519        let ctx = NoOpComponentContext;
1520        let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
1521        let ctx = ProducerContext::new();
1522        let mut producer = endpoint.create_producer(&ctx).unwrap();
1523
1524        // Test each lifecycle operation without CamelContainerId header
1525        for operation in ["start", "stop", "remove"] {
1526            let mut exchange = Exchange::new(Message::new(""));
1527            exchange.input.set_header(
1528                HEADER_ACTION,
1529                serde_json::Value::String(operation.to_string()),
1530            );
1531            // Deliberately NOT setting CamelContainerId header
1532
1533            use tower::ServiceExt;
1534            let result = producer.ready().await.unwrap().call(exchange).await;
1535
1536            assert!(
1537                result.is_err(),
1538                "Expected error for {} operation without CamelContainerId",
1539                operation
1540            );
1541            let err = result.unwrap_err();
1542            match &err {
1543                CamelError::ProcessorError(msg) => {
1544                    assert!(
1545                        msg.contains(HEADER_CONTAINER_ID),
1546                        "Error message should mention {}, got: {}",
1547                        HEADER_CONTAINER_ID,
1548                        msg
1549                    );
1550                }
1551                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1552            }
1553        }
1554    }
1555
1556    /// Test that stop operation returns an error for a nonexistent container.
1557    #[tokio::test]
1558    async fn test_container_producer_stop_nonexistent() {
1559        // Try to connect to Docker - if fails, return early
1560        let docker = match Docker::connect_with_local_defaults() {
1561            Ok(d) => d,
1562            Err(_) => {
1563                eprintln!("Skipping test: Could not connect to Docker daemon");
1564                return;
1565            }
1566        };
1567
1568        if docker.ping().await.is_err() {
1569            eprintln!("Skipping test: Docker daemon not responding to ping");
1570            return;
1571        }
1572
1573        let component = ContainerComponent::new();
1574        let ctx = NoOpComponentContext;
1575        let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
1576        let ctx = ProducerContext::new();
1577        let mut producer = endpoint.create_producer(&ctx).unwrap();
1578
1579        let mut exchange = Exchange::new(Message::new(""));
1580        exchange
1581            .input
1582            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1583        exchange.input.set_header(
1584            HEADER_CONTAINER_ID,
1585            serde_json::Value::String("nonexistent-container-123".into()),
1586        );
1587
1588        use tower::ServiceExt;
1589        let result = producer.ready().await.unwrap().call(exchange).await;
1590
1591        assert!(
1592            result.is_err(),
1593            "Expected error when stopping nonexistent container"
1594        );
1595        let err = result.unwrap_err();
1596        match &err {
1597            CamelError::ProcessorError(msg) => {
1598                // Docker API returns 404 with "No such container" message
1599                assert!(
1600                    msg.to_lowercase().contains("no such container")
1601                        || msg.to_lowercase().contains("not found")
1602                        || msg.contains("404"),
1603                    "Error message should indicate container not found, got: {}",
1604                    msg
1605                );
1606            }
1607            _ => panic!("Expected ProcessorError, got: {:?}", err),
1608        }
1609    }
1610
1611    /// Test that run operation returns an error when no image is provided.
1612    #[tokio::test]
1613    async fn test_container_producer_run_missing_image() {
1614        // Try to connect to Docker - if fails, return early
1615        let docker = match Docker::connect_with_local_defaults() {
1616            Ok(d) => d,
1617            Err(_) => {
1618                eprintln!("Skipping test: Could not connect to Docker daemon");
1619                return;
1620            }
1621        };
1622
1623        if docker.ping().await.is_err() {
1624            eprintln!("Skipping test: Docker daemon not responding to ping");
1625            return;
1626        }
1627
1628        // Create producer without an image in the URI
1629        let component = ContainerComponent::new();
1630        let ctx = NoOpComponentContext;
1631        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1632        let ctx = ProducerContext::new();
1633        let mut producer = endpoint.create_producer(&ctx).unwrap();
1634
1635        let mut exchange = Exchange::new(Message::new(""));
1636        exchange
1637            .input
1638            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1639        // Deliberately NOT setting CamelContainerImage header
1640
1641        use tower::ServiceExt;
1642        let result = producer.ready().await.unwrap().call(exchange).await;
1643
1644        assert!(
1645            result.is_err(),
1646            "Expected error for run operation without image"
1647        );
1648        let err = result.unwrap_err();
1649        match &err {
1650            CamelError::ProcessorError(msg) => {
1651                assert!(
1652                    msg.to_lowercase().contains("image"),
1653                    "Error message should mention 'image', got: {}",
1654                    msg
1655                );
1656            }
1657            _ => panic!("Expected ProcessorError, got: {:?}", err),
1658        }
1659    }
1660
1661    /// Test that run operation uses image from header.
1662    #[tokio::test]
1663    async fn test_container_producer_run_image_from_header() {
1664        // Try to connect to Docker - if fails, return early
1665        let docker = match Docker::connect_with_local_defaults() {
1666            Ok(d) => d,
1667            Err(_) => {
1668                eprintln!("Skipping test: Could not connect to Docker daemon");
1669                return;
1670            }
1671        };
1672
1673        if docker.ping().await.is_err() {
1674            eprintln!("Skipping test: Docker daemon not responding to ping");
1675            return;
1676        }
1677
1678        // Create producer without an image in the URI
1679        let component = ContainerComponent::new();
1680        let ctx = NoOpComponentContext;
1681        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1682        let ctx = ProducerContext::new();
1683        let mut producer = endpoint.create_producer(&ctx).unwrap();
1684
1685        let mut exchange = Exchange::new(Message::new(""));
1686        exchange
1687            .input
1688            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1689        // Set a non-existent image to test that the run operation attempts to use it
1690        exchange.input.set_header(
1691            HEADER_IMAGE,
1692            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1693        );
1694
1695        use tower::ServiceExt;
1696        let result = producer.ready().await.unwrap().call(exchange).await;
1697
1698        // The operation should fail because the image doesn't exist
1699        assert!(
1700            result.is_err(),
1701            "Expected error when running container with nonexistent image"
1702        );
1703        let err = result.unwrap_err();
1704        match &err {
1705            CamelError::ProcessorError(msg) => {
1706                // Docker API returns an error about the missing image
1707                assert!(
1708                    msg.to_lowercase().contains("no such image")
1709                        || msg.to_lowercase().contains("not found")
1710                        || msg.to_lowercase().contains("image")
1711                        || msg.to_lowercase().contains("pull")
1712                        || msg.contains("404"),
1713                    "Error message should indicate image issue, got: {}",
1714                    msg
1715                );
1716            }
1717            _ => panic!("Expected ProcessorError, got: {:?}", err),
1718        }
1719    }
1720
1721    /// Integration test: Actually run a container with alpine:latest.
1722    /// This test verifies the full flow: create → start → set headers.
1723    #[tokio::test]
1724    async fn test_container_producer_run_alpine_container() {
1725        let docker = match Docker::connect_with_local_defaults() {
1726            Ok(d) => d,
1727            Err(_) => {
1728                eprintln!("Skipping test: Could not connect to Docker daemon");
1729                return;
1730            }
1731        };
1732
1733        if docker.ping().await.is_err() {
1734            eprintln!("Skipping test: Docker daemon not responding to ping");
1735            return;
1736        }
1737
1738        // Pull alpine:latest if not present
1739        let images = docker.list_images::<&str>(None).await.unwrap();
1740        let has_alpine = images
1741            .iter()
1742            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1743
1744        if !has_alpine {
1745            eprintln!("Pulling alpine:latest image...");
1746            let mut stream = docker.create_image(
1747                Some(bollard::image::CreateImageOptions {
1748                    from_image: "alpine:latest",
1749                    ..Default::default()
1750                }),
1751                None,
1752                None,
1753            );
1754
1755            use futures::StreamExt;
1756            while let Some(_item) = stream.next().await {
1757                // Wait for pull to complete
1758            }
1759            eprintln!("Image pulled successfully");
1760        }
1761
1762        // Create producer
1763        let component = ContainerComponent::new();
1764        let ctx = NoOpComponentContext;
1765        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1766        let ctx = ProducerContext::new();
1767        let mut producer = endpoint.create_producer(&ctx).unwrap();
1768
1769        // Run container with unique name
1770        let timestamp = std::time::SystemTime::now()
1771            .duration_since(std::time::UNIX_EPOCH)
1772            .unwrap()
1773            .as_millis();
1774        let container_name = format!("test-rust-camel-{}", timestamp);
1775        let mut exchange = Exchange::new(Message::new(""));
1776        exchange.input.set_header(
1777            HEADER_IMAGE,
1778            serde_json::Value::String("alpine:latest".into()),
1779        );
1780        exchange.input.set_header(
1781            HEADER_CONTAINER_NAME,
1782            serde_json::Value::String(container_name.clone()),
1783        );
1784
1785        use tower::ServiceExt;
1786        let result = producer
1787            .ready()
1788            .await
1789            .unwrap()
1790            .call(exchange)
1791            .await
1792            .expect("Container run should succeed");
1793
1794        // Verify container ID was set
1795        let container_id = result
1796            .input
1797            .header(HEADER_CONTAINER_ID)
1798            .and_then(|v| v.as_str().map(|s| s.to_string()))
1799            .expect("Expected container ID header");
1800        assert!(!container_id.is_empty(), "Container ID should not be empty");
1801
1802        // Verify success header
1803        assert_eq!(
1804            result
1805                .input
1806                .header(HEADER_ACTION_RESULT)
1807                .and_then(|v| v.as_str()),
1808            Some("success")
1809        );
1810
1811        // Verify container exists in Docker
1812        let inspect = docker
1813            .inspect_container(&container_id, None)
1814            .await
1815            .expect("Container should exist");
1816        assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
1817
1818        // Cleanup: remove container
1819        docker
1820            .remove_container(
1821                &container_id,
1822                Some(bollard::container::RemoveContainerOptions {
1823                    force: true,
1824                    ..Default::default()
1825                }),
1826            )
1827            .await
1828            .ok();
1829
1830        eprintln!("✅ Container {} created and cleaned up", container_id);
1831    }
1832
1833    /// Test that consumer returns an error for unsupported operations.
1834    #[tokio::test]
1835    async fn test_container_consumer_unsupported_operation() {
1836        use tokio::sync::mpsc;
1837
1838        let component = ContainerComponent::new();
1839        let ctx = NoOpComponentContext;
1840        let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1841        let mut consumer = endpoint.create_consumer().unwrap();
1842
1843        // Create a minimal ConsumerContext
1844        let (tx, _rx) = mpsc::channel(16);
1845        let cancel_token = tokio_util::sync::CancellationToken::new();
1846        let context = ConsumerContext::new(tx, cancel_token);
1847
1848        let result = consumer.start(context).await;
1849
1850        // Should return error because "run" is not a supported consumer operation
1851        assert!(
1852            result.is_err(),
1853            "Expected error for unsupported consumer operation"
1854        );
1855        let err = result.unwrap_err();
1856        match &err {
1857            CamelError::EndpointCreationFailed(msg) => {
1858                assert!(
1859                    msg.contains("Consumer only supports 'events' or 'logs'"),
1860                    "Error message should mention events or logs support, got: {}",
1861                    msg
1862                );
1863            }
1864            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1865        }
1866    }
1867
1868    #[test]
1869    fn test_container_consumer_concurrency_model_is_concurrent() {
1870        let consumer = ContainerConsumer {
1871            config: ContainerConfig::from_uri("container:events").unwrap(),
1872        };
1873
1874        assert_eq!(
1875            consumer.concurrency_model(),
1876            camel_component_api::ConcurrencyModel::Concurrent { max: None }
1877        );
1878    }
1879
1880    /// Test that consumer gracefully shuts down when cancellation is requested.
1881    /// This test requires a running Docker daemon. If Docker is not available, the test
1882    /// will be ignored.
1883    #[tokio::test]
1884    async fn test_container_consumer_cancellation() {
1885        use std::sync::atomic::{AtomicBool, Ordering};
1886        use tokio::sync::mpsc;
1887
1888        // Try to connect to Docker - if fails, return early
1889        let docker = match Docker::connect_with_local_defaults() {
1890            Ok(d) => d,
1891            Err(_) => {
1892                eprintln!("Skipping test: Could not connect to Docker daemon");
1893                return;
1894            }
1895        };
1896
1897        if docker.ping().await.is_err() {
1898            eprintln!("Skipping test: Docker daemon not responding to ping");
1899            return;
1900        }
1901
1902        let component = ContainerComponent::new();
1903        let ctx = NoOpComponentContext;
1904        let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
1905        let mut consumer = endpoint.create_consumer().unwrap();
1906
1907        // Create a ConsumerContext
1908        let (tx, _rx) = mpsc::channel(16);
1909        let cancel_token = tokio_util::sync::CancellationToken::new();
1910        let context = ConsumerContext::new(tx, cancel_token.clone());
1911
1912        // Track if the consumer task has completed
1913        let completed = Arc::new(AtomicBool::new(false));
1914        let completed_clone = completed.clone();
1915
1916        // Spawn consumer in background
1917        let handle = tokio::spawn(async move {
1918            let result = consumer.start(context).await;
1919            // Mark as completed when done
1920            completed_clone.store(true, Ordering::SeqCst);
1921            result
1922        });
1923
1924        // Wait a bit for the consumer to start
1925        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1926
1927        // Consumer should still be running (not completed)
1928        assert!(
1929            !completed.load(Ordering::SeqCst),
1930            "Consumer should still be running before cancellation"
1931        );
1932
1933        // Request cancellation
1934        cancel_token.cancel();
1935
1936        // Wait for the task to complete (with timeout)
1937        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1938
1939        // Task should have completed (not timed out)
1940        assert!(
1941            result.is_ok(),
1942            "Consumer should gracefully shut down after cancellation"
1943        );
1944
1945        // Verify the consumer completed
1946        assert!(
1947            completed.load(Ordering::SeqCst),
1948            "Consumer should have completed after cancellation"
1949        );
1950    }
1951
1952    /// Integration test for listing containers.
1953    /// This test requires a running Docker daemon. If Docker is not available, the test
1954    /// will return early and be effectively ignored.
1955    #[tokio::test]
1956    async fn test_container_producer_list_containers() {
1957        // Try to connect to Docker using default config
1958        // If ping fails, return early (effectively ignoring this test)
1959        let docker = match Docker::connect_with_local_defaults() {
1960            Ok(d) => d,
1961            Err(_) => {
1962                eprintln!("Skipping test: Could not connect to Docker daemon");
1963                return;
1964            }
1965        };
1966
1967        if docker.ping().await.is_err() {
1968            eprintln!("Skipping test: Docker daemon not responding to ping");
1969            return;
1970        }
1971
1972        // Create producer with list operation
1973        let component = ContainerComponent::new();
1974        let ctx = NoOpComponentContext;
1975        let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
1976
1977        let ctx = ProducerContext::new();
1978        let mut producer = endpoint.create_producer(&ctx).unwrap();
1979
1980        // Create exchange with list operation in header
1981        let mut exchange = Exchange::new(Message::new(""));
1982        exchange
1983            .input
1984            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1985
1986        // Call the producer
1987        use tower::ServiceExt;
1988        let result = producer
1989            .ready()
1990            .await
1991            .unwrap()
1992            .call(exchange)
1993            .await
1994            .expect("Producer should succeed when Docker is available");
1995
1996        // Assert that the input exchange body is a JSON array
1997        // (because list_containers should put the JSON result in the body)
1998        match &result.input.body {
1999            camel_component_api::Body::Json(json_value) => {
2000                assert!(
2001                    json_value.is_array(),
2002                    "Expected input body to be a JSON array, got: {:?}",
2003                    json_value
2004                );
2005            }
2006            other => panic!("Expected Body::Json with array, got: {:?}", other),
2007        }
2008    }
2009}