wrkflw_executor/
docker.rs

1use async_trait::async_trait;
2use bollard::{
3    container::{Config, CreateContainerOptions},
4    models::HostConfig,
5    network::CreateNetworkOptions,
6    Docker,
7};
8use futures_util::StreamExt;
9use once_cell::sync::Lazy;
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::Mutex;
13use wrkflw_logging;
14use wrkflw_runtime::container::{ContainerError, ContainerOutput, ContainerRuntime};
15use wrkflw_utils;
16use wrkflw_utils::fd;
17
18static RUNNING_CONTAINERS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
19static CREATED_NETWORKS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
20// Map to track customized images for a job
21#[allow(dead_code)]
22static CUSTOMIZED_IMAGES: Lazy<Mutex<HashMap<String, String>>> =
23    Lazy::new(|| Mutex::new(HashMap::new()));
24
/// [`ContainerRuntime`] implementation that talks to the local Docker daemon through bollard.
pub struct DockerRuntime {
    /// Bollard client created with `Docker::connect_with_local_defaults` in `new_with_config`.
    docker: Docker,
    /// Flag supplied to `new_with_config` (`false` via `new`).
    /// NOTE(review): presumably keeps containers of failed steps for inspection — the code
    /// that reads it is outside this view; confirm.
    preserve_containers_on_failure: bool,
}
29
30impl DockerRuntime {
31    pub fn new() -> Result<Self, ContainerError> {
32        Self::new_with_config(false)
33    }
34
35    pub fn new_with_config(preserve_containers_on_failure: bool) -> Result<Self, ContainerError> {
36        let docker = Docker::connect_with_local_defaults().map_err(|e| {
37            ContainerError::ContainerStart(format!("Failed to connect to Docker: {}", e))
38        })?;
39
40        Ok(DockerRuntime {
41            docker,
42            preserve_containers_on_failure,
43        })
44    }
45
46    // Add a method to store and retrieve customized images (e.g., with Python installed)
47    #[allow(dead_code)]
48    pub fn get_customized_image(base_image: &str, customization: &str) -> Option<String> {
49        let key = format!("{}:{}", base_image, customization);
50        match CUSTOMIZED_IMAGES.lock() {
51            Ok(images) => images.get(&key).cloned(),
52            Err(e) => {
53                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
54                None
55            }
56        }
57    }
58
59    #[allow(dead_code)]
60    pub fn set_customized_image(base_image: &str, customization: &str, new_image: &str) {
61        let key = format!("{}:{}", base_image, customization);
62        if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
63            images.insert(key, new_image.to_string());
64        }) {
65            wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
66        }
67    }
68
69    /// Find a customized image key by prefix
70    #[allow(dead_code)]
71    pub fn find_customized_image_key(image: &str, prefix: &str) -> Option<String> {
72        let image_keys = match CUSTOMIZED_IMAGES.lock() {
73            Ok(keys) => keys,
74            Err(e) => {
75                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
76                return None;
77            }
78        };
79
80        // Look for any key that starts with the prefix
81        for (key, _) in image_keys.iter() {
82            if key.starts_with(prefix) {
83                return Some(key.clone());
84            }
85        }
86
87        None
88    }
89
90    /// Get a customized image with language-specific dependencies
91    pub fn get_language_specific_image(
92        base_image: &str,
93        language: &str,
94        version: Option<&str>,
95    ) -> Option<String> {
96        let key = match (language, version) {
97            ("python", Some(ver)) => format!("python:{}", ver),
98            ("node", Some(ver)) => format!("node:{}", ver),
99            ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
100            ("go", Some(ver)) => format!("golang:{}", ver),
101            ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
102            ("rust", Some(ver)) => format!("rust:{}", ver),
103            (lang, Some(ver)) => format!("{}:{}", lang, ver),
104            (lang, None) => lang.to_string(),
105        };
106
107        match CUSTOMIZED_IMAGES.lock() {
108            Ok(images) => images.get(&key).cloned(),
109            Err(e) => {
110                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
111                None
112            }
113        }
114    }
115
116    /// Set a customized image with language-specific dependencies
117    pub fn set_language_specific_image(
118        base_image: &str,
119        language: &str,
120        version: Option<&str>,
121        new_image: &str,
122    ) {
123        let key = match (language, version) {
124            ("python", Some(ver)) => format!("python:{}", ver),
125            ("node", Some(ver)) => format!("node:{}", ver),
126            ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
127            ("go", Some(ver)) => format!("golang:{}", ver),
128            ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
129            ("rust", Some(ver)) => format!("rust:{}", ver),
130            (lang, Some(ver)) => format!("{}:{}", lang, ver),
131            (lang, None) => lang.to_string(),
132        };
133
134        if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
135            images.insert(key, new_image.to_string());
136        }) {
137            wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
138        }
139    }
140
141    /// Prepare a language-specific environment
142    #[allow(dead_code)]
143    pub async fn prepare_language_environment(
144        &self,
145        language: &str,
146        version: Option<&str>,
147        additional_packages: Option<Vec<String>>,
148    ) -> Result<String, ContainerError> {
149        // Check if we already have a customized image for this language and version
150        let key = format!("{}-{}", language, version.unwrap_or("latest"));
151        if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
152            return Ok(customized_image);
153        }
154
155        // Create a temporary Dockerfile for customization
156        let temp_dir = tempfile::tempdir().map_err(|e| {
157            ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
158        })?;
159
160        let dockerfile_path = temp_dir.path().join("Dockerfile");
161        let mut dockerfile_content = String::new();
162
163        // Add language-specific setup based on the language
164        match language {
165            "python" => {
166                let base_image =
167                    version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
168                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
169                dockerfile_content.push_str(
170                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
171                );
172                dockerfile_content.push_str("    build-essential \\\n");
173                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
174
175                if let Some(packages) = additional_packages {
176                    for package in packages {
177                        dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
178                    }
179                }
180            }
181            "node" => {
182                let base_image =
183                    version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
184                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
185                dockerfile_content.push_str(
186                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
187                );
188                dockerfile_content.push_str("    build-essential \\\n");
189                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
190
191                if let Some(packages) = additional_packages {
192                    for package in packages {
193                        dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
194                    }
195                }
196            }
197            "java" => {
198                let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
199                    format!("eclipse-temurin:{}", v)
200                });
201                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
202                dockerfile_content.push_str(
203                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
204                );
205                dockerfile_content.push_str("    maven \\\n");
206                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
207            }
208            "go" => {
209                let base_image =
210                    version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
211                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
212                dockerfile_content.push_str(
213                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
214                );
215                dockerfile_content.push_str("    git \\\n");
216                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
217
218                if let Some(packages) = additional_packages {
219                    for package in packages {
220                        dockerfile_content.push_str(&format!("RUN go install {}\n", package));
221                    }
222                }
223            }
224            "dotnet" => {
225                let base_image = version
226                    .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
227                        format!("mcr.microsoft.com/dotnet/sdk:{}", v)
228                    });
229                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
230
231                if let Some(packages) = additional_packages {
232                    for package in packages {
233                        dockerfile_content
234                            .push_str(&format!("RUN dotnet tool install -g {}\n", package));
235                    }
236                }
237            }
238            "rust" => {
239                let base_image =
240                    version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
241                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
242                dockerfile_content.push_str(
243                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
244                );
245                dockerfile_content.push_str("    build-essential \\\n");
246                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
247
248                if let Some(packages) = additional_packages {
249                    for package in packages {
250                        dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
251                    }
252                }
253            }
254            _ => {
255                return Err(ContainerError::ContainerStart(format!(
256                    "Unsupported language: {}",
257                    language
258                )));
259            }
260        }
261
262        // Write the Dockerfile
263        std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
264            ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
265        })?;
266
267        // Build the customized image
268        let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
269        self.build_image(&dockerfile_path, &image_tag).await?;
270
271        // Store the customized image
272        Self::set_language_specific_image("", language, version, &image_tag);
273
274        Ok(image_tag)
275    }
276}
277
278pub fn is_available() -> bool {
279    // Use a very short timeout for the entire availability check
280    let overall_timeout = std::time::Duration::from_secs(3);
281
282    // Spawn a thread with the timeout to prevent blocking the main thread
283    let handle = std::thread::spawn(move || {
284        // Use safe FD redirection utility to suppress Docker error messages
285        match fd::with_stderr_to_null(|| {
286            // First, check if docker CLI is available as a quick test
287            if cfg!(target_os = "linux") || cfg!(target_os = "macos") {
288                // Try a simple docker version command with a short timeout
289                let process = std::process::Command::new("docker")
290                    .arg("version")
291                    .arg("--format")
292                    .arg("{{.Server.Version}}")
293                    .stdout(std::process::Stdio::null())
294                    .stderr(std::process::Stdio::null())
295                    .spawn();
296
297                match process {
298                    Ok(mut child) => {
299                        // Set a very short timeout for the process
300                        let status = std::thread::scope(|_| {
301                            // Try to wait for a short time
302                            for _ in 0..10 {
303                                match child.try_wait() {
304                                    Ok(Some(status)) => return status.success(),
305                                    Ok(None) => {
306                                        std::thread::sleep(std::time::Duration::from_millis(100))
307                                    }
308                                    Err(_) => return false,
309                                }
310                            }
311                            // Kill it if it takes too long
312                            let _ = child.kill();
313                            false
314                        });
315
316                        if !status {
317                            return false;
318                        }
319                    }
320                    Err(_) => {
321                        wrkflw_logging::debug("Docker CLI is not available");
322                        return false;
323                    }
324                }
325            }
326
327            // Try to connect to Docker daemon with a short timeout
328            let runtime = match tokio::runtime::Builder::new_current_thread()
329                .enable_all()
330                .build()
331            {
332                Ok(rt) => rt,
333                Err(e) => {
334                    wrkflw_logging::error(&format!(
335                        "Failed to create runtime for Docker availability check: {}",
336                        e
337                    ));
338                    return false;
339                }
340            };
341
342            runtime.block_on(async {
343                match tokio::time::timeout(std::time::Duration::from_secs(2), async {
344                    match Docker::connect_with_local_defaults() {
345                        Ok(docker) => {
346                            // Try to ping the Docker daemon with a short timeout
347                            match tokio::time::timeout(
348                                std::time::Duration::from_secs(1),
349                                docker.ping(),
350                            )
351                            .await
352                            {
353                                Ok(Ok(_)) => true,
354                                Ok(Err(e)) => {
355                                    wrkflw_logging::debug(&format!(
356                                        "Docker daemon ping failed: {}",
357                                        e
358                                    ));
359                                    false
360                                }
361                                Err(_) => {
362                                    wrkflw_logging::debug(
363                                        "Docker daemon ping timed out after 1 second",
364                                    );
365                                    false
366                                }
367                            }
368                        }
369                        Err(e) => {
370                            wrkflw_logging::debug(&format!(
371                                "Docker daemon connection failed: {}",
372                                e
373                            ));
374                            false
375                        }
376                    }
377                })
378                .await
379                {
380                    Ok(result) => result,
381                    Err(_) => {
382                        wrkflw_logging::debug("Docker availability check timed out");
383                        false
384                    }
385                }
386            })
387        }) {
388            Ok(result) => result,
389            Err(_) => {
390                wrkflw_logging::debug(
391                    "Failed to redirect stderr when checking Docker availability",
392                );
393                false
394            }
395        }
396    });
397
398    // Manual implementation of join with timeout
399    let start = std::time::Instant::now();
400
401    while start.elapsed() < overall_timeout {
402        if handle.is_finished() {
403            return match handle.join() {
404                Ok(result) => result,
405                Err(_) => {
406                    wrkflw_logging::warning("Docker availability check thread panicked");
407                    false
408                }
409            };
410        }
411        std::thread::sleep(std::time::Duration::from_millis(50));
412    }
413
414    wrkflw_logging::warning(
415        "Docker availability check timed out, assuming Docker is not available",
416    );
417    false
418}
419
420// Add container to tracking
421pub fn track_container(id: &str) {
422    if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
423        containers.push(id.to_string());
424    }
425}
426
427// Remove container from tracking
428pub fn untrack_container(id: &str) {
429    if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
430        containers.retain(|c| c != id);
431    }
432}
433
434// Add network to tracking
435pub fn track_network(id: &str) {
436    if let Ok(mut networks) = CREATED_NETWORKS.lock() {
437        networks.push(id.to_string());
438    }
439}
440
441// Remove network from tracking
442pub fn untrack_network(id: &str) {
443    if let Ok(mut networks) = CREATED_NETWORKS.lock() {
444        networks.retain(|n| n != id);
445    }
446}
447
448// Clean up all tracked resources
449pub async fn cleanup_resources(docker: &Docker) {
450    // Use a global timeout for the entire cleanup process
451    let cleanup_timeout = std::time::Duration::from_secs(5);
452
453    match tokio::time::timeout(cleanup_timeout, async {
454        // Perform both cleanups in parallel for efficiency
455        let (container_result, network_result) =
456            tokio::join!(cleanup_containers(docker), cleanup_networks(docker));
457
458        if let Err(e) = container_result {
459            wrkflw_logging::error(&format!("Error during container cleanup: {}", e));
460        }
461
462        if let Err(e) = network_result {
463            wrkflw_logging::error(&format!("Error during network cleanup: {}", e));
464        }
465    })
466    .await
467    {
468        Ok(_) => wrkflw_logging::debug("Docker cleanup completed within timeout"),
469        Err(_) => wrkflw_logging::warning(
470            "Docker cleanup timed out, some resources may not have been removed",
471        ),
472    }
473}
474
475// Clean up all tracked containers
476pub async fn cleanup_containers(docker: &Docker) -> Result<(), String> {
477    // Getting the containers to clean up should not take a long time
478    let containers_to_cleanup =
479        match tokio::time::timeout(std::time::Duration::from_millis(500), async {
480            match RUNNING_CONTAINERS.try_lock() {
481                Ok(containers) => containers.clone(),
482                Err(_) => {
483                    wrkflw_logging::error("Could not acquire container lock for cleanup");
484                    vec![]
485                }
486            }
487        })
488        .await
489        {
490            Ok(containers) => containers,
491            Err(_) => {
492                wrkflw_logging::error("Timeout while trying to get containers for cleanup");
493                vec![]
494            }
495        };
496
497    if containers_to_cleanup.is_empty() {
498        return Ok(());
499    }
500
501    wrkflw_logging::info(&format!(
502        "Cleaning up {} containers",
503        containers_to_cleanup.len()
504    ));
505
506    // Process each container with a timeout
507    for container_id in containers_to_cleanup {
508        // First try to stop the container
509        match tokio::time::timeout(
510            std::time::Duration::from_millis(1000),
511            docker.stop_container(&container_id, None),
512        )
513        .await
514        {
515            Ok(Ok(_)) => wrkflw_logging::debug(&format!("Stopped container: {}", container_id)),
516            Ok(Err(e)) => wrkflw_logging::warning(&format!(
517                "Error stopping container {}: {}",
518                container_id, e
519            )),
520            Err(_) => {
521                wrkflw_logging::warning(&format!("Timeout stopping container: {}", container_id))
522            }
523        }
524
525        // Then try to remove it
526        match tokio::time::timeout(
527            std::time::Duration::from_millis(1000),
528            docker.remove_container(&container_id, None),
529        )
530        .await
531        {
532            Ok(Ok(_)) => wrkflw_logging::debug(&format!("Removed container: {}", container_id)),
533            Ok(Err(e)) => wrkflw_logging::warning(&format!(
534                "Error removing container {}: {}",
535                container_id, e
536            )),
537            Err(_) => {
538                wrkflw_logging::warning(&format!("Timeout removing container: {}", container_id))
539            }
540        }
541
542        // Always untrack the container whether or not we succeeded to avoid future cleanup attempts
543        untrack_container(&container_id);
544    }
545
546    Ok(())
547}
548
549// Clean up all tracked networks
550pub async fn cleanup_networks(docker: &Docker) -> Result<(), String> {
551    // Getting the networks to clean up should not take a long time
552    let networks_to_cleanup =
553        match tokio::time::timeout(std::time::Duration::from_millis(500), async {
554            match CREATED_NETWORKS.try_lock() {
555                Ok(networks) => networks.clone(),
556                Err(_) => {
557                    wrkflw_logging::error("Could not acquire network lock for cleanup");
558                    vec![]
559                }
560            }
561        })
562        .await
563        {
564            Ok(networks) => networks,
565            Err(_) => {
566                wrkflw_logging::error("Timeout while trying to get networks for cleanup");
567                vec![]
568            }
569        };
570
571    if networks_to_cleanup.is_empty() {
572        return Ok(());
573    }
574
575    wrkflw_logging::info(&format!(
576        "Cleaning up {} networks",
577        networks_to_cleanup.len()
578    ));
579
580    for network_id in networks_to_cleanup {
581        match tokio::time::timeout(
582            std::time::Duration::from_millis(1000),
583            docker.remove_network(&network_id),
584        )
585        .await
586        {
587            Ok(Ok(_)) => {
588                wrkflw_logging::info(&format!("Successfully removed network: {}", network_id))
589            }
590            Ok(Err(e)) => {
591                wrkflw_logging::error(&format!("Error removing network {}: {}", network_id, e))
592            }
593            Err(_) => wrkflw_logging::warning(&format!("Timeout removing network: {}", network_id)),
594        }
595
596        // Always untrack the network whether or not we succeeded
597        untrack_network(&network_id);
598    }
599
600    Ok(())
601}
602
603// Create a new Docker network for a job
604pub async fn create_job_network(docker: &Docker) -> Result<String, ContainerError> {
605    let network_name = format!("wrkflw-network-{}", uuid::Uuid::new_v4());
606
607    let options = CreateNetworkOptions {
608        name: network_name.clone(),
609        driver: "bridge".to_string(),
610        ..Default::default()
611    };
612
613    let network = docker
614        .create_network(options)
615        .await
616        .map_err(|e| ContainerError::NetworkCreation(e.to_string()))?;
617
618    // network.id is Option<String>, unwrap it safely
619    let network_id = network.id.ok_or_else(|| {
620        ContainerError::NetworkOperation("Network created but no ID returned".to_string())
621    })?;
622
623    track_network(&network_id);
624    wrkflw_logging::info(&format!("Created Docker network: {}", network_id));
625
626    Ok(network_id)
627}
628
629#[async_trait]
630impl ContainerRuntime for DockerRuntime {
631    async fn run_container(
632        &self,
633        image: &str,
634        cmd: &[&str],
635        env_vars: &[(&str, &str)],
636        working_dir: &Path,
637        volumes: &[(&Path, &Path)],
638    ) -> Result<ContainerOutput, ContainerError> {
639        // Print detailed debugging info
640        wrkflw_logging::info(&format!("Docker: Running container with image: {}", image));
641
642        // Add a global timeout for all Docker operations to prevent freezing
643        let timeout_duration = std::time::Duration::from_secs(360); // Increased outer timeout to 6 minutes
644
645        // Run the entire container operation with a timeout
646        match tokio::time::timeout(
647            timeout_duration,
648            self.run_container_inner(image, cmd, env_vars, working_dir, volumes),
649        )
650        .await
651        {
652            Ok(result) => result,
653            Err(_) => {
654                wrkflw_logging::error("Docker operation timed out after 360 seconds");
655                Err(ContainerError::ContainerExecution(
656                    "Operation timed out".to_string(),
657                ))
658            }
659        }
660    }
661
662    async fn pull_image(&self, image: &str) -> Result<(), ContainerError> {
663        // Add a timeout for pull operations
664        let timeout_duration = std::time::Duration::from_secs(30);
665
666        match tokio::time::timeout(timeout_duration, self.pull_image_inner(image)).await {
667            Ok(result) => result,
668            Err(_) => {
669                wrkflw_logging::warning(&format!(
670                    "Pull of image {} timed out, continuing with existing image",
671                    image
672                ));
673                // Return success to allow continuing with existing image
674                Ok(())
675            }
676        }
677    }
678
679    async fn build_image(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
680        // Add a timeout for build operations
681        let timeout_duration = std::time::Duration::from_secs(120); // 2 minutes timeout for builds
682
683        match tokio::time::timeout(timeout_duration, self.build_image_inner(dockerfile, tag)).await
684        {
685            Ok(result) => result,
686            Err(_) => {
687                wrkflw_logging::error(&format!(
688                    "Building image {} timed out after 120 seconds",
689                    tag
690                ));
691                Err(ContainerError::ImageBuild(
692                    "Operation timed out".to_string(),
693                ))
694            }
695        }
696    }
697
698    async fn prepare_language_environment(
699        &self,
700        language: &str,
701        version: Option<&str>,
702        additional_packages: Option<Vec<String>>,
703    ) -> Result<String, ContainerError> {
704        // Check if we already have a customized image for this language and version
705        let key = format!("{}-{}", language, version.unwrap_or("latest"));
706        if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
707            return Ok(customized_image);
708        }
709
710        // Create a temporary Dockerfile for customization
711        let temp_dir = tempfile::tempdir().map_err(|e| {
712            ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
713        })?;
714
715        let dockerfile_path = temp_dir.path().join("Dockerfile");
716        let mut dockerfile_content = String::new();
717
718        // Add language-specific setup based on the language
719        match language {
720            "python" => {
721                let base_image =
722                    version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
723                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
724                dockerfile_content.push_str(
725                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
726                );
727                dockerfile_content.push_str("    build-essential \\\n");
728                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
729
730                if let Some(packages) = additional_packages {
731                    for package in packages {
732                        dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
733                    }
734                }
735            }
736            "node" => {
737                let base_image =
738                    version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
739                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
740                dockerfile_content.push_str(
741                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
742                );
743                dockerfile_content.push_str("    build-essential \\\n");
744                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
745
746                if let Some(packages) = additional_packages {
747                    for package in packages {
748                        dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
749                    }
750                }
751            }
752            "java" => {
753                let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
754                    format!("eclipse-temurin:{}", v)
755                });
756                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
757                dockerfile_content.push_str(
758                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
759                );
760                dockerfile_content.push_str("    maven \\\n");
761                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
762            }
763            "go" => {
764                let base_image =
765                    version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
766                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
767                dockerfile_content.push_str(
768                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
769                );
770                dockerfile_content.push_str("    git \\\n");
771                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
772
773                if let Some(packages) = additional_packages {
774                    for package in packages {
775                        dockerfile_content.push_str(&format!("RUN go install {}\n", package));
776                    }
777                }
778            }
779            "dotnet" => {
780                let base_image = version
781                    .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
782                        format!("mcr.microsoft.com/dotnet/sdk:{}", v)
783                    });
784                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
785
786                if let Some(packages) = additional_packages {
787                    for package in packages {
788                        dockerfile_content
789                            .push_str(&format!("RUN dotnet tool install -g {}\n", package));
790                    }
791                }
792            }
793            "rust" => {
794                let base_image =
795                    version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
796                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
797                dockerfile_content.push_str(
798                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
799                );
800                dockerfile_content.push_str("    build-essential \\\n");
801                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
802
803                if let Some(packages) = additional_packages {
804                    for package in packages {
805                        dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
806                    }
807                }
808            }
809            _ => {
810                return Err(ContainerError::ContainerStart(format!(
811                    "Unsupported language: {}",
812                    language
813                )));
814            }
815        }
816
817        // Write the Dockerfile
818        std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
819            ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
820        })?;
821
822        // Build the customized image
823        let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
824        self.build_image(&dockerfile_path, &image_tag).await?;
825
826        // Store the customized image
827        Self::set_language_specific_image("", language, version, &image_tag);
828
829        Ok(image_tag)
830    }
831}
832
833// Move the actual implementation to internal methods
834impl DockerRuntime {
835    async fn run_container_inner(
836        &self,
837        image: &str,
838        cmd: &[&str],
839        env_vars: &[(&str, &str)],
840        working_dir: &Path,
841        volumes: &[(&Path, &Path)],
842    ) -> Result<ContainerOutput, ContainerError> {
843        // Collect environment variables
844        let mut env: Vec<String> = env_vars
845            .iter()
846            .map(|(k, v)| format!("{}={}", k, v))
847            .collect();
848
849        let mut binds = Vec::new();
850        for (host_path, container_path) in volumes {
851            binds.push(format!(
852                "{}:{}",
853                host_path.to_string_lossy(),
854                container_path.to_string_lossy()
855            ));
856        }
857
858        // Convert command vector to Vec<String>
859        let cmd_vec: Vec<String> = cmd.iter().map(|&s| s.to_string()).collect();
860
861        wrkflw_logging::debug(&format!("Running command in Docker: {:?}", cmd_vec));
862        wrkflw_logging::debug(&format!("Environment: {:?}", env));
863        wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
864
865        // Determine platform-specific configurations
866        let is_windows_image = image.contains("windows")
867            || image.contains("servercore")
868            || image.contains("nanoserver");
869        let is_macos_emu =
870            image.contains("act-") && (image.contains("catthehacker") || image.contains("nektos"));
871
872        // Add platform-specific environment variables
873        if is_macos_emu {
874            // Add macOS-specific environment variables
875            env.push("RUNNER_OS=macOS".to_string());
876            env.push("RUNNER_ARCH=X64".to_string());
877            env.push("TMPDIR=/tmp".to_string());
878            env.push("HOME=/root".to_string());
879            env.push("GITHUB_WORKSPACE=/github/workspace".to_string());
880            env.push("PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin".to_string());
881        }
882
883        // Create appropriate container options based on platform
884        let options = Some(CreateContainerOptions {
885            name: format!("wrkflw-{}", uuid::Uuid::new_v4()),
886            platform: if is_windows_image {
887                Some("windows".to_string())
888            } else {
889                None
890            },
891        });
892
893        // Configure host configuration based on platform
894        let host_config = if is_windows_image {
895            HostConfig {
896                binds: Some(binds),
897                isolation: Some(bollard::models::HostConfigIsolationEnum::PROCESS),
898                ..Default::default()
899            }
900        } else {
901            HostConfig {
902                binds: Some(binds),
903                ..Default::default()
904            }
905        };
906
907        // Create container config with platform-specific settings
908        let mut config = Config {
909            image: Some(image.to_string()),
910            cmd: Some(cmd_vec),
911            env: Some(env),
912            working_dir: Some(working_dir.to_string_lossy().to_string()),
913            host_config: Some(host_config),
914            // Windows containers need specific configuration
915            user: if is_windows_image {
916                Some("ContainerAdministrator".to_string())
917            } else {
918                None // Don't specify user for macOS emulation - use default root user
919            },
920            // Map appropriate entrypoint for different platforms
921            entrypoint: if is_macos_emu {
922                // For macOS, ensure we use bash
923                Some(vec!["bash".to_string(), "-l".to_string(), "-c".to_string()])
924            } else {
925                None
926            },
927            ..Default::default()
928        };
929
930        // Run platform-specific container setup
931        if is_macos_emu {
932            // Add special labels for macOS
933            let mut labels = HashMap::new();
934            labels.insert("wrkflw.platform".to_string(), "macos".to_string());
935            config.labels = Some(labels);
936        }
937
938        // Create container with a shorter timeout
939        let create_result = tokio::time::timeout(
940            std::time::Duration::from_secs(15),
941            self.docker.create_container(options, config),
942        )
943        .await;
944
945        let container = match create_result {
946            Ok(Ok(container)) => container,
947            Ok(Err(e)) => return Err(ContainerError::ContainerStart(e.to_string())),
948            Err(_) => {
949                return Err(ContainerError::ContainerStart(
950                    "Container creation timed out".to_string(),
951                ))
952            }
953        };
954
955        // Track the container before starting it to ensure cleanup even if starting fails
956        track_container(&container.id);
957
958        // Start container with a timeout
959        let start_result = tokio::time::timeout(
960            std::time::Duration::from_secs(15),
961            self.docker.start_container::<String>(&container.id, None),
962        )
963        .await;
964
965        match start_result {
966            Ok(Ok(_)) => {}
967            Ok(Err(e)) => {
968                // Clean up the container if start fails
969                let _ = self.docker.remove_container(&container.id, None).await;
970                untrack_container(&container.id);
971                return Err(ContainerError::ContainerExecution(e.to_string()));
972            }
973            Err(_) => {
974                // Clean up the container if starting times out
975                let _ = self.docker.remove_container(&container.id, None).await;
976                untrack_container(&container.id);
977                return Err(ContainerError::ContainerExecution(
978                    "Container start timed out".to_string(),
979                ));
980            }
981        }
982
983        // Wait for container to finish with a timeout (300 seconds)
984        let wait_result = tokio::time::timeout(
985            std::time::Duration::from_secs(300),
986            self.docker
987                .wait_container::<String>(&container.id, None)
988                .collect::<Vec<_>>(),
989        )
990        .await;
991
992        let exit_code = match wait_result {
993            Ok(results) => match results.first() {
994                Some(Ok(exit)) => exit.status_code as i32,
995                _ => -1,
996            },
997            Err(_) => {
998                wrkflw_logging::warning("Container wait operation timed out, treating as failure");
999                -1
1000            }
1001        };
1002
1003        // Get logs with a timeout
1004        let logs_result = tokio::time::timeout(
1005            std::time::Duration::from_secs(10),
1006            self.docker
1007                .logs::<String>(&container.id, None)
1008                .collect::<Vec<_>>(),
1009        )
1010        .await;
1011
1012        let mut stdout = String::new();
1013        let mut stderr = String::new();
1014
1015        if let Ok(logs) = logs_result {
1016            for log in logs.into_iter().flatten() {
1017                match log {
1018                    bollard::container::LogOutput::StdOut { message } => {
1019                        stdout.push_str(&String::from_utf8_lossy(&message));
1020                    }
1021                    bollard::container::LogOutput::StdErr { message } => {
1022                        stderr.push_str(&String::from_utf8_lossy(&message));
1023                    }
1024                    _ => {}
1025                }
1026            }
1027        } else {
1028            wrkflw_logging::warning("Retrieving container logs timed out");
1029        }
1030
1031        // Clean up container with a timeout, but preserve on failure if configured
1032        if exit_code == 0 || !self.preserve_containers_on_failure {
1033            let _ = tokio::time::timeout(
1034                std::time::Duration::from_secs(10),
1035                self.docker.remove_container(&container.id, None),
1036            )
1037            .await;
1038            untrack_container(&container.id);
1039        } else {
1040            // Container failed and we want to preserve it for debugging
1041            wrkflw_logging::info(&format!(
1042                "Preserving container {} for debugging (exit code: {}). Use 'docker exec -it {} bash' to inspect.",
1043                container.id, exit_code, container.id
1044            ));
1045            // Still untrack it from the automatic cleanup system to prevent it from being cleaned up later
1046            untrack_container(&container.id);
1047        }
1048
1049        // Log detailed information about the command execution for debugging
1050        if exit_code != 0 {
1051            wrkflw_logging::info(&format!(
1052                "Docker command failed with exit code: {}",
1053                exit_code
1054            ));
1055            wrkflw_logging::debug(&format!("Failed command: {:?}", cmd));
1056            wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
1057            wrkflw_logging::debug(&format!("STDERR: {}", stderr));
1058        }
1059
1060        Ok(ContainerOutput {
1061            stdout,
1062            stderr,
1063            exit_code,
1064        })
1065    }
1066
1067    async fn pull_image_inner(&self, image: &str) -> Result<(), ContainerError> {
1068        let options = bollard::image::CreateImageOptions {
1069            from_image: image,
1070            ..Default::default()
1071        };
1072
1073        let mut stream = self.docker.create_image(Some(options), None, None);
1074
1075        while let Some(result) = stream.next().await {
1076            if let Err(e) = result {
1077                return Err(ContainerError::ImagePull(e.to_string()));
1078            }
1079        }
1080
1081        Ok(())
1082    }
1083
1084    async fn build_image_inner(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
1085        let _context_dir = dockerfile.parent().unwrap_or(Path::new("."));
1086
1087        let tar_buffer = {
1088            let mut tar_builder = tar::Builder::new(Vec::new());
1089
1090            // Add Dockerfile to tar
1091            if let Ok(file) = std::fs::File::open(dockerfile) {
1092                let mut header = tar::Header::new_gnu();
1093                let metadata = file.metadata().map_err(|e| {
1094                    ContainerError::ContainerExecution(format!(
1095                        "Failed to get file metadata: {}",
1096                        e
1097                    ))
1098                })?;
1099                let modified_time = metadata
1100                    .modified()
1101                    .map_err(|e| {
1102                        ContainerError::ContainerExecution(format!(
1103                            "Failed to get file modification time: {}",
1104                            e
1105                        ))
1106                    })?
1107                    .elapsed()
1108                    .map_err(|e| {
1109                        ContainerError::ContainerExecution(format!(
1110                            "Failed to get elapsed time since modification: {}",
1111                            e
1112                        ))
1113                    })?
1114                    .as_secs();
1115                header.set_size(metadata.len());
1116                header.set_mode(0o644);
1117                header.set_mtime(modified_time);
1118                header.set_cksum();
1119
1120                tar_builder
1121                    .append_data(&mut header, "Dockerfile", file)
1122                    .map_err(|e| ContainerError::ImageBuild(e.to_string()))?;
1123            } else {
1124                return Err(ContainerError::ImageBuild(format!(
1125                    "Cannot open Dockerfile at {}",
1126                    dockerfile.display()
1127                )));
1128            }
1129
1130            tar_builder
1131                .into_inner()
1132                .map_err(|e| ContainerError::ImageBuild(e.to_string()))?
1133        };
1134
1135        let options = bollard::image::BuildImageOptions {
1136            dockerfile: "Dockerfile",
1137            t: tag,
1138            q: false,
1139            nocache: false,
1140            rm: true,
1141            ..Default::default()
1142        };
1143
1144        let mut stream = self
1145            .docker
1146            .build_image(options, None, Some(tar_buffer.into()));
1147
1148        while let Some(result) = stream.next().await {
1149            match result {
1150                Ok(_) => {
1151                    // For verbose output, we could log the build progress here
1152                }
1153                Err(e) => {
1154                    return Err(ContainerError::ImageBuild(e.to_string()));
1155                }
1156            }
1157        }
1158
1159        Ok(())
1160    }
1161}
1162
1163// Public accessor functions for testing
1164#[cfg(test)]
1165pub fn get_tracked_containers() -> Vec<String> {
1166    if let Ok(containers) = RUNNING_CONTAINERS.lock() {
1167        containers.clone()
1168    } else {
1169        vec![]
1170    }
1171}
1172
/// Returns a snapshot of the network IDs currently registered for cleanup.
///
/// If the tracking mutex is poisoned, an empty list is returned instead.
#[cfg(test)]
pub fn get_tracked_networks() -> Vec<String> {
    match CREATED_NETWORKS.lock() {
        Ok(guard) => guard.iter().cloned().collect(),
        Err(_) => Vec::new(),
    }
}