Skip to main content

camel_component_container/
lib.rs

1//! Camel Container Component
2//!
3//! This component provides integration with Docker containers, allowing Camel routes
4//! to manage container lifecycle (create, start, stop, remove) and consume container events.
5
6use std::collections::{HashMap, HashSet};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use bollard::Docker;
14use bollard::service::{HostConfig, PortBinding};
15use camel_api::{Body, BoxProcessor, CamelError, Exchange, Message};
16use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
17use camel_endpoint::parse_uri;
18use tower::Service;
19
/// Global tracker for containers created by this component.
///
/// Holds the IDs registered through `track_container` so that
/// `cleanup_tracked_containers` can force-remove them on shutdown
/// (especially important for hot-reload scenarios). The set starts out
/// empty and is created lazily on first access.
static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
24
25/// Registers a container ID for tracking (will be cleaned up on shutdown).
26fn track_container(id: String) {
27    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
28        tracker.insert(id);
29    }
30}
31
32/// Removes a container ID from tracking (when it's been removed naturally).
33fn untrack_container(id: &str) {
34    if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
35        tracker.remove(id);
36    }
37}
38
39/// Cleans up all tracked containers. Call this on application shutdown.
40pub async fn cleanup_tracked_containers() {
41    let ids: Vec<String> = {
42        match CONTAINER_TRACKER.lock() {
43            Ok(tracker) => tracker.iter().cloned().collect(),
44            Err(_) => return,
45        }
46    };
47
48    if ids.is_empty() {
49        return;
50    }
51
52    tracing::info!("Cleaning up {} tracked container(s)", ids.len());
53
54    let docker = match Docker::connect_with_local_defaults() {
55        Ok(d) => d,
56        Err(e) => {
57            tracing::error!("Failed to connect to Docker for cleanup: {}", e);
58            return;
59        }
60    };
61
62    for id in ids {
63        match docker
64            .remove_container(
65                &id,
66                Some(bollard::container::RemoveContainerOptions {
67                    force: true,
68                    ..Default::default()
69                }),
70            )
71            .await
72        {
73            Ok(_) => {
74                tracing::debug!("Cleaned up container {}", id);
75                untrack_container(&id);
76            }
77            Err(e) => {
78                tracing::warn!("Failed to cleanup container {}: {}", id, e);
79            }
80        }
81    }
82}
83
// Connection settings and header constants for container operations

/// Timeout (seconds) handed to `Docker::connect_with_socket` when the client is built.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

