wrkflw_executor/
docker.rs

1use async_trait::async_trait;
2use bollard::{
3    container::{Config, CreateContainerOptions},
4    models::HostConfig,
5    network::CreateNetworkOptions,
6    Docker,
7};
8use futures_util::StreamExt;
9use once_cell::sync::Lazy;
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::Mutex;
13use wrkflw_logging;
14use wrkflw_runtime::container::{ContainerError, ContainerOutput, ContainerRuntime};
15use wrkflw_utils;
16use wrkflw_utils::fd;
17
// IDs of containers registered through `track_container`; `cleanup_containers`
// snapshots this list to decide what to stop and remove.
static RUNNING_CONTAINERS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
// IDs of networks registered through `track_network`; `cleanup_networks`
// snapshots this list to decide what to remove.
static CREATED_NETWORKS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
// Map to track customized images for a job.
// Keys are the strings built by the `DockerRuntime` cache helpers
// (for example "<base_image>:<customization>"); values are the image names stored for them.
#[allow(dead_code)]
static CUSTOMIZED_IMAGES: Lazy<Mutex<HashMap<String, String>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
24
/// Container runtime that talks to a Docker daemon through a `bollard` client.
pub struct DockerRuntime {
    /// Client handle obtained from `Docker::connect_with_local_defaults`.
    docker: Docker,
    /// Flag supplied through `new_with_config` (`new` passes `false`).
    /// NOTE(review): presumably keeps failed containers around for inspection —
    /// confirm against the code that reads this field.
    preserve_containers_on_failure: bool,
}
29
30impl DockerRuntime {
31    pub fn new() -> Result<Self, ContainerError> {
32        Self::new_with_config(false)
33    }
34
35    pub fn new_with_config(preserve_containers_on_failure: bool) -> Result<Self, ContainerError> {
36        let docker = Docker::connect_with_local_defaults().map_err(|e| {
37            ContainerError::ContainerStart(format!("Failed to connect to Docker: {}", e))
38        })?;
39
40        Ok(DockerRuntime {
41            docker,
42            preserve_containers_on_failure,
43        })
44    }
45
46    // Add a method to store and retrieve customized images (e.g., with Python installed)
47    #[allow(dead_code)]
48    pub fn get_customized_image(base_image: &str, customization: &str) -> Option<String> {
49        let key = format!("{}:{}", base_image, customization);
50        match CUSTOMIZED_IMAGES.lock() {
51            Ok(images) => images.get(&key).cloned(),
52            Err(e) => {
53                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
54                None
55            }
56        }
57    }
58
59    #[allow(dead_code)]
60    pub fn set_customized_image(base_image: &str, customization: &str, new_image: &str) {
61        let key = format!("{}:{}", base_image, customization);
62        if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
63            images.insert(key, new_image.to_string());
64        }) {
65            wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
66        }
67    }
68
69    /// Find a customized image key by prefix
70    #[allow(dead_code)]
71    pub fn find_customized_image_key(image: &str, prefix: &str) -> Option<String> {
72        let image_keys = match CUSTOMIZED_IMAGES.lock() {
73            Ok(keys) => keys,
74            Err(e) => {
75                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
76                return None;
77            }
78        };
79
80        // Look for any key that starts with the prefix
81        for (key, _) in image_keys.iter() {
82            if key.starts_with(prefix) {
83                return Some(key.clone());
84            }
85        }
86
87        None
88    }
89
90    /// Get a customized image with language-specific dependencies
91    pub fn get_language_specific_image(
92        base_image: &str,
93        language: &str,
94        version: Option<&str>,
95    ) -> Option<String> {
96        let key = match (language, version) {
97            ("python", Some(ver)) => format!("python:{}", ver),
98            ("node", Some(ver)) => format!("node:{}", ver),
99            ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
100            ("go", Some(ver)) => format!("golang:{}", ver),
101            ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
102            ("rust", Some(ver)) => format!("rust:{}", ver),
103            (lang, Some(ver)) => format!("{}:{}", lang, ver),
104            (lang, None) => lang.to_string(),
105        };
106
107        match CUSTOMIZED_IMAGES.lock() {
108            Ok(images) => images.get(&key).cloned(),
109            Err(e) => {
110                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
111                None
112            }
113        }
114    }
115
116    /// Set a customized image with language-specific dependencies
117    pub fn set_language_specific_image(
118        base_image: &str,
119        language: &str,
120        version: Option<&str>,
121        new_image: &str,
122    ) {
123        let key = match (language, version) {
124            ("python", Some(ver)) => format!("python:{}", ver),
125            ("node", Some(ver)) => format!("node:{}", ver),
126            ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
127            ("go", Some(ver)) => format!("golang:{}", ver),
128            ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
129            ("rust", Some(ver)) => format!("rust:{}", ver),
130            (lang, Some(ver)) => format!("{}:{}", lang, ver),
131            (lang, None) => lang.to_string(),
132        };
133
134        if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
135            images.insert(key, new_image.to_string());
136        }) {
137            wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
138        }
139    }
140
141    /// Prepare a language-specific environment
142    #[allow(dead_code)]
143    pub async fn prepare_language_environment(
144        &self,
145        language: &str,
146        version: Option<&str>,
147        additional_packages: Option<Vec<String>>,
148    ) -> Result<String, ContainerError> {
149        // Check if we already have a customized image for this language and version
150        let key = format!("{}-{}", language, version.unwrap_or("latest"));
151        if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
152            return Ok(customized_image);
153        }
154
155        // Create a temporary Dockerfile for customization
156        let temp_dir = tempfile::tempdir().map_err(|e| {
157            ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
158        })?;
159
160        let dockerfile_path = temp_dir.path().join("Dockerfile");
161        let mut dockerfile_content = String::new();
162
163        // Add language-specific setup based on the language
164        match language {
165            "python" => {
166                let base_image =
167                    version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
168                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
169                dockerfile_content.push_str(
170                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
171                );
172                dockerfile_content.push_str("    build-essential \\\n");
173                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
174
175                if let Some(packages) = additional_packages {
176                    for package in packages {
177                        dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
178                    }
179                }
180            }
181            "node" => {
182                let base_image =
183                    version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
184                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
185                dockerfile_content.push_str(
186                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
187                );
188                dockerfile_content.push_str("    build-essential \\\n");
189                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
190
191                if let Some(packages) = additional_packages {
192                    for package in packages {
193                        dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
194                    }
195                }
196            }
197            "java" => {
198                let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
199                    format!("eclipse-temurin:{}", v)
200                });
201                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
202                dockerfile_content.push_str(
203                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
204                );
205                dockerfile_content.push_str("    maven \\\n");
206                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
207            }
208            "go" => {
209                let base_image =
210                    version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
211                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
212                dockerfile_content.push_str(
213                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
214                );
215                dockerfile_content.push_str("    git \\\n");
216                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
217
218                if let Some(packages) = additional_packages {
219                    for package in packages {
220                        dockerfile_content.push_str(&format!("RUN go install {}\n", package));
221                    }
222                }
223            }
224            "dotnet" => {
225                let base_image = version
226                    .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
227                        format!("mcr.microsoft.com/dotnet/sdk:{}", v)
228                    });
229                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
230
231                if let Some(packages) = additional_packages {
232                    for package in packages {
233                        dockerfile_content
234                            .push_str(&format!("RUN dotnet tool install -g {}\n", package));
235                    }
236                }
237            }
238            "rust" => {
239                let base_image =
240                    version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
241                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
242                dockerfile_content.push_str(
243                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
244                );
245                dockerfile_content.push_str("    build-essential \\\n");
246                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
247
248                if let Some(packages) = additional_packages {
249                    for package in packages {
250                        dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
251                    }
252                }
253            }
254            _ => {
255                return Err(ContainerError::ContainerStart(format!(
256                    "Unsupported language: {}",
257                    language
258                )));
259            }
260        }
261
262        // Write the Dockerfile
263        std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
264            ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
265        })?;
266
267        // Build the customized image
268        let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
269        self.build_image(&dockerfile_path, &image_tag).await?;
270
271        // Store the customized image
272        Self::set_language_specific_image("", language, version, &image_tag);
273
274        Ok(image_tag)
275    }
276}
277
278pub fn is_available() -> bool {
279    // Use a very short timeout for the entire availability check
280    let overall_timeout = std::time::Duration::from_secs(3);
281
282    // Spawn a thread with the timeout to prevent blocking the main thread
283    let handle = std::thread::spawn(move || {
284        // Use safe FD redirection utility to suppress Docker error messages
285        match fd::with_stderr_to_null(|| {
286            // First, check if docker CLI is available as a quick test
287            if cfg!(target_os = "linux") || cfg!(target_os = "macos") {
288                // Try a simple docker version command with a short timeout
289                let process = std::process::Command::new("docker")
290                    .arg("version")
291                    .arg("--format")
292                    .arg("{{.Server.Version}}")
293                    .stdout(std::process::Stdio::null())
294                    .stderr(std::process::Stdio::null())
295                    .spawn();
296
297                match process {
298                    Ok(mut child) => {
299                        // Set a very short timeout for the process
300                        let status = std::thread::scope(|_| {
301                            // Try to wait for a short time
302                            for _ in 0..10 {
303                                match child.try_wait() {
304                                    Ok(Some(status)) => return status.success(),
305                                    Ok(None) => {
306                                        std::thread::sleep(std::time::Duration::from_millis(100))
307                                    }
308                                    Err(_) => return false,
309                                }
310                            }
311                            // Kill it if it takes too long
312                            let _ = child.kill();
313                            false
314                        });
315
316                        if !status {
317                            return false;
318                        }
319                    }
320                    Err(_) => {
321                        wrkflw_logging::debug("Docker CLI is not available");
322                        return false;
323                    }
324                }
325            }
326
327            // Try to connect to Docker daemon with a short timeout
328            let runtime = match tokio::runtime::Builder::new_current_thread()
329                .enable_all()
330                .build()
331            {
332                Ok(rt) => rt,
333                Err(e) => {
334                    wrkflw_logging::error(&format!(
335                        "Failed to create runtime for Docker availability check: {}",
336                        e
337                    ));
338                    return false;
339                }
340            };
341
342            runtime.block_on(async {
343                match tokio::time::timeout(std::time::Duration::from_secs(2), async {
344                    match Docker::connect_with_local_defaults() {
345                        Ok(docker) => {
346                            // Try to ping the Docker daemon with a short timeout
347                            match tokio::time::timeout(
348                                std::time::Duration::from_secs(1),
349                                docker.ping(),
350                            )
351                            .await
352                            {
353                                Ok(Ok(_)) => true,
354                                Ok(Err(e)) => {
355                                    wrkflw_logging::debug(&format!(
356                                        "Docker daemon ping failed: {}",
357                                        e
358                                    ));
359                                    false
360                                }
361                                Err(_) => {
362                                    wrkflw_logging::debug(
363                                        "Docker daemon ping timed out after 1 second",
364                                    );
365                                    false
366                                }
367                            }
368                        }
369                        Err(e) => {
370                            wrkflw_logging::debug(&format!(
371                                "Docker daemon connection failed: {}",
372                                e
373                            ));
374                            false
375                        }
376                    }
377                })
378                .await
379                {
380                    Ok(result) => result,
381                    Err(_) => {
382                        wrkflw_logging::debug("Docker availability check timed out");
383                        false
384                    }
385                }
386            })
387        }) {
388            Ok(result) => result,
389            Err(_) => {
390                wrkflw_logging::debug(
391                    "Failed to redirect stderr when checking Docker availability",
392                );
393                false
394            }
395        }
396    });
397
398    // Manual implementation of join with timeout
399    let start = std::time::Instant::now();
400
401    while start.elapsed() < overall_timeout {
402        if handle.is_finished() {
403            return match handle.join() {
404                Ok(result) => result,
405                Err(_) => {
406                    wrkflw_logging::warning("Docker availability check thread panicked");
407                    false
408                }
409            };
410        }
411        std::thread::sleep(std::time::Duration::from_millis(50));
412    }
413
414    wrkflw_logging::warning(
415        "Docker availability check timed out, assuming Docker is not available",
416    );
417    false
418}
419
420// Add container to tracking
421pub fn track_container(id: &str) {
422    if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
423        containers.push(id.to_string());
424    }
425}
426
427// Remove container from tracking
428pub fn untrack_container(id: &str) {
429    if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
430        containers.retain(|c| c != id);
431    }
432}
433
434// Add network to tracking
435pub fn track_network(id: &str) {
436    if let Ok(mut networks) = CREATED_NETWORKS.lock() {
437        networks.push(id.to_string());
438    }
439}
440
441// Remove network from tracking
442pub fn untrack_network(id: &str) {
443    if let Ok(mut networks) = CREATED_NETWORKS.lock() {
444        networks.retain(|n| n != id);
445    }
446}
447
448// Clean up all tracked resources
449pub async fn cleanup_resources(docker: &Docker) {
450    // Use a global timeout for the entire cleanup process
451    let cleanup_timeout = std::time::Duration::from_secs(5);
452
453    match tokio::time::timeout(cleanup_timeout, async {
454        // Perform both cleanups in parallel for efficiency
455        let (container_result, network_result) =
456            tokio::join!(cleanup_containers(docker), cleanup_networks(docker));
457
458        if let Err(e) = container_result {
459            wrkflw_logging::error(&format!("Error during container cleanup: {}", e));
460        }
461
462        if let Err(e) = network_result {
463            wrkflw_logging::error(&format!("Error during network cleanup: {}", e));
464        }
465    })
466    .await
467    {
468        Ok(_) => wrkflw_logging::debug("Docker cleanup completed within timeout"),
469        Err(_) => wrkflw_logging::warning(
470            "Docker cleanup timed out, some resources may not have been removed",
471        ),
472    }
473}
474
475// Clean up all tracked containers
476pub async fn cleanup_containers(docker: &Docker) -> Result<(), String> {
477    // Getting the containers to clean up should not take a long time
478    let containers_to_cleanup =
479        match tokio::time::timeout(std::time::Duration::from_millis(500), async {
480            match RUNNING_CONTAINERS.try_lock() {
481                Ok(containers) => containers.clone(),
482                Err(_) => {
483                    wrkflw_logging::error("Could not acquire container lock for cleanup");
484                    vec![]
485                }
486            }
487        })
488        .await
489        {
490            Ok(containers) => containers,
491            Err(_) => {
492                wrkflw_logging::error("Timeout while trying to get containers for cleanup");
493                vec![]
494            }
495        };
496
497    if containers_to_cleanup.is_empty() {
498        return Ok(());
499    }
500
501    wrkflw_logging::info(&format!(
502        "Cleaning up {} containers",
503        containers_to_cleanup.len()
504    ));
505
506    // Process each container with a timeout
507    for container_id in containers_to_cleanup {
508        // First try to stop the container
509        match tokio::time::timeout(
510            std::time::Duration::from_millis(1000),
511            docker.stop_container(&container_id, None),
512        )
513        .await
514        {
515            Ok(Ok(_)) => wrkflw_logging::debug(&format!("Stopped container: {}", container_id)),
516            Ok(Err(e)) => wrkflw_logging::warning(&format!(
517                "Error stopping container {}: {}",
518                container_id, e
519            )),
520            Err(_) => {
521                wrkflw_logging::warning(&format!("Timeout stopping container: {}", container_id))
522            }
523        }
524
525        // Then try to remove it
526        match tokio::time::timeout(
527            std::time::Duration::from_millis(1000),
528            docker.remove_container(&container_id, None),
529        )
530        .await
531        {
532            Ok(Ok(_)) => wrkflw_logging::debug(&format!("Removed container: {}", container_id)),
533            Ok(Err(e)) => wrkflw_logging::warning(&format!(
534                "Error removing container {}: {}",
535                container_id, e
536            )),
537            Err(_) => {
538                wrkflw_logging::warning(&format!("Timeout removing container: {}", container_id))
539            }
540        }
541
542        // Always untrack the container whether or not we succeeded to avoid future cleanup attempts
543        untrack_container(&container_id);
544    }
545
546    Ok(())
547}
548
549// Clean up all tracked networks
550pub async fn cleanup_networks(docker: &Docker) -> Result<(), String> {
551    // Getting the networks to clean up should not take a long time
552    let networks_to_cleanup =
553        match tokio::time::timeout(std::time::Duration::from_millis(500), async {
554            match CREATED_NETWORKS.try_lock() {
555                Ok(networks) => networks.clone(),
556                Err(_) => {
557                    wrkflw_logging::error("Could not acquire network lock for cleanup");
558                    vec![]
559                }
560            }
561        })
562        .await
563        {
564            Ok(networks) => networks,
565            Err(_) => {
566                wrkflw_logging::error("Timeout while trying to get networks for cleanup");
567                vec![]
568            }
569        };
570
571    if networks_to_cleanup.is_empty() {
572        return Ok(());
573    }
574
575    wrkflw_logging::info(&format!(
576        "Cleaning up {} networks",
577        networks_to_cleanup.len()
578    ));
579
580    for network_id in networks_to_cleanup {
581        match tokio::time::timeout(
582            std::time::Duration::from_millis(1000),
583            docker.remove_network(&network_id),
584        )
585        .await
586        {
587            Ok(Ok(_)) => {
588                wrkflw_logging::info(&format!("Successfully removed network: {}", network_id))
589            }
590            Ok(Err(e)) => {
591                wrkflw_logging::error(&format!("Error removing network {}: {}", network_id, e))
592            }
593            Err(_) => wrkflw_logging::warning(&format!("Timeout removing network: {}", network_id)),
594        }
595
596        // Always untrack the network whether or not we succeeded
597        untrack_network(&network_id);
598    }
599
600    Ok(())
601}
602
603// Create a new Docker network for a job
604pub async fn create_job_network(docker: &Docker) -> Result<String, ContainerError> {
605    let network_name = format!("wrkflw-network-{}", uuid::Uuid::new_v4());
606
607    let options = CreateNetworkOptions {
608        name: network_name.clone(),
609        driver: "bridge".to_string(),
610        ..Default::default()
611    };
612
613    let network = docker
614        .create_network(options)
615        .await
616        .map_err(|e| ContainerError::NetworkCreation(e.to_string()))?;
617
618    // network.id is Option<String>, unwrap it safely
619    let network_id = network.id.ok_or_else(|| {
620        ContainerError::NetworkOperation("Network created but no ID returned".to_string())
621    })?;
622
623    track_network(&network_id);
624    wrkflw_logging::info(&format!("Created Docker network: {}", network_id));
625
626    Ok(network_id)
627}
628
629#[async_trait]
630impl ContainerRuntime for DockerRuntime {
631    async fn run_container(
632        &self,
633        image: &str,
634        cmd: &[&str],
635        env_vars: &[(&str, &str)],
636        working_dir: &Path,
637        volumes: &[(&Path, &Path)],
638    ) -> Result<ContainerOutput, ContainerError> {
639        // Print detailed debugging info
640        wrkflw_logging::info(&format!("Docker: Running container with image: {}", image));
641
642        // Add a global timeout for all Docker operations to prevent freezing
643        let timeout_duration = std::time::Duration::from_secs(360); // Increased outer timeout to 6 minutes
644
645        // Run the entire container operation with a timeout
646        match tokio::time::timeout(
647            timeout_duration,
648            self.run_container_inner(image, cmd, env_vars, working_dir, volumes),
649        )
650        .await
651        {
652            Ok(result) => result,
653            Err(_) => {
654                wrkflw_logging::error("Docker operation timed out after 360 seconds");
655                Err(ContainerError::ContainerExecution(
656                    "Operation timed out".to_string(),
657                ))
658            }
659        }
660    }
661
662    async fn pull_image(&self, image: &str) -> Result<(), ContainerError> {
663        // Add a timeout for pull operations
664        let timeout_duration = std::time::Duration::from_secs(30);
665
666        match tokio::time::timeout(timeout_duration, self.pull_image_inner(image)).await {
667            Ok(result) => result,
668            Err(_) => {
669                wrkflw_logging::warning(&format!(
670                    "Pull of image {} timed out, continuing with existing image",
671                    image
672                ));
673                // Return success to allow continuing with existing image
674                Ok(())
675            }
676        }
677    }
678
679    async fn build_image(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
680        // Add a timeout for build operations
681        let timeout_duration = std::time::Duration::from_secs(120); // 2 minutes timeout for builds
682
683        match tokio::time::timeout(timeout_duration, self.build_image_inner(dockerfile, tag)).await
684        {
685            Ok(result) => result,
686            Err(_) => {
687                wrkflw_logging::error(&format!(
688                    "Building image {} timed out after 120 seconds",
689                    tag
690                ));
691                Err(ContainerError::ImageBuild(
692                    "Operation timed out".to_string(),
693                ))
694            }
695        }
696    }
697
698    async fn prepare_language_environment(
699        &self,
700        language: &str,
701        version: Option<&str>,
702        additional_packages: Option<Vec<String>>,
703    ) -> Result<String, ContainerError> {
704        // Check if we already have a customized image for this language and version
705        let key = format!("{}-{}", language, version.unwrap_or("latest"));
706        if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
707            return Ok(customized_image);
708        }
709
710        // Create a temporary Dockerfile for customization
711        let temp_dir = tempfile::tempdir().map_err(|e| {
712            ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
713        })?;
714
715        let dockerfile_path = temp_dir.path().join("Dockerfile");
716        let mut dockerfile_content = String::new();
717
718        // Add language-specific setup based on the language
719        match language {
720            "python" => {
721                let base_image =
722                    version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
723                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
724                dockerfile_content.push_str(
725                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
726                );
727                dockerfile_content.push_str("    build-essential \\\n");
728                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
729
730                if let Some(packages) = additional_packages {
731                    for package in packages {
732                        dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
733                    }
734                }
735            }
736            "node" => {
737                let base_image =
738                    version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
739                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
740                dockerfile_content.push_str(
741                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
742                );
743                dockerfile_content.push_str("    build-essential \\\n");
744                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
745
746                if let Some(packages) = additional_packages {
747                    for package in packages {
748                        dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
749                    }
750                }
751            }
752            "java" => {
753                let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
754                    format!("eclipse-temurin:{}", v)
755                });
756                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
757                dockerfile_content.push_str(
758                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
759                );
760                dockerfile_content.push_str("    maven \\\n");
761                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
762            }
763            "go" => {
764                let base_image =
765                    version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
766                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
767                dockerfile_content.push_str(
768                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
769                );
770                dockerfile_content.push_str("    git \\\n");
771                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
772
773                if let Some(packages) = additional_packages {
774                    for package in packages {
775                        dockerfile_content.push_str(&format!("RUN go install {}\n", package));
776                    }
777                }
778            }
779            "dotnet" => {
780                let base_image = version
781                    .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
782                        format!("mcr.microsoft.com/dotnet/sdk:{}", v)
783                    });
784                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
785
786                if let Some(packages) = additional_packages {
787                    for package in packages {
788                        dockerfile_content
789                            .push_str(&format!("RUN dotnet tool install -g {}\n", package));
790                    }
791                }
792            }
793            "rust" => {
794                let base_image =
795                    version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
796                dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
797                dockerfile_content.push_str(
798                    "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
799                );
800                dockerfile_content.push_str("    build-essential \\\n");
801                dockerfile_content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
802
803                if let Some(packages) = additional_packages {
804                    for package in packages {
805                        dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
806                    }
807                }
808            }
809            _ => {
810                return Err(ContainerError::ContainerStart(format!(
811                    "Unsupported language: {}",
812                    language
813                )));
814            }
815        }
816
817        // Write the Dockerfile
818        std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
819            ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
820        })?;
821
822        // Build the customized image
823        let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
824        self.build_image(&dockerfile_path, &image_tag).await?;
825
826        // Store the customized image
827        Self::set_language_specific_image("", language, version, &image_tag);
828
829        Ok(image_tag)
830    }
831}
832
// Internal `*_inner` methods that hold the actual implementations
834impl DockerRuntime {
835    async fn run_container_inner(
836        &self,
837        image: &str,
838        cmd: &[&str],
839        env_vars: &[(&str, &str)],
840        working_dir: &Path,
841        volumes: &[(&Path, &Path)],
842    ) -> Result<ContainerOutput, ContainerError> {
843        // First, try to pull the image if it's not available locally
844        if let Err(e) = self.pull_image_inner(image).await {
845            wrkflw_logging::warning(&format!(
846                "Failed to pull image {}: {}. Attempting to continue with existing image.",
847                image, e
848            ));
849        }
850
851        // Collect environment variables
852        let mut env: Vec<String> = env_vars
853            .iter()
854            .map(|(k, v)| format!("{}={}", k, v))
855            .collect();
856
857        let mut binds = Vec::new();
858        for (host_path, container_path) in volumes {
859            binds.push(format!(
860                "{}:{}",
861                host_path.to_string_lossy(),
862                container_path.to_string_lossy()
863            ));
864        }
865
866        // Convert command vector to Vec<String>
867        let cmd_vec: Vec<String> = cmd.iter().map(|&s| s.to_string()).collect();
868
869        wrkflw_logging::debug(&format!("Running command in Docker: {:?}", cmd_vec));
870        wrkflw_logging::debug(&format!("Environment: {:?}", env));
871        wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
872
873        // Determine platform-specific configurations
874        let is_windows_image = image.contains("windows")
875            || image.contains("servercore")
876            || image.contains("nanoserver");
877        let is_macos_emu =
878            image.contains("act-") && (image.contains("catthehacker") || image.contains("nektos"));
879
880        // Add platform-specific environment variables
881        if is_macos_emu {
882            // Add macOS-specific environment variables
883            env.push("RUNNER_OS=macOS".to_string());
884            env.push("RUNNER_ARCH=X64".to_string());
885            env.push("TMPDIR=/tmp".to_string());
886            env.push("HOME=/root".to_string());
887            env.push("GITHUB_WORKSPACE=/github/workspace".to_string());
888            env.push("PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin".to_string());
889        }
890
891        // Create appropriate container options based on platform
892        let options = Some(CreateContainerOptions {
893            name: format!("wrkflw-{}", uuid::Uuid::new_v4()),
894            platform: if is_windows_image {
895                Some("windows".to_string())
896            } else {
897                None
898            },
899        });
900
901        // Configure host configuration based on platform
902        let host_config = if is_windows_image {
903            HostConfig {
904                binds: Some(binds),
905                isolation: Some(bollard::models::HostConfigIsolationEnum::PROCESS),
906                ..Default::default()
907            }
908        } else {
909            HostConfig {
910                binds: Some(binds),
911                ..Default::default()
912            }
913        };
914
915        // Create container config with platform-specific settings
916        let mut config = Config {
917            image: Some(image.to_string()),
918            cmd: Some(cmd_vec),
919            env: Some(env),
920            working_dir: Some(working_dir.to_string_lossy().to_string()),
921            host_config: Some(host_config),
922            // Windows containers need specific configuration
923            user: if is_windows_image {
924                Some("ContainerAdministrator".to_string())
925            } else {
926                None // Don't specify user for macOS emulation - use default root user
927            },
928            // Map appropriate entrypoint for different platforms
929            entrypoint: if is_macos_emu {
930                // For macOS, ensure we use bash
931                Some(vec!["bash".to_string(), "-l".to_string(), "-c".to_string()])
932            } else {
933                None
934            },
935            ..Default::default()
936        };
937
938        // Run platform-specific container setup
939        if is_macos_emu {
940            // Add special labels for macOS
941            let mut labels = HashMap::new();
942            labels.insert("wrkflw.platform".to_string(), "macos".to_string());
943            config.labels = Some(labels);
944        }
945
946        // Create container with a shorter timeout
947        let create_result = tokio::time::timeout(
948            std::time::Duration::from_secs(15),
949            self.docker.create_container(options, config),
950        )
951        .await;
952
953        let container = match create_result {
954            Ok(Ok(container)) => container,
955            Ok(Err(e)) => return Err(ContainerError::ContainerStart(e.to_string())),
956            Err(_) => {
957                return Err(ContainerError::ContainerStart(
958                    "Container creation timed out".to_string(),
959                ))
960            }
961        };
962
963        // Track the container before starting it to ensure cleanup even if starting fails
964        track_container(&container.id);
965
966        // Start container with a timeout
967        let start_result = tokio::time::timeout(
968            std::time::Duration::from_secs(15),
969            self.docker.start_container::<String>(&container.id, None),
970        )
971        .await;
972
973        match start_result {
974            Ok(Ok(_)) => {}
975            Ok(Err(e)) => {
976                // Clean up the container if start fails
977                let _ = self.docker.remove_container(&container.id, None).await;
978                untrack_container(&container.id);
979                return Err(ContainerError::ContainerExecution(e.to_string()));
980            }
981            Err(_) => {
982                // Clean up the container if starting times out
983                let _ = self.docker.remove_container(&container.id, None).await;
984                untrack_container(&container.id);
985                return Err(ContainerError::ContainerExecution(
986                    "Container start timed out".to_string(),
987                ));
988            }
989        }
990
991        // Wait for container to finish with a timeout (300 seconds)
992        let wait_result = tokio::time::timeout(
993            std::time::Duration::from_secs(300),
994            self.docker
995                .wait_container::<String>(&container.id, None)
996                .collect::<Vec<_>>(),
997        )
998        .await;
999
1000        let exit_code = match wait_result {
1001            Ok(results) => match results.first() {
1002                Some(Ok(exit)) => exit.status_code as i32,
1003                _ => -1,
1004            },
1005            Err(_) => {
1006                wrkflw_logging::warning("Container wait operation timed out, treating as failure");
1007                -1
1008            }
1009        };
1010
1011        // Get logs with a timeout
1012        let logs_result = tokio::time::timeout(
1013            std::time::Duration::from_secs(10),
1014            self.docker
1015                .logs::<String>(&container.id, None)
1016                .collect::<Vec<_>>(),
1017        )
1018        .await;
1019
1020        let mut stdout = String::new();
1021        let mut stderr = String::new();
1022
1023        if let Ok(logs) = logs_result {
1024            for log in logs.into_iter().flatten() {
1025                match log {
1026                    bollard::container::LogOutput::StdOut { message } => {
1027                        stdout.push_str(&String::from_utf8_lossy(&message));
1028                    }
1029                    bollard::container::LogOutput::StdErr { message } => {
1030                        stderr.push_str(&String::from_utf8_lossy(&message));
1031                    }
1032                    _ => {}
1033                }
1034            }
1035        } else {
1036            wrkflw_logging::warning("Retrieving container logs timed out");
1037        }
1038
1039        // Clean up container with a timeout, but preserve on failure if configured
1040        if exit_code == 0 || !self.preserve_containers_on_failure {
1041            let _ = tokio::time::timeout(
1042                std::time::Duration::from_secs(10),
1043                self.docker.remove_container(&container.id, None),
1044            )
1045            .await;
1046            untrack_container(&container.id);
1047        } else {
1048            // Container failed and we want to preserve it for debugging
1049            wrkflw_logging::info(&format!(
1050                "Preserving container {} for debugging (exit code: {}). Use 'docker exec -it {} bash' to inspect.",
1051                container.id, exit_code, container.id
1052            ));
1053            // Still untrack it from the automatic cleanup system to prevent it from being cleaned up later
1054            untrack_container(&container.id);
1055        }
1056
1057        // Log detailed information about the command execution for debugging
1058        if exit_code != 0 {
1059            wrkflw_logging::info(&format!(
1060                "Docker command failed with exit code: {}",
1061                exit_code
1062            ));
1063            wrkflw_logging::debug(&format!("Failed command: {:?}", cmd));
1064            wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
1065            wrkflw_logging::debug(&format!("STDERR: {}", stderr));
1066        }
1067
1068        Ok(ContainerOutput {
1069            stdout,
1070            stderr,
1071            exit_code,
1072        })
1073    }
1074
1075    async fn pull_image_inner(&self, image: &str) -> Result<(), ContainerError> {
1076        let options = bollard::image::CreateImageOptions {
1077            from_image: image,
1078            ..Default::default()
1079        };
1080
1081        let mut stream = self.docker.create_image(Some(options), None, None);
1082
1083        while let Some(result) = stream.next().await {
1084            if let Err(e) = result {
1085                return Err(ContainerError::ImagePull(e.to_string()));
1086            }
1087        }
1088
1089        Ok(())
1090    }
1091
1092    async fn build_image_inner(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
1093        let _context_dir = dockerfile.parent().unwrap_or(Path::new("."));
1094
1095        let tar_buffer = {
1096            let mut tar_builder = tar::Builder::new(Vec::new());
1097
1098            // Add Dockerfile to tar
1099            if let Ok(file) = std::fs::File::open(dockerfile) {
1100                let mut header = tar::Header::new_gnu();
1101                let metadata = file.metadata().map_err(|e| {
1102                    ContainerError::ContainerExecution(format!(
1103                        "Failed to get file metadata: {}",
1104                        e
1105                    ))
1106                })?;
1107                let modified_time = metadata
1108                    .modified()
1109                    .map_err(|e| {
1110                        ContainerError::ContainerExecution(format!(
1111                            "Failed to get file modification time: {}",
1112                            e
1113                        ))
1114                    })?
1115                    .elapsed()
1116                    .map_err(|e| {
1117                        ContainerError::ContainerExecution(format!(
1118                            "Failed to get elapsed time since modification: {}",
1119                            e
1120                        ))
1121                    })?
1122                    .as_secs();
1123                header.set_size(metadata.len());
1124                header.set_mode(0o644);
1125                header.set_mtime(modified_time);
1126                header.set_cksum();
1127
1128                tar_builder
1129                    .append_data(&mut header, "Dockerfile", file)
1130                    .map_err(|e| ContainerError::ImageBuild(e.to_string()))?;
1131            } else {
1132                return Err(ContainerError::ImageBuild(format!(
1133                    "Cannot open Dockerfile at {}",
1134                    dockerfile.display()
1135                )));
1136            }
1137
1138            tar_builder
1139                .into_inner()
1140                .map_err(|e| ContainerError::ImageBuild(e.to_string()))?
1141        };
1142
1143        let options = bollard::image::BuildImageOptions {
1144            dockerfile: "Dockerfile",
1145            t: tag,
1146            q: false,
1147            nocache: false,
1148            rm: true,
1149            ..Default::default()
1150        };
1151
1152        let mut stream = self
1153            .docker
1154            .build_image(options, None, Some(tar_buffer.into()));
1155
1156        while let Some(result) = stream.next().await {
1157            match result {
1158                Ok(_) => {
1159                    // For verbose output, we could log the build progress here
1160                }
1161                Err(e) => {
1162                    return Err(ContainerError::ImageBuild(e.to_string()));
1163                }
1164            }
1165        }
1166
1167        Ok(())
1168    }
1169}
1170
1171// Public accessor functions for testing
/// Returns a snapshot of the container IDs currently held in `RUNNING_CONTAINERS`.
///
/// Yields an empty vector when the lock cannot be acquired.
#[cfg(test)]
pub fn get_tracked_containers() -> Vec<String> {
    RUNNING_CONTAINERS
        .lock()
        .map(|tracked| tracked.clone())
        .unwrap_or_default()
}
1180
/// Returns a snapshot of the network IDs currently held in `CREATED_NETWORKS`.
///
/// Yields an empty vector when the lock cannot be acquired.
#[cfg(test)]
pub fn get_tracked_networks() -> Vec<String> {
    CREATED_NETWORKS
        .lock()
        .map(|tracked| tracked.clone())
        .unwrap_or_default()
}