/// Header key for specifying the container action (e.g., "list", "run", "start", "stop", "remove").
/// When present it takes precedence over the operation given in the endpoint URI.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header key for specifying the container image to use for "run" operations.
/// When present it takes precedence over the `image` URI parameter.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header key for specifying or receiving the container ID.
/// Required as input for "start", "stop" and "remove"; set on the exchange after a successful "run".
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header key for the log stream type (stdout or stderr).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header key for the log timestamp.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header key for specifying the container name for "run" operations.
/// When present it takes precedence over the `name` URI parameter.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header key for the result status of a container operation (e.g., "success").
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
109
/// Configuration for the container component endpoint.
///
/// This struct holds the parsed URI configuration: the operation (taken from
/// the URI path) plus the optional query parameters that tune container
/// creation, log consumption and the Docker host connection.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// The operation to perform (e.g., "list", "run", "start", "stop", "remove", "events").
    pub operation: String,
    /// The container image to use for "run" operations (can be overridden via header).
    pub image: Option<String>,
    /// The container name to use for "run" operations (can be overridden via header).
    pub name: Option<String>,
    /// The Docker host URL. `from_uri` fills in a platform default when the URI has no
    /// `host` parameter: "npipe:////./pipe/docker_engine" on Windows,
    /// "unix:///var/run/docker.sock" elsewhere.
    pub host: Option<String>,
    /// Command to run in the container (e.g., "sleep 30"); split on whitespace before use.
    pub cmd: Option<String>,
    /// Port mappings in format "hostPort:containerPort" (e.g., "8080:80,8443:443").
    /// The container port may carry a protocol suffix ("80/udp"); "tcp" is assumed otherwise.
    pub ports: Option<String>,
    /// Environment variables in format "KEY=value,KEY2=value2".
    pub env: Option<String>,
    /// Network mode (e.g., "bridge", "host", "none"). When unset, no network mode is sent
    /// on container creation.
    pub network: Option<String>,
    /// Container ID or name for logs consumer (URI parameter `containerId`).
    pub container_id: Option<String>,
    /// Follow log output (default: true).
    pub follow: bool,
    /// Include timestamps in logs (default: false).
    pub timestamps: bool,
    /// Number of lines to show from the end of logs (default: all).
    pub tail: Option<String>,
    /// Automatically pull the image if not present (default: true; URI parameter `autoPull`).
    pub auto_pull: bool,
    /// Automatically remove the container when it exits (default: true; URI parameter `autoRemove`).
    pub auto_remove: bool,
}
145
146impl ContainerConfig {
147    /// Parses a container URI into a `ContainerConfig`.
148    ///
149    /// # Arguments
150    /// * `uri` - The URI to parse (e.g., "container:run?image=alpine")
151    ///
152    /// # Errors
153    /// Returns an error if the URI scheme is not "container".
154    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
155        let parts = parse_uri(uri)?;
156        if parts.scheme != "container" {
157            return Err(CamelError::InvalidUri(format!(
158                "expected scheme 'container', got '{}'",
159                parts.scheme
160            )));
161        }
162
163        let image = parts.params.get("image").cloned();
164        let name = parts.params.get("name").cloned();
165        let cmd = parts.params.get("cmd").cloned();
166        let ports = parts.params.get("ports").cloned();
167        let env = parts.params.get("env").cloned();
168        let network = parts.params.get("network").cloned();
169        let container_id = parts.params.get("containerId").cloned();
170        let follow = parts
171            .params
172            .get("follow")
173            .map(|v| v.eq_ignore_ascii_case("true"))
174            .unwrap_or(true);
175        let timestamps = parts
176            .params
177            .get("timestamps")
178            .map(|v| v.eq_ignore_ascii_case("true"))
179            .unwrap_or(false);
180        let tail = parts.params.get("tail").cloned();
181        let auto_pull = parts
182            .params
183            .get("autoPull")
184            .map(|v| v.eq_ignore_ascii_case("true"))
185            .unwrap_or(true);
186        let auto_remove = parts
187            .params
188            .get("autoRemove")
189            .map(|v| v.eq_ignore_ascii_case("true"))
190            .unwrap_or(true);
191        let host = parts
192            .params
193            .get("host")
194            .cloned()
195            .or_else(|| {
196                Some(if cfg!(windows) {
197                    "npipe:////./pipe/docker_engine".to_string()
198                } else {
199                    "unix:///var/run/docker.sock".to_string()
200                })
201            });
202
203        Ok(Self {
204            operation: parts.path,
205            image,
206            name,
207            host,
208            cmd,
209            ports,
210            env,
211            network,
212            container_id,
213            follow,
214            timestamps,
215            tail,
216            auto_pull,
217            auto_remove,
218        })
219    }
220
221    fn docker_socket_path(&self) -> Result<&str, CamelError> {
222        let host = self
223            .host
224            .as_deref()
225            .unwrap_or(if cfg!(windows) {
226                "npipe:////./pipe/docker_engine"
227            } else {
228                "unix:///var/run/docker.sock"
229            });
230
231        if host.starts_with("unix://") || host.starts_with("npipe://") {
232            return Ok(host);
233        }
234
235        if host.contains("://") {
236            return Err(CamelError::ProcessorError(format!(
237                "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
238                host
239            )));
240        }
241
242        Ok(host)
243    }
244
245    pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
246        let socket_path = self.docker_socket_path()?;
247        Docker::connect_with_socket(
248            socket_path,
249            DOCKER_CONNECT_TIMEOUT_SECS,
250            bollard::API_DEFAULT_VERSION,
251        )
252        .map_err(|e| {
253            CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
254        })
255    }
256
257    /// Connects to the Docker daemon using the configured host.
258    ///
259    /// This method establishes a Unix socket connection to Docker and verifies
260    /// the connection by sending a ping request.
261    ///
262    /// # Errors
263    /// Returns an error if the connection fails or the ping request fails.
264    pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
265        let docker = self.connect_docker_client()?;
266        docker
267            .ping()
268            .await
269            .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
270        Ok(docker)
271    }
272
273    #[allow(clippy::type_complexity)]
274    fn parse_ports(
275        &self,
276    ) -> Option<(
277        HashMap<String, HashMap<(), ()>>,
278        HashMap<String, Option<Vec<PortBinding>>>,
279    )> {
280        let ports_str = self.ports.as_ref()?;
281
282        let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
283        let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
284
285        for mapping in ports_str.split(',') {
286            let mapping = mapping.trim();
287            if mapping.is_empty() {
288                continue;
289            }
290
291            let (host_port, container_spec) = mapping.split_once(':')?;
292
293            let (container_port, protocol) = if container_spec.contains('/') {
294                let parts: Vec<&str> = container_spec.split('/').collect();
295                (parts[0], parts[1])
296            } else {
297                (container_spec, "tcp")
298            };
299
300            let container_key = format!("{}/{}", container_port, protocol);
301
302            exposed_ports.insert(container_key.clone(), HashMap::new());
303
304            port_bindings.insert(
305                container_key,
306                Some(vec![PortBinding {
307                    host_ip: None,
308                    host_port: Some(host_port.to_string()),
309                }]),
310            );
311        }
312
313        if exposed_ports.is_empty() {
314            None
315        } else {
316            Some((exposed_ports, port_bindings))
317        }
318    }
319
320    fn parse_env(&self) -> Option<Vec<String>> {
321        let env_str = self.env.as_ref()?;
322
323        let env_vars: Vec<String> = env_str
324            .split(',')
325            .map(|s| s.trim().to_string())
326            .filter(|s| !s.is_empty())
327            .collect();
328
329        if env_vars.is_empty() {
330            None
331        } else {
332            Some(env_vars)
333        }
334    }
335}
336
/// Operations the producer can execute, as resolved by `parse_producer_operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// "list": list containers and return them as a JSON body.
    List,
    /// "run": create and start a container from an image.
    Run,
    /// "start": start the container named by the container ID header.
    Start,
    /// "stop": stop the container named by the container ID header.
    Stop,
    /// "remove": remove the container named by the container ID header.
    Remove,
}
345
346fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
347    match operation {
348        "list" => Ok(ProducerOperation::List),
349        "run" => Ok(ProducerOperation::Run),
350        "start" => Ok(ProducerOperation::Start),
351        "stop" => Ok(ProducerOperation::Stop),
352        "remove" => Ok(ProducerOperation::Remove),
353        _ => Err(CamelError::ProcessorError(format!(
354            "Unknown container operation: {}",
355            operation
356        ))),
357    }
358}
359
360fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
361    exchange
362        .input
363        .header(HEADER_CONTAINER_NAME)
364        .and_then(|v| v.as_str().map(|s| s.to_string()))
365        .or_else(|| config.name.clone())
366}
367
368async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
369    let images = docker
370        .list_images::<&str>(None)
371        .await
372        .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
373
374    Ok(images.iter().any(|img| {
375        img.repo_tags
376            .iter()
377            .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
378    }))
379}
380
381async fn pull_image_with_progress(
382    docker: &Docker,
383    image: &str,
384    timeout_secs: u64,
385) -> Result<(), CamelError> {
386    use futures::StreamExt;
387
388    tracing::info!("Pulling image: {}", image);
389
390    let mut stream = docker.create_image(
391        Some(bollard::image::CreateImageOptions {
392            from_image: image,
393            ..Default::default()
394        }),
395        None,
396        None,
397    );
398
399    let start = std::time::Instant::now();
400    let mut last_progress = std::time::Instant::now();
401
402    while let Some(item) = stream.next().await {
403        if start.elapsed().as_secs() > timeout_secs {
404            return Err(CamelError::ProcessorError(format!(
405                "Image pull timeout after {}s. Try manually: docker pull {}",
406                timeout_secs, image
407            )));
408        }
409
410        match item {
411            Ok(update) => {
412                // Log progress every 2 seconds
413                if last_progress.elapsed().as_secs() >= 2 {
414                    if let Some(status) = update.status {
415                        tracing::debug!("Pull progress: {}", status);
416                    }
417                    last_progress = std::time::Instant::now();
418                }
419            }
420            Err(e) => {
421                let err_str = e.to_string().to_lowercase();
422                if err_str.contains("unauthorized") || err_str.contains("401") {
423                    return Err(CamelError::ProcessorError(format!(
424                        "Authentication required for image '{}'. Configure Docker credentials: docker login",
425                        image
426                    )));
427                }
428                if err_str.contains("not found") || err_str.contains("404") {
429                    return Err(CamelError::ProcessorError(format!(
430                        "Image '{}' not found in registry. Check the image name and tag",
431                        image
432                    )));
433                }
434                return Err(CamelError::ProcessorError(format!(
435                    "Failed to pull image '{}': {}",
436                    image, e
437                )));
438            }
439        }
440    }
441
442    tracing::info!("Successfully pulled image: {}", image);
443    Ok(())
444}
445
446async fn ensure_image_available(
447    docker: &Docker,
448    image: &str,
449    auto_pull: bool,
450    timeout_secs: u64,
451) -> Result<(), CamelError> {
452    if image_exists_locally(docker, image).await? {
453        tracing::debug!("Image '{}' already available locally", image);
454        return Ok(());
455    }
456
457    if !auto_pull {
458        return Err(CamelError::ProcessorError(format!(
459            "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
460            image, image
461        )));
462    }
463
464    pull_image_with_progress(docker, image, timeout_secs).await
465}
466
467fn format_docker_event(event: &bollard::models::EventMessage) -> String {
468    let action = event.action.as_deref().unwrap_or("unknown");
469    let actor = event.actor.as_ref();
470
471    let container_name = actor
472        .and_then(|a| a.attributes.as_ref())
473        .and_then(|attrs| attrs.get("name"))
474        .map(|s| s.as_str())
475        .unwrap_or("unknown");
476
477    let image = actor
478        .and_then(|a| a.attributes.as_ref())
479        .and_then(|attrs| attrs.get("image"))
480        .map(|s| s.as_str())
481        .unwrap_or("");
482
483    let exit_code = actor
484        .and_then(|a| a.attributes.as_ref())
485        .and_then(|attrs| attrs.get("exitCode"))
486        .map(|s| s.as_str());
487
488    match action {
489        "create" => {
490            if image.is_empty() {
491                format!("[CREATE] Container {}", container_name)
492            } else {
493                format!("[CREATE] Container {} ({})", container_name, image)
494            }
495        }
496        "start" => format!("[START]  Container {}", container_name),
497        "die" => {
498            if let Some(code) = exit_code {
499                format!("[DIE]    Container {} (exit: {})", container_name, code)
500            } else {
501                format!("[DIE]    Container {}", container_name)
502            }
503        }
504        "destroy" => format!("[DESTROY] Container {}", container_name),
505        "stop" => format!("[STOP]   Container {}", container_name),
506        "pause" => format!("[PAUSE]  Container {}", container_name),
507        "unpause" => format!("[UNPAUSE] Container {}", container_name),
508        "restart" => format!("[RESTART] Container {}", container_name),
509        _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
510    }
511}
512
513async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
514    create: CreateFn,
515    start: StartFn,
516    remove: RemoveFn,
517) -> Result<String, CamelError>
518where
519    CreateFn: FnOnce() -> CreateFut,
520    CreateFut: Future<Output = Result<String, CamelError>>,
521    StartFn: FnOnce(String) -> StartFut,
522    StartFut: Future<Output = Result<(), CamelError>>,
523    RemoveFn: FnOnce(String) -> RemoveFut,
524    RemoveFut: Future<Output = Result<(), CamelError>>,
525{
526    let container_id = create().await?;
527    if let Err(start_err) = start(container_id.clone()).await {
528        if let Err(remove_err) = remove(container_id.clone()).await {
529            return Err(CamelError::ProcessorError(format!(
530                "Failed to start container: {}. Cleanup failed: {}",
531                start_err, remove_err
532            )));
533        }
534        return Err(start_err);
535    }
536
537    Ok(container_id)
538}
539
/// Producer for executing container operations.
///
/// This producer handles request/response container operations: listing,
/// running (create + start), starting, stopping, and removing containers.
/// Each call works on clones of the stored configuration and Docker client.
#[derive(Clone)]
pub struct ContainerProducer {
    /// Endpoint configuration; supplies defaults that headers may override.
    config: ContainerConfig,
    /// Docker client used for every operation of this producer.
    docker: Docker,
}
549
550impl Service<Exchange> for ContainerProducer {
551    type Response = Exchange;
552    type Error = CamelError;
553    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
554
555    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
556        Poll::Ready(Ok(()))
557    }
558
559    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
560        let config = self.config.clone();
561        let docker = self.docker.clone();
562        Box::pin(async move {
563            // Extract operation from header or use config
564            let operation_name = exchange
565                .input
566                .header(HEADER_ACTION)
567                .and_then(|v| v.as_str().map(|s| s.to_string()))
568                .unwrap_or_else(|| config.operation.clone());
569
570            let operation = parse_producer_operation(&operation_name)?;
571
572            // Execute operation
573            match operation {
574                ProducerOperation::List => {
575                    let containers = docker.list_containers::<String>(None).await.map_err(|e| {
576                        CamelError::ProcessorError(format!("Failed to list containers: {}", e))
577                    })?;
578
579                    let json_value = serde_json::to_value(&containers).map_err(|e| {
580                        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
581                    })?;
582
583                    exchange.input.body = Body::Json(json_value);
584                    exchange.input.set_header(
585                        HEADER_ACTION_RESULT,
586                        serde_json::Value::String("success".to_string()),
587                    );
588                }
589                ProducerOperation::Run => {
590                    // Run operation: create and start a container from an image
591                    let image = exchange
592                        .input
593                        .header(HEADER_IMAGE)
594                        .and_then(|v| v.as_str().map(|s| s.to_string()))
595                        .or(config.image.clone())
596                        .ok_or_else(|| {
597                            CamelError::ProcessorError(
598                                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
599                            )
600                        })?;
601
602                    // Ensure image is available (auto-pull if needed)
603                    let pull_timeout = 300; // 5 minutes default
604                    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
605                        .await
606                        .map_err(|e| {
607                            CamelError::ProcessorError(format!(
608                                "Image '{}' not available: {}",
609                                image, e
610                            ))
611                        })?;
612
613                    let container_name = resolve_container_name(&exchange, &config);
614                    let container_name_ref = container_name.as_deref().unwrap_or("");
615                    let cmd_parts: Option<Vec<String>> = config
616                        .cmd
617                        .as_ref()
618                        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
619                    let auto_remove = config.auto_remove;
620                    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
621                    let env_vars = config.parse_env();
622                    let network_mode = config.network.clone();
623
624                    let docker_create = docker.clone();
625                    let docker_start = docker.clone();
626                    let docker_remove = docker.clone();
627
628                    let container_id = run_container_with_cleanup(
629                        move || async move {
630                            let create_options = bollard::container::CreateContainerOptions {
631                                name: container_name_ref,
632                                ..Default::default()
633                            };
634                            let container_config = bollard::container::Config::<String> {
635                                image: Some(image.clone()),
636                                cmd: cmd_parts,
637                                env: env_vars,
638                                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
639                                host_config: Some(HostConfig {
640                                    auto_remove: Some(auto_remove),
641                                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
642                                    network_mode,
643                                    ..Default::default()
644                                }),
645                                ..Default::default()
646                            };
647
648                            let create_response = docker_create
649                                .create_container(Some(create_options), container_config)
650                                .await
651                                .map_err(|e| {
652                                    let err_str = e.to_string().to_lowercase();
653                                    if err_str.contains("409") || err_str.contains("conflict") {
654                                        CamelError::ProcessorError(format!(
655                                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
656                                            container_name_ref
657                                        ))
658                                    } else {
659                                        CamelError::ProcessorError(format!(
660                                            "Failed to create container: {}",
661                                            e
662                                        ))
663                                    }
664                                })?;
665
666                            Ok(create_response.id)
667                        },
668                        move |container_id| async move {
669                            docker_start
670                                .start_container::<String>(&container_id, None)
671                                .await
672                                .map_err(|e| {
673                                    CamelError::ProcessorError(format!(
674                                        "Failed to start container: {}",
675                                        e
676                                    ))
677                                })
678                        },
679                        move |container_id| async move {
680                            docker_remove
681                                .remove_container(&container_id, None)
682                                .await
683                                .map_err(|e| {
684                                    CamelError::ProcessorError(format!(
685                                        "Failed to remove container after start failure: {}",
686                                        e
687                                    ))
688                                })
689                        },
690                    )
691                    .await?;
692
693                    track_container(container_id.clone());
694
695                    exchange
696                        .input
697                        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
698                    exchange.input.set_header(
699                        HEADER_ACTION_RESULT,
700                        serde_json::Value::String("success".to_string()),
701                    );
702                }
703                ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
704                    // Lifecycle operations require a container ID
705                    let container_id = exchange
706                        .input
707                        .header(HEADER_CONTAINER_ID)
708                        .and_then(|v| v.as_str().map(|s| s.to_string()))
709                        .ok_or_else(|| {
710                            CamelError::ProcessorError(format!(
711                                "{} header is required for {} operation",
712                                HEADER_CONTAINER_ID, operation_name
713                            ))
714                        })?;
715
716                    match operation {
717                        ProducerOperation::Start => {
718                            docker
719                                .start_container::<String>(&container_id, None)
720                                .await
721                                .map_err(|e| {
722                                    CamelError::ProcessorError(format!(
723                                        "Failed to start container: {}",
724                                        e
725                                    ))
726                                })?;
727                        }
728                        ProducerOperation::Stop => {
729                            docker
730                                .stop_container(&container_id, None)
731                                .await
732                                .map_err(|e| {
733                                    CamelError::ProcessorError(format!(
734                                        "Failed to stop container: {}",
735                                        e
736                                    ))
737                                })?;
738                        }
739                        ProducerOperation::Remove => {
740                            docker
741                                .remove_container(&container_id, None)
742                                .await
743                                .map_err(|e| {
744                                    CamelError::ProcessorError(format!(
745                                        "Failed to remove container: {}",
746                                        e
747                                    ))
748                                })?;
749                            untrack_container(&container_id);
750                        }
751                        _ => {}
752                    }
753
754                    // Set success result
755                    exchange.input.set_header(
756                        HEADER_ACTION_RESULT,
757                        serde_json::Value::String("success".to_string()),
758                    );
759                }
760            }
761
762            Ok(exchange)
763        })
764    }
765}
766
/// Consumer for receiving Docker container events or logs.
///
/// This consumer subscribes to Docker events or container logs and forwards them
/// to the route as exchanges. The configured operation ("events" or "logs")
/// selects which stream is consumed. The events consumer retries the Docker
/// connection every five seconds when it cannot connect.
pub struct ContainerConsumer {
    /// Endpoint configuration; `operation` selects the stream to consume.
    config: ContainerConfig,
}
774
775#[async_trait]
776impl Consumer for ContainerConsumer {
777    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
778        match self.config.operation.as_str() {
779            "events" => self.start_events_consumer(context).await,
780            "logs" => self.start_logs_consumer(context).await,
781            _ => Err(CamelError::EndpointCreationFailed(format!(
782                "Consumer only supports 'events' or 'logs' operations, got '{}'",
783                self.config.operation
784            ))),
785        }
786    }
787
788    async fn stop(&mut self) -> Result<(), CamelError> {
789        Ok(())
790    }
791
792    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
793        camel_component::ConcurrencyModel::Concurrent { max: None }
794    }
795}
796
797impl ContainerConsumer {
798    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
799        use futures::StreamExt;
800
801        loop {
802            if context.is_cancelled() {
803                tracing::info!("Container events consumer shutting down");
804                return Ok(());
805            }
806
807            let docker = match self.config.connect_docker().await {
808                Ok(d) => d,
809                Err(e) => {
810                    tracing::error!(
811                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
812                        e
813                    );
814                    tokio::select! {
815                        _ = context.cancelled() => {
816                            tracing::info!("Container events consumer shutting down");
817                            return Ok(());
818                        }
819                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
820                    }
821                    continue;
822                }
823            };
824
825            let mut event_stream = docker.events::<String>(None);
826
827            loop {
828                tokio::select! {
829                    _ = context.cancelled() => {
830                        tracing::info!("Container events consumer shutting down");
831                        return Ok(());
832                    }
833
834                    msg = event_stream.next() => {
835                        match msg {
836                            Some(Ok(event)) => {
837                                let formatted = format_docker_event(&event);
838                                let message = Message::new(Body::Text(formatted));
839                                let exchange = Exchange::new(message);
840
841                                if let Err(e) = context.send(exchange).await {
842                                    tracing::error!("Failed to send exchange: {:?}", e);
843                                    break;
844                                }
845                            }
846                            Some(Err(e)) => {
847                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
848                                break;
849                            }
850                            None => {
851                                tracing::info!("Docker event stream ended. Reconnecting...");
852                                break;
853                            }
854                        }
855                    }
856                }
857            }
858
859            tokio::select! {
860                _ = context.cancelled() => {
861                    tracing::info!("Container events consumer shutting down");
862                    return Ok(());
863                }
864                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
865            }
866        }
867    }
868
869    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
870        use futures::StreamExt;
871
872        let container_id = self.config.container_id.clone().ok_or_else(|| {
873            CamelError::EndpointCreationFailed(
874                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
875                    .to_string(),
876            )
877        })?;
878
879        loop {
880            if context.is_cancelled() {
881                tracing::info!("Container logs consumer shutting down");
882                return Ok(());
883            }
884
885            let docker = match self.config.connect_docker().await {
886                Ok(d) => d,
887                Err(e) => {
888                    tracing::error!(
889                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
890                        e
891                    );
892                    tokio::select! {
893                        _ = context.cancelled() => {
894                            tracing::info!("Container logs consumer shutting down");
895                            return Ok(());
896                        }
897                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
898                    }
899                    continue;
900                }
901            };
902
903            let tail = self
904                .config
905                .tail
906                .clone()
907                .unwrap_or_else(|| "all".to_string());
908
909            let options = bollard::container::LogsOptions::<String> {
910                follow: self.config.follow,
911                stdout: true,
912                stderr: true,
913                timestamps: self.config.timestamps,
914                tail,
915                ..Default::default()
916            };
917
918            let mut log_stream = docker.logs(&container_id, Some(options));
919            let container_id_header = container_id.clone();
920
921            loop {
922                tokio::select! {
923                    _ = context.cancelled() => {
924                        tracing::info!("Container logs consumer shutting down");
925                        return Ok(());
926                    }
927
928                    msg = log_stream.next() => {
929                        match msg {
930                            Some(Ok(log_output)) => {
931                                let (stream_type, content) = match log_output {
932                                    bollard::container::LogOutput::StdOut { message } => {
933                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
934                                    }
935                                    bollard::container::LogOutput::StdErr { message } => {
936                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
937                                    }
938                                    bollard::container::LogOutput::Console { message } => {
939                                        ("console", String::from_utf8_lossy(&message).into_owned())
940                                    }
941                                    bollard::container::LogOutput::StdIn { message } => {
942                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
943                                    }
944                                };
945
946                                let content = content.trim_end();
947                                if content.is_empty() {
948                                    continue;
949                                }
950
951                                let mut message = Message::new(Body::Text(content.to_string()));
952                                message.set_header(
953                                    HEADER_CONTAINER_ID,
954                                    serde_json::Value::String(container_id_header.clone()),
955                                );
956                                message.set_header(
957                                    HEADER_LOG_STREAM,
958                                    serde_json::Value::String(stream_type.to_string()),
959                                );
960
961                                if self.config.timestamps
962                                    && let Some(ts) = extract_timestamp(content) {
963                                        message.set_header(
964                                            HEADER_LOG_TIMESTAMP,
965                                            serde_json::Value::String(ts),
966                                        );
967                                    }
968
969                                let exchange = Exchange::new(message);
970
971                                if let Err(e) = context.send(exchange).await {
972                                    tracing::error!("Failed to send log exchange: {:?}", e);
973                                    break;
974                                }
975                            }
976                            Some(Err(e)) => {
977                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
978                                break;
979                            }
980                            None => {
981                                if self.config.follow {
982                                    tracing::info!("Docker log stream ended. Reconnecting...");
983                                    break;
984                                } else {
985                                    tracing::info!("Container logs consumer finished (follow=false)");
986                                    return Ok(());
987                                }
988                            }
989                        }
990                    }
991                }
992            }
993
994            tokio::select! {
995                _ = context.cancelled() => {
996                    tracing::info!("Container logs consumer shutting down");
997                    return Ok(());
998                }
999                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1000            }
1001        }
1002    }
1003}
1004
/// Extracts the leading RFC 3339 timestamp of a log line, if there is one.
///
/// The candidate is the text up to the first space, or the whole line when it
/// contains no space (a timestamp followed by a blank message). It is accepted
/// only when it starts with a `YYYY-MM-DDTHH:MM:SS` date-time; fractional
/// seconds and a zone suffix may follow. Ordinary words that merely contain a
/// `T` are therefore not mistaken for timestamps.
///
/// Returns the timestamp token as an owned string, or `None` when the line
/// does not start with one.
fn extract_timestamp(log_line: &str) -> Option<String> {
    // Required prefix, where `d` stands for any ASCII digit and every other
    // byte must match literally.
    const SHAPE: &[u8] = b"dddd-dd-ddTdd:dd:dd";

    let candidate = log_line.split(' ').next()?;
    let bytes = candidate.as_bytes();

    let well_formed = bytes.len() >= SHAPE.len()
        && SHAPE.iter().zip(bytes).all(|(&expected, &actual)| {
            if expected == b'd' {
                actual.is_ascii_digit()
            } else {
                actual == expected
            }
        });

    well_formed.then(|| candidate.to_string())
}
1013
/// Component for creating container endpoints.
///
/// This component handles URIs with the "container" scheme and creates
/// appropriate producer and consumer endpoints for Docker operations.
///
/// Containers created via `run` operation are tracked globally and can be
/// cleaned up on shutdown by calling `cleanup_tracked_containers()`.
///
/// The type carries no state, so it is cheap to copy and its default value is
/// the same as [`ContainerComponent::new`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ContainerComponent;

impl ContainerComponent {
    /// Creates a new container component instance.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
1035
1036impl Component for ContainerComponent {
1037    fn scheme(&self) -> &str {
1038        "container"
1039    }
1040
1041    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1042        let config = ContainerConfig::from_uri(uri)?;
1043        Ok(Box::new(ContainerEndpoint {
1044            uri: uri.to_string(),
1045            config,
1046        }))
1047    }
1048}
1049
/// Endpoint for container operations.
///
/// This endpoint creates producers for executing container operations
/// and consumers for receiving container events or logs.
pub struct ContainerEndpoint {
    /// The endpoint URI exactly as it was handed to the component.
    uri: String,
    /// Configuration parsed from `uri`; cloned into every producer and consumer.
    config: ContainerConfig,
}
1058
1059impl Endpoint for ContainerEndpoint {
1060    fn uri(&self) -> &str {
1061        &self.uri
1062    }
1063
1064    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1065        Ok(Box::new(ContainerConsumer {
1066            config: self.config.clone(),
1067        }))
1068    }
1069
1070    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1071        let docker = self.config.connect_docker_client()?;
1072        Ok(BoxProcessor::new(ContainerProducer {
1073            config: self.config.clone(),
1074            docker,
1075        }))
1076    }
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081    use super::*;
1082
1083    #[test]
1084    fn test_container_config() {
1085        let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1086        assert_eq!(config.operation, "run");
1087        assert_eq!(config.image.as_deref(), Some("alpine"));
1088        assert_eq!(config.host.as_deref(), Some("unix:///var/run/docker.sock"));
1089    }
1090
1091    #[test]
1092    fn test_container_config_parses_name() {
1093        let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1094        assert_eq!(config.name.as_deref(), Some("my-container"));
1095    }
1096
1097    #[test]
1098    fn test_parse_producer_operation_known() {
1099        assert_eq!(
1100            parse_producer_operation("list").unwrap(),
1101            ProducerOperation::List
1102        );
1103        assert_eq!(
1104            parse_producer_operation("run").unwrap(),
1105            ProducerOperation::Run
1106        );
1107        assert_eq!(
1108            parse_producer_operation("start").unwrap(),
1109            ProducerOperation::Start
1110        );
1111        assert_eq!(
1112            parse_producer_operation("stop").unwrap(),
1113            ProducerOperation::Stop
1114        );
1115        assert_eq!(
1116            parse_producer_operation("remove").unwrap(),
1117            ProducerOperation::Remove
1118        );
1119    }
1120
1121    #[test]
1122    fn test_parse_producer_operation_unknown() {
1123        let err = parse_producer_operation("destruir_mundo").unwrap_err();
1124        match err {
1125            CamelError::ProcessorError(msg) => {
1126                assert!(
1127                    msg.contains("Unknown container operation"),
1128                    "Unexpected error message: {}",
1129                    msg
1130                );
1131            }
1132            _ => panic!("Expected ProcessorError for unknown operation"),
1133        }
1134    }
1135
1136    #[test]
1137    fn test_resolve_container_name_header_overrides_config() {
1138        let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1139        let mut exchange = Exchange::new(Message::new(""));
1140        exchange.input.set_header(
1141            HEADER_CONTAINER_NAME,
1142            serde_json::Value::String("header-name".to_string()),
1143        );
1144
1145        let resolved = resolve_container_name(&exchange, &config);
1146        assert_eq!(resolved.as_deref(), Some("header-name"));
1147    }
1148
1149    #[test]
1150    fn test_container_config_rejects_tcp_host() {
1151        let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1152        let err = config.connect_docker_client().unwrap_err();
1153        match err {
1154            CamelError::ProcessorError(msg) => {
1155                assert!(
1156                    msg.to_lowercase().contains("tcp"),
1157                    "Expected TCP scheme error, got: {}",
1158                    msg
1159                );
1160            }
1161            _ => panic!("Expected ProcessorError for unsupported tcp host"),
1162        }
1163    }
1164
1165    #[tokio::test]
1166    async fn test_run_container_with_cleanup_removes_on_start_failure() {
1167        let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1168        let remove_called_clone = remove_called.clone();
1169
1170        let result = run_container_with_cleanup(
1171            || async { Ok("container-123".to_string()) },
1172            |_id| async move {
1173                Err(CamelError::ProcessorError(
1174                    "Failed to start container".to_string(),
1175                ))
1176            },
1177            move |_id| {
1178                let remove_called_inner = remove_called_clone.clone();
1179                async move {
1180                    remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1181                    Ok(())
1182                }
1183            },
1184        )
1185        .await;
1186
1187        assert!(result.is_err(), "Expected start failure to bubble up");
1188        assert!(
1189            remove_called.load(std::sync::atomic::Ordering::SeqCst),
1190            "Expected cleanup to remove container"
1191        );
1192    }
1193
1194    #[test]
1195    fn test_container_component_creates_endpoint() {
1196        let component = ContainerComponent::new();
1197        assert_eq!(component.scheme(), "container");
1198        let endpoint = component
1199            .create_endpoint("container:run?image=alpine")
1200            .unwrap();
1201        assert_eq!(endpoint.uri(), "container:run?image=alpine");
1202    }
1203
1204    #[test]
1205    fn test_container_config_parses_ports() {
1206        let config =
1207            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1208        assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1209    }
1210
1211    #[test]
1212    fn test_container_config_parses_env() {
1213        let config =
1214            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1215        assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1216    }
1217
1218    #[test]
1219    fn test_container_config_parses_logs_options() {
1220        let config = ContainerConfig::from_uri(
1221            "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
1222        )
1223        .unwrap();
1224        assert_eq!(config.operation, "logs");
1225        assert_eq!(config.container_id.as_deref(), Some("my-app"));
1226        assert!(config.follow);
1227        assert!(config.timestamps);
1228        assert_eq!(config.tail.as_deref(), Some("100"));
1229    }
1230
1231    #[test]
1232    fn test_container_config_logs_defaults() {
1233        let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1234        assert!(config.follow); // default: true
1235        assert!(!config.timestamps); // default: false
1236        assert!(config.tail.is_none()); // default: None (all)
1237    }
1238
1239    #[test]
1240    fn test_parse_ports_single() {
1241        let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1242        let (exposed, bindings) = config.parse_ports().unwrap();
1243
1244        assert!(exposed.contains_key("80/tcp"));
1245        assert!(bindings.contains_key("80/tcp"));
1246
1247        let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1248        assert_eq!(binding.len(), 1);
1249        assert_eq!(binding[0].host_port, Some("8080".to_string()));
1250    }
1251
1252    #[test]
1253    fn test_parse_ports_multiple() {
1254        let config =
1255            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1256        let (exposed, bindings) = config.parse_ports().unwrap();
1257
1258        assert!(exposed.contains_key("80/tcp"));
1259        assert!(exposed.contains_key("443/tcp"));
1260        assert_eq!(bindings.len(), 2);
1261    }
1262
1263    #[test]
1264    fn test_parse_ports_with_protocol() {
1265        let config =
1266            ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1267                .unwrap();
1268        let (exposed, bindings) = config.parse_ports().unwrap();
1269
1270        assert!(exposed.contains_key("80/tcp"));
1271        assert!(exposed.contains_key("53/udp"));
1272    }
1273
1274    #[test]
1275    fn test_parse_ports_none() {
1276        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1277        assert!(config.parse_ports().is_none());
1278    }
1279
1280    #[test]
1281    fn test_parse_env_single() {
1282        let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1283        let env = config.parse_env().unwrap();
1284
1285        assert_eq!(env.len(), 1);
1286        assert_eq!(env[0], "FOO=bar");
1287    }
1288
1289    #[test]
1290    fn test_parse_env_multiple() {
1291        let config =
1292            ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1293                .unwrap();
1294        let env = config.parse_env().unwrap();
1295
1296        assert_eq!(env.len(), 3);
1297        assert!(env.contains(&"FOO=bar".to_string()));
1298        assert!(env.contains(&"BAZ=qux".to_string()));
1299        assert!(env.contains(&"NUM=123".to_string()));
1300    }
1301
1302    #[test]
1303    fn test_parse_env_none() {
1304        let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1305        assert!(config.parse_env().is_none());
1306    }
1307
1308    mod test_helpers {
1309        use async_trait::async_trait;
1310        use camel_api::CamelError;
1311
1312        /// A no-op route controller for testing purposes.
1313        pub struct NullRouteController;
1314
1315        #[async_trait]
1316        impl camel_api::RouteController for NullRouteController {
1317            async fn start_route(&mut self, _: &str) -> Result<(), CamelError> {
1318                Ok(())
1319            }
1320            async fn stop_route(&mut self, _: &str) -> Result<(), CamelError> {
1321                Ok(())
1322            }
1323            async fn restart_route(&mut self, _: &str) -> Result<(), CamelError> {
1324                Ok(())
1325            }
1326            async fn suspend_route(&mut self, _: &str) -> Result<(), CamelError> {
1327                Ok(())
1328            }
1329            async fn resume_route(&mut self, _: &str) -> Result<(), CamelError> {
1330                Ok(())
1331            }
1332            fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1333                None
1334            }
1335            async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1336                Ok(())
1337            }
1338            async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1339                Ok(())
1340            }
1341        }
1342    }
1343
1344    use camel_api::Message;
1345    use std::sync::Arc;
1346    use test_helpers::NullRouteController;
1347    use tokio::sync::Mutex;
1348
1349    #[tokio::test]
1350    async fn test_container_producer_resolves_operation_from_header() {
1351        // Try to connect to Docker - if fails, return early
1352        let docker = match Docker::connect_with_local_defaults() {
1353            Ok(d) => d,
1354            Err(_) => {
1355                eprintln!("Skipping test: Could not connect to Docker daemon");
1356                return;
1357            }
1358        };
1359
1360        if docker.ping().await.is_err() {
1361            eprintln!("Skipping test: Docker daemon not responding to ping");
1362            return;
1363        }
1364
1365        let component = ContainerComponent::new();
1366        let endpoint = component.create_endpoint("container:run").unwrap();
1367
1368        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1369        let mut producer = endpoint.create_producer(&ctx).unwrap();
1370
1371        let mut exchange = Exchange::new(Message::new(""));
1372        exchange
1373            .input
1374            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1375
1376        use tower::ServiceExt;
1377        let result = producer
1378            .ready()
1379            .await
1380            .unwrap()
1381            .call(exchange)
1382            .await
1383            .unwrap();
1384
1385        // For list operation, the result header should be on the input message
1386        assert_eq!(
1387            result
1388                .input
1389                .header(HEADER_ACTION_RESULT)
1390                .map(|v| v.as_str().unwrap()),
1391            Some("success")
1392        );
1393    }
1394
1395    #[tokio::test]
1396    async fn test_container_producer_connection_error_on_invalid_host() {
1397        // Test that an invalid host (nonexistent socket) results in a connection error
1398        let component = ContainerComponent::new();
1399        let endpoint = component
1400            .create_endpoint("container:list?host=unix:///nonexistent/docker.sock")
1401            .unwrap();
1402
1403        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1404        let result = endpoint.create_producer(&ctx);
1405
1406        // The producer should return an error because it cannot connect to the invalid socket
1407        assert!(
1408            result.is_err(),
1409            "Expected error when connecting to invalid host"
1410        );
1411        let err = result.unwrap_err();
1412        match &err {
1413            CamelError::ProcessorError(msg) => {
1414                assert!(
1415                    msg.to_lowercase().contains("connection")
1416                        || msg.to_lowercase().contains("connect")
1417                        || msg.to_lowercase().contains("socket")
1418                        || msg.contains("docker"),
1419                    "Error message should indicate connection failure, got: {}",
1420                    msg
1421                );
1422            }
1423            _ => panic!("Expected ProcessorError, got: {:?}", err),
1424        }
1425    }
1426
1427    /// Test that start, stop, remove operations return an error when CamelContainerId header is missing.
1428    #[tokio::test]
1429    async fn test_container_producer_lifecycle_operations_missing_id() {
1430        // Try to connect to Docker - if fails, return early
1431        let docker = match Docker::connect_with_local_defaults() {
1432            Ok(d) => d,
1433            Err(_) => {
1434                eprintln!("Skipping test: Could not connect to Docker daemon");
1435                return;
1436            }
1437        };
1438
1439        if docker.ping().await.is_err() {
1440            eprintln!("Skipping test: Docker daemon not responding to ping");
1441            return;
1442        }
1443
1444        let component = ContainerComponent::new();
1445        let endpoint = component.create_endpoint("container:start").unwrap();
1446        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1447        let mut producer = endpoint.create_producer(&ctx).unwrap();
1448
1449        // Test each lifecycle operation without CamelContainerId header
1450        for operation in ["start", "stop", "remove"] {
1451            let mut exchange = Exchange::new(Message::new(""));
1452            exchange.input.set_header(
1453                HEADER_ACTION,
1454                serde_json::Value::String(operation.to_string()),
1455            );
1456            // Deliberately NOT setting CamelContainerId header
1457
1458            use tower::ServiceExt;
1459            let result = producer.ready().await.unwrap().call(exchange).await;
1460
1461            assert!(
1462                result.is_err(),
1463                "Expected error for {} operation without CamelContainerId",
1464                operation
1465            );
1466            let err = result.unwrap_err();
1467            match &err {
1468                CamelError::ProcessorError(msg) => {
1469                    assert!(
1470                        msg.contains(HEADER_CONTAINER_ID),
1471                        "Error message should mention {}, got: {}",
1472                        HEADER_CONTAINER_ID,
1473                        msg
1474                    );
1475                }
1476                _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1477            }
1478        }
1479    }
1480
1481    /// Test that stop operation returns an error for a nonexistent container.
1482    #[tokio::test]
1483    async fn test_container_producer_stop_nonexistent() {
1484        // Try to connect to Docker - if fails, return early
1485        let docker = match Docker::connect_with_local_defaults() {
1486            Ok(d) => d,
1487            Err(_) => {
1488                eprintln!("Skipping test: Could not connect to Docker daemon");
1489                return;
1490            }
1491        };
1492
1493        if docker.ping().await.is_err() {
1494            eprintln!("Skipping test: Docker daemon not responding to ping");
1495            return;
1496        }
1497
1498        let component = ContainerComponent::new();
1499        let endpoint = component.create_endpoint("container:stop").unwrap();
1500        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1501        let mut producer = endpoint.create_producer(&ctx).unwrap();
1502
1503        let mut exchange = Exchange::new(Message::new(""));
1504        exchange
1505            .input
1506            .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1507        exchange.input.set_header(
1508            HEADER_CONTAINER_ID,
1509            serde_json::Value::String("nonexistent-container-123".into()),
1510        );
1511
1512        use tower::ServiceExt;
1513        let result = producer.ready().await.unwrap().call(exchange).await;
1514
1515        assert!(
1516            result.is_err(),
1517            "Expected error when stopping nonexistent container"
1518        );
1519        let err = result.unwrap_err();
1520        match &err {
1521            CamelError::ProcessorError(msg) => {
1522                // Docker API returns 404 with "No such container" message
1523                assert!(
1524                    msg.to_lowercase().contains("no such container")
1525                        || msg.to_lowercase().contains("not found")
1526                        || msg.contains("404"),
1527                    "Error message should indicate container not found, got: {}",
1528                    msg
1529                );
1530            }
1531            _ => panic!("Expected ProcessorError, got: {:?}", err),
1532        }
1533    }
1534
1535    /// Test that run operation returns an error when no image is provided.
1536    #[tokio::test]
1537    async fn test_container_producer_run_missing_image() {
1538        // Try to connect to Docker - if fails, return early
1539        let docker = match Docker::connect_with_local_defaults() {
1540            Ok(d) => d,
1541            Err(_) => {
1542                eprintln!("Skipping test: Could not connect to Docker daemon");
1543                return;
1544            }
1545        };
1546
1547        if docker.ping().await.is_err() {
1548            eprintln!("Skipping test: Docker daemon not responding to ping");
1549            return;
1550        }
1551
1552        // Create producer without an image in the URI
1553        let component = ContainerComponent::new();
1554        let endpoint = component.create_endpoint("container:run").unwrap();
1555        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1556        let mut producer = endpoint.create_producer(&ctx).unwrap();
1557
1558        let mut exchange = Exchange::new(Message::new(""));
1559        exchange
1560            .input
1561            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1562        // Deliberately NOT setting CamelContainerImage header
1563
1564        use tower::ServiceExt;
1565        let result = producer.ready().await.unwrap().call(exchange).await;
1566
1567        assert!(
1568            result.is_err(),
1569            "Expected error for run operation without image"
1570        );
1571        let err = result.unwrap_err();
1572        match &err {
1573            CamelError::ProcessorError(msg) => {
1574                assert!(
1575                    msg.to_lowercase().contains("image"),
1576                    "Error message should mention 'image', got: {}",
1577                    msg
1578                );
1579            }
1580            _ => panic!("Expected ProcessorError, got: {:?}", err),
1581        }
1582    }
1583
1584    /// Test that run operation uses image from header.
1585    #[tokio::test]
1586    async fn test_container_producer_run_image_from_header() {
1587        // Try to connect to Docker - if fails, return early
1588        let docker = match Docker::connect_with_local_defaults() {
1589            Ok(d) => d,
1590            Err(_) => {
1591                eprintln!("Skipping test: Could not connect to Docker daemon");
1592                return;
1593            }
1594        };
1595
1596        if docker.ping().await.is_err() {
1597            eprintln!("Skipping test: Docker daemon not responding to ping");
1598            return;
1599        }
1600
1601        // Create producer without an image in the URI
1602        let component = ContainerComponent::new();
1603        let endpoint = component.create_endpoint("container:run").unwrap();
1604        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1605        let mut producer = endpoint.create_producer(&ctx).unwrap();
1606
1607        let mut exchange = Exchange::new(Message::new(""));
1608        exchange
1609            .input
1610            .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1611        // Set a non-existent image to test that the run operation attempts to use it
1612        exchange.input.set_header(
1613            HEADER_IMAGE,
1614            serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1615        );
1616
1617        use tower::ServiceExt;
1618        let result = producer.ready().await.unwrap().call(exchange).await;
1619
1620        // The operation should fail because the image doesn't exist
1621        assert!(
1622            result.is_err(),
1623            "Expected error when running container with nonexistent image"
1624        );
1625        let err = result.unwrap_err();
1626        match &err {
1627            CamelError::ProcessorError(msg) => {
1628                // Docker API returns an error about the missing image
1629                assert!(
1630                    msg.to_lowercase().contains("no such image")
1631                        || msg.to_lowercase().contains("not found")
1632                        || msg.to_lowercase().contains("image")
1633                        || msg.to_lowercase().contains("pull")
1634                        || msg.contains("404"),
1635                    "Error message should indicate image issue, got: {}",
1636                    msg
1637                );
1638            }
1639            _ => panic!("Expected ProcessorError, got: {:?}", err),
1640        }
1641    }
1642
1643    /// Integration test: Actually run a container with alpine:latest.
1644    /// This test verifies the full flow: create → start → set headers.
1645    #[tokio::test]
1646    async fn test_container_producer_run_alpine_container() {
1647        let docker = match Docker::connect_with_local_defaults() {
1648            Ok(d) => d,
1649            Err(_) => {
1650                eprintln!("Skipping test: Could not connect to Docker daemon");
1651                return;
1652            }
1653        };
1654
1655        if docker.ping().await.is_err() {
1656            eprintln!("Skipping test: Docker daemon not responding to ping");
1657            return;
1658        }
1659
1660        // Pull alpine:latest if not present
1661        let images = docker.list_images::<&str>(None).await.unwrap();
1662        let has_alpine = images
1663            .iter()
1664            .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1665
1666        if !has_alpine {
1667            eprintln!("Pulling alpine:latest image...");
1668            let mut stream = docker.create_image(
1669                Some(bollard::image::CreateImageOptions {
1670                    from_image: "alpine:latest",
1671                    ..Default::default()
1672                }),
1673                None,
1674                None,
1675            );
1676
1677            use futures::StreamExt;
1678            while let Some(_item) = stream.next().await {
1679                // Wait for pull to complete
1680            }
1681            eprintln!("Image pulled successfully");
1682        }
1683
1684        // Create producer
1685        let component = ContainerComponent::new();
1686        let endpoint = component.create_endpoint("container:run").unwrap();
1687        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1688        let mut producer = endpoint.create_producer(&ctx).unwrap();
1689
1690        // Run container with unique name
1691        let timestamp = std::time::SystemTime::now()
1692            .duration_since(std::time::UNIX_EPOCH)
1693            .unwrap()
1694            .as_millis();
1695        let container_name = format!("test-rust-camel-{}", timestamp);
1696        let mut exchange = Exchange::new(Message::new(""));
1697        exchange.input.set_header(
1698            HEADER_IMAGE,
1699            serde_json::Value::String("alpine:latest".into()),
1700        );
1701        exchange.input.set_header(
1702            HEADER_CONTAINER_NAME,
1703            serde_json::Value::String(container_name.clone()),
1704        );
1705
1706        use tower::ServiceExt;
1707        let result = producer
1708            .ready()
1709            .await
1710            .unwrap()
1711            .call(exchange)
1712            .await
1713            .expect("Container run should succeed");
1714
1715        // Verify container ID was set
1716        let container_id = result
1717            .input
1718            .header(HEADER_CONTAINER_ID)
1719            .and_then(|v| v.as_str().map(|s| s.to_string()))
1720            .expect("Expected container ID header");
1721        assert!(!container_id.is_empty(), "Container ID should not be empty");
1722
1723        // Verify success header
1724        assert_eq!(
1725            result
1726                .input
1727                .header(HEADER_ACTION_RESULT)
1728                .and_then(|v| v.as_str()),
1729            Some("success")
1730        );
1731
1732        // Verify container exists in Docker
1733        let inspect = docker
1734            .inspect_container(&container_id, None)
1735            .await
1736            .expect("Container should exist");
1737        assert_eq!(
1738            inspect.id.as_ref().map(|s| s.as_str()),
1739            Some(container_id.as_str())
1740        );
1741
1742        // Cleanup: remove container
1743        docker
1744            .remove_container(
1745                &container_id,
1746                Some(bollard::container::RemoveContainerOptions {
1747                    force: true,
1748                    ..Default::default()
1749                }),
1750            )
1751            .await
1752            .ok();
1753
1754        eprintln!("✅ Container {} created and cleaned up", container_id);
1755    }
1756
1757    /// Test that consumer returns an error for unsupported operations.
1758    #[tokio::test]
1759    async fn test_container_consumer_unsupported_operation() {
1760        use tokio::sync::mpsc;
1761
1762        let component = ContainerComponent::new();
1763        let endpoint = component.create_endpoint("container:run").unwrap();
1764        let mut consumer = endpoint.create_consumer().unwrap();
1765
1766        // Create a minimal ConsumerContext
1767        let (tx, _rx) = mpsc::channel(16);
1768        let cancel_token = tokio_util::sync::CancellationToken::new();
1769        let context = ConsumerContext::new(tx, cancel_token);
1770
1771        let result = consumer.start(context).await;
1772
1773        // Should return error because "run" is not a supported consumer operation
1774        assert!(
1775            result.is_err(),
1776            "Expected error for unsupported consumer operation"
1777        );
1778        let err = result.unwrap_err();
1779        match &err {
1780            CamelError::EndpointCreationFailed(msg) => {
1781                assert!(
1782                    msg.contains("Consumer only supports 'events' or 'logs'"),
1783                    "Error message should mention events or logs support, got: {}",
1784                    msg
1785                );
1786            }
1787            _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1788        }
1789    }
1790
/// Checks that a `ContainerConsumer` built for `container:events` reports
/// `ConcurrencyModel::Concurrent` with no `max` set.
#[test]
fn test_container_consumer_concurrency_model_is_concurrent() {
    let config = ContainerConfig::from_uri("container:events").unwrap();
    let consumer = ContainerConsumer { config };

    let expected = camel_component::ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
1802
1803    /// Test that consumer gracefully shuts down when cancellation is requested.
1804    /// This test requires a running Docker daemon. If Docker is not available, the test
1805    /// will be ignored.
1806    #[tokio::test]
1807    async fn test_container_consumer_cancellation() {
1808        use std::sync::atomic::{AtomicBool, Ordering};
1809        use tokio::sync::mpsc;
1810
1811        // Try to connect to Docker - if fails, return early
1812        let docker = match Docker::connect_with_local_defaults() {
1813            Ok(d) => d,
1814            Err(_) => {
1815                eprintln!("Skipping test: Could not connect to Docker daemon");
1816                return;
1817            }
1818        };
1819
1820        if docker.ping().await.is_err() {
1821            eprintln!("Skipping test: Docker daemon not responding to ping");
1822            return;
1823        }
1824
1825        let component = ContainerComponent::new();
1826        let endpoint = component.create_endpoint("container:events").unwrap();
1827        let mut consumer = endpoint.create_consumer().unwrap();
1828
1829        // Create a ConsumerContext
1830        let (tx, _rx) = mpsc::channel(16);
1831        let cancel_token = tokio_util::sync::CancellationToken::new();
1832        let context = ConsumerContext::new(tx, cancel_token.clone());
1833
1834        // Track if the consumer task has completed
1835        let completed = Arc::new(AtomicBool::new(false));
1836        let completed_clone = completed.clone();
1837
1838        // Spawn consumer in background
1839        let handle = tokio::spawn(async move {
1840            let result = consumer.start(context).await;
1841            // Mark as completed when done
1842            completed_clone.store(true, Ordering::SeqCst);
1843            result
1844        });
1845
1846        // Wait a bit for the consumer to start
1847        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1848
1849        // Consumer should still be running (not completed)
1850        assert!(
1851            !completed.load(Ordering::SeqCst),
1852            "Consumer should still be running before cancellation"
1853        );
1854
1855        // Request cancellation
1856        cancel_token.cancel();
1857
1858        // Wait for the task to complete (with timeout)
1859        let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1860
1861        // Task should have completed (not timed out)
1862        assert!(
1863            result.is_ok(),
1864            "Consumer should gracefully shut down after cancellation"
1865        );
1866
1867        // Verify the consumer completed
1868        assert!(
1869            completed.load(Ordering::SeqCst),
1870            "Consumer should have completed after cancellation"
1871        );
1872    }
1873
1874    /// Integration test for listing containers.
1875    /// This test requires a running Docker daemon. If Docker is not available, the test
1876    /// will return early and be effectively ignored.
1877    #[tokio::test]
1878    async fn test_container_producer_list_containers() {
1879        // Try to connect to Docker using default config
1880        // If ping fails, return early (effectively ignoring this test)
1881        let docker = match Docker::connect_with_local_defaults() {
1882            Ok(d) => d,
1883            Err(_) => {
1884                eprintln!("Skipping test: Could not connect to Docker daemon");
1885                return;
1886            }
1887        };
1888
1889        if docker.ping().await.is_err() {
1890            eprintln!("Skipping test: Docker daemon not responding to ping");
1891            return;
1892        }
1893
1894        // Create producer with list operation
1895        let component = ContainerComponent::new();
1896        let endpoint = component.create_endpoint("container:list").unwrap();
1897
1898        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1899        let mut producer = endpoint.create_producer(&ctx).unwrap();
1900
1901        // Create exchange with list operation in header
1902        let mut exchange = Exchange::new(Message::new(""));
1903        exchange
1904            .input
1905            .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1906
1907        // Call the producer
1908        use tower::ServiceExt;
1909        let result = producer
1910            .ready()
1911            .await
1912            .unwrap()
1913            .call(exchange)
1914            .await
1915            .expect("Producer should succeed when Docker is available");
1916
1917        // Assert that the input exchange body is a JSON array
1918        // (because list_containers should put the JSON result in the body)
1919        match &result.input.body {
1920            camel_api::Body::Json(json_value) => {
1921                assert!(
1922                    json_value.is_array(),
1923                    "Expected input body to be a JSON array, got: {:?}",
1924                    json_value
1925                );
1926            }
1927            other => panic!("Expected Body::Json with array, got: {:?}", other),
1928        }
1929    }
1930}