1use async_trait::async_trait;
2use bollard::{
3 container::{Config, CreateContainerOptions},
4 models::HostConfig,
5 network::CreateNetworkOptions,
6 Docker,
7};
8use futures_util::StreamExt;
9use once_cell::sync::Lazy;
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::Mutex;
13use wrkflw_logging;
14use wrkflw_runtime::container::{ContainerError, ContainerOutput, ContainerRuntime};
15use wrkflw_utils;
16use wrkflw_utils::fd;
17
// IDs of containers recorded via `track_container` and not yet untracked;
// `cleanup_containers` snapshots this list to stop and remove them.
static RUNNING_CONTAINERS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
// IDs of networks recorded via `track_network` and not yet untracked;
// `cleanup_networks` snapshots this list to remove them.
static CREATED_NETWORKS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
// Process-wide cache from a customization / language key to the image name
// that was registered for it (see the `*_customized_image` and
// `*_language_specific_image` helpers on `DockerRuntime`).
#[allow(dead_code)]
static CUSTOMIZED_IMAGES: Lazy<Mutex<HashMap<String, String>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
24
/// Container runtime that executes workloads through a Docker daemon.
pub struct DockerRuntime {
    /// Client handle used for every Docker API call made by this runtime.
    docker: Docker,
    /// When `true`, a container whose command ended with a non-zero exit code
    /// is left in place (not removed) so it can be inspected afterwards.
    preserve_containers_on_failure: bool,
}
29
30impl DockerRuntime {
31 pub fn new() -> Result<Self, ContainerError> {
32 Self::new_with_config(false)
33 }
34
35 pub fn new_with_config(preserve_containers_on_failure: bool) -> Result<Self, ContainerError> {
36 let docker = Docker::connect_with_local_defaults().map_err(|e| {
37 ContainerError::ContainerStart(format!("Failed to connect to Docker: {}", e))
38 })?;
39
40 Ok(DockerRuntime {
41 docker,
42 preserve_containers_on_failure,
43 })
44 }
45
46 #[allow(dead_code)]
48 pub fn get_customized_image(base_image: &str, customization: &str) -> Option<String> {
49 let key = format!("{}:{}", base_image, customization);
50 match CUSTOMIZED_IMAGES.lock() {
51 Ok(images) => images.get(&key).cloned(),
52 Err(e) => {
53 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
54 None
55 }
56 }
57 }
58
59 #[allow(dead_code)]
60 pub fn set_customized_image(base_image: &str, customization: &str, new_image: &str) {
61 let key = format!("{}:{}", base_image, customization);
62 if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
63 images.insert(key, new_image.to_string());
64 }) {
65 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
66 }
67 }
68
69 #[allow(dead_code)]
71 pub fn find_customized_image_key(image: &str, prefix: &str) -> Option<String> {
72 let image_keys = match CUSTOMIZED_IMAGES.lock() {
73 Ok(keys) => keys,
74 Err(e) => {
75 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
76 return None;
77 }
78 };
79
80 for (key, _) in image_keys.iter() {
82 if key.starts_with(prefix) {
83 return Some(key.clone());
84 }
85 }
86
87 None
88 }
89
90 pub fn get_language_specific_image(
92 base_image: &str,
93 language: &str,
94 version: Option<&str>,
95 ) -> Option<String> {
96 let key = match (language, version) {
97 ("python", Some(ver)) => format!("python:{}", ver),
98 ("node", Some(ver)) => format!("node:{}", ver),
99 ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
100 ("go", Some(ver)) => format!("golang:{}", ver),
101 ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
102 ("rust", Some(ver)) => format!("rust:{}", ver),
103 (lang, Some(ver)) => format!("{}:{}", lang, ver),
104 (lang, None) => lang.to_string(),
105 };
106
107 match CUSTOMIZED_IMAGES.lock() {
108 Ok(images) => images.get(&key).cloned(),
109 Err(e) => {
110 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
111 None
112 }
113 }
114 }
115
116 pub fn set_language_specific_image(
118 base_image: &str,
119 language: &str,
120 version: Option<&str>,
121 new_image: &str,
122 ) {
123 let key = match (language, version) {
124 ("python", Some(ver)) => format!("python:{}", ver),
125 ("node", Some(ver)) => format!("node:{}", ver),
126 ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
127 ("go", Some(ver)) => format!("golang:{}", ver),
128 ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
129 ("rust", Some(ver)) => format!("rust:{}", ver),
130 (lang, Some(ver)) => format!("{}:{}", lang, ver),
131 (lang, None) => lang.to_string(),
132 };
133
134 if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
135 images.insert(key, new_image.to_string());
136 }) {
137 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
138 }
139 }
140
141 #[allow(dead_code)]
143 pub async fn prepare_language_environment(
144 &self,
145 language: &str,
146 version: Option<&str>,
147 additional_packages: Option<Vec<String>>,
148 ) -> Result<String, ContainerError> {
149 let key = format!("{}-{}", language, version.unwrap_or("latest"));
151 if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
152 return Ok(customized_image);
153 }
154
155 let temp_dir = tempfile::tempdir().map_err(|e| {
157 ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
158 })?;
159
160 let dockerfile_path = temp_dir.path().join("Dockerfile");
161 let mut dockerfile_content = String::new();
162
163 match language {
165 "python" => {
166 let base_image =
167 version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
168 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
169 dockerfile_content.push_str(
170 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
171 );
172 dockerfile_content.push_str(" build-essential \\\n");
173 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
174
175 if let Some(packages) = additional_packages {
176 for package in packages {
177 dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
178 }
179 }
180 }
181 "node" => {
182 let base_image =
183 version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
184 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
185 dockerfile_content.push_str(
186 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
187 );
188 dockerfile_content.push_str(" build-essential \\\n");
189 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
190
191 if let Some(packages) = additional_packages {
192 for package in packages {
193 dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
194 }
195 }
196 }
197 "java" => {
198 let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
199 format!("eclipse-temurin:{}", v)
200 });
201 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
202 dockerfile_content.push_str(
203 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
204 );
205 dockerfile_content.push_str(" maven \\\n");
206 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
207 }
208 "go" => {
209 let base_image =
210 version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
211 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
212 dockerfile_content.push_str(
213 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
214 );
215 dockerfile_content.push_str(" git \\\n");
216 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
217
218 if let Some(packages) = additional_packages {
219 for package in packages {
220 dockerfile_content.push_str(&format!("RUN go install {}\n", package));
221 }
222 }
223 }
224 "dotnet" => {
225 let base_image = version
226 .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
227 format!("mcr.microsoft.com/dotnet/sdk:{}", v)
228 });
229 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
230
231 if let Some(packages) = additional_packages {
232 for package in packages {
233 dockerfile_content
234 .push_str(&format!("RUN dotnet tool install -g {}\n", package));
235 }
236 }
237 }
238 "rust" => {
239 let base_image =
240 version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
241 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
242 dockerfile_content.push_str(
243 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
244 );
245 dockerfile_content.push_str(" build-essential \\\n");
246 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
247
248 if let Some(packages) = additional_packages {
249 for package in packages {
250 dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
251 }
252 }
253 }
254 _ => {
255 return Err(ContainerError::ContainerStart(format!(
256 "Unsupported language: {}",
257 language
258 )));
259 }
260 }
261
262 std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
264 ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
265 })?;
266
267 let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
269 self.build_image(&dockerfile_path, &image_tag).await?;
270
271 Self::set_language_specific_image("", language, version, &image_tag);
273
274 Ok(image_tag)
275 }
276}
277
278pub fn is_available() -> bool {
279 let overall_timeout = std::time::Duration::from_secs(3);
281
282 let handle = std::thread::spawn(move || {
284 match fd::with_stderr_to_null(|| {
286 if cfg!(target_os = "linux") || cfg!(target_os = "macos") {
288 let process = std::process::Command::new("docker")
290 .arg("version")
291 .arg("--format")
292 .arg("{{.Server.Version}}")
293 .stdout(std::process::Stdio::null())
294 .stderr(std::process::Stdio::null())
295 .spawn();
296
297 match process {
298 Ok(mut child) => {
299 let status = std::thread::scope(|_| {
301 for _ in 0..10 {
303 match child.try_wait() {
304 Ok(Some(status)) => return status.success(),
305 Ok(None) => {
306 std::thread::sleep(std::time::Duration::from_millis(100))
307 }
308 Err(_) => return false,
309 }
310 }
311 let _ = child.kill();
313 false
314 });
315
316 if !status {
317 return false;
318 }
319 }
320 Err(_) => {
321 wrkflw_logging::debug("Docker CLI is not available");
322 return false;
323 }
324 }
325 }
326
327 let runtime = match tokio::runtime::Builder::new_current_thread()
329 .enable_all()
330 .build()
331 {
332 Ok(rt) => rt,
333 Err(e) => {
334 wrkflw_logging::error(&format!(
335 "Failed to create runtime for Docker availability check: {}",
336 e
337 ));
338 return false;
339 }
340 };
341
342 runtime.block_on(async {
343 match tokio::time::timeout(std::time::Duration::from_secs(2), async {
344 match Docker::connect_with_local_defaults() {
345 Ok(docker) => {
346 match tokio::time::timeout(
348 std::time::Duration::from_secs(1),
349 docker.ping(),
350 )
351 .await
352 {
353 Ok(Ok(_)) => true,
354 Ok(Err(e)) => {
355 wrkflw_logging::debug(&format!(
356 "Docker daemon ping failed: {}",
357 e
358 ));
359 false
360 }
361 Err(_) => {
362 wrkflw_logging::debug(
363 "Docker daemon ping timed out after 1 second",
364 );
365 false
366 }
367 }
368 }
369 Err(e) => {
370 wrkflw_logging::debug(&format!(
371 "Docker daemon connection failed: {}",
372 e
373 ));
374 false
375 }
376 }
377 })
378 .await
379 {
380 Ok(result) => result,
381 Err(_) => {
382 wrkflw_logging::debug("Docker availability check timed out");
383 false
384 }
385 }
386 })
387 }) {
388 Ok(result) => result,
389 Err(_) => {
390 wrkflw_logging::debug(
391 "Failed to redirect stderr when checking Docker availability",
392 );
393 false
394 }
395 }
396 });
397
398 let start = std::time::Instant::now();
400
401 while start.elapsed() < overall_timeout {
402 if handle.is_finished() {
403 return match handle.join() {
404 Ok(result) => result,
405 Err(_) => {
406 wrkflw_logging::warning("Docker availability check thread panicked");
407 false
408 }
409 };
410 }
411 std::thread::sleep(std::time::Duration::from_millis(50));
412 }
413
414 wrkflw_logging::warning(
415 "Docker availability check timed out, assuming Docker is not available",
416 );
417 false
418}
419
420pub fn track_container(id: &str) {
422 if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
423 containers.push(id.to_string());
424 }
425}
426
427pub fn untrack_container(id: &str) {
429 if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
430 containers.retain(|c| c != id);
431 }
432}
433
434pub fn track_network(id: &str) {
436 if let Ok(mut networks) = CREATED_NETWORKS.lock() {
437 networks.push(id.to_string());
438 }
439}
440
441pub fn untrack_network(id: &str) {
443 if let Ok(mut networks) = CREATED_NETWORKS.lock() {
444 networks.retain(|n| n != id);
445 }
446}
447
448pub async fn cleanup_resources(docker: &Docker) {
450 let cleanup_timeout = std::time::Duration::from_secs(5);
452
453 match tokio::time::timeout(cleanup_timeout, async {
454 let (container_result, network_result) =
456 tokio::join!(cleanup_containers(docker), cleanup_networks(docker));
457
458 if let Err(e) = container_result {
459 wrkflw_logging::error(&format!("Error during container cleanup: {}", e));
460 }
461
462 if let Err(e) = network_result {
463 wrkflw_logging::error(&format!("Error during network cleanup: {}", e));
464 }
465 })
466 .await
467 {
468 Ok(_) => wrkflw_logging::debug("Docker cleanup completed within timeout"),
469 Err(_) => wrkflw_logging::warning(
470 "Docker cleanup timed out, some resources may not have been removed",
471 ),
472 }
473}
474
475pub async fn cleanup_containers(docker: &Docker) -> Result<(), String> {
477 let containers_to_cleanup =
479 match tokio::time::timeout(std::time::Duration::from_millis(500), async {
480 match RUNNING_CONTAINERS.try_lock() {
481 Ok(containers) => containers.clone(),
482 Err(_) => {
483 wrkflw_logging::error("Could not acquire container lock for cleanup");
484 vec![]
485 }
486 }
487 })
488 .await
489 {
490 Ok(containers) => containers,
491 Err(_) => {
492 wrkflw_logging::error("Timeout while trying to get containers for cleanup");
493 vec![]
494 }
495 };
496
497 if containers_to_cleanup.is_empty() {
498 return Ok(());
499 }
500
501 wrkflw_logging::info(&format!(
502 "Cleaning up {} containers",
503 containers_to_cleanup.len()
504 ));
505
506 for container_id in containers_to_cleanup {
508 match tokio::time::timeout(
510 std::time::Duration::from_millis(1000),
511 docker.stop_container(&container_id, None),
512 )
513 .await
514 {
515 Ok(Ok(_)) => wrkflw_logging::debug(&format!("Stopped container: {}", container_id)),
516 Ok(Err(e)) => wrkflw_logging::warning(&format!(
517 "Error stopping container {}: {}",
518 container_id, e
519 )),
520 Err(_) => {
521 wrkflw_logging::warning(&format!("Timeout stopping container: {}", container_id))
522 }
523 }
524
525 match tokio::time::timeout(
527 std::time::Duration::from_millis(1000),
528 docker.remove_container(&container_id, None),
529 )
530 .await
531 {
532 Ok(Ok(_)) => wrkflw_logging::debug(&format!("Removed container: {}", container_id)),
533 Ok(Err(e)) => wrkflw_logging::warning(&format!(
534 "Error removing container {}: {}",
535 container_id, e
536 )),
537 Err(_) => {
538 wrkflw_logging::warning(&format!("Timeout removing container: {}", container_id))
539 }
540 }
541
542 untrack_container(&container_id);
544 }
545
546 Ok(())
547}
548
549pub async fn cleanup_networks(docker: &Docker) -> Result<(), String> {
551 let networks_to_cleanup =
553 match tokio::time::timeout(std::time::Duration::from_millis(500), async {
554 match CREATED_NETWORKS.try_lock() {
555 Ok(networks) => networks.clone(),
556 Err(_) => {
557 wrkflw_logging::error("Could not acquire network lock for cleanup");
558 vec![]
559 }
560 }
561 })
562 .await
563 {
564 Ok(networks) => networks,
565 Err(_) => {
566 wrkflw_logging::error("Timeout while trying to get networks for cleanup");
567 vec![]
568 }
569 };
570
571 if networks_to_cleanup.is_empty() {
572 return Ok(());
573 }
574
575 wrkflw_logging::info(&format!(
576 "Cleaning up {} networks",
577 networks_to_cleanup.len()
578 ));
579
580 for network_id in networks_to_cleanup {
581 match tokio::time::timeout(
582 std::time::Duration::from_millis(1000),
583 docker.remove_network(&network_id),
584 )
585 .await
586 {
587 Ok(Ok(_)) => {
588 wrkflw_logging::info(&format!("Successfully removed network: {}", network_id))
589 }
590 Ok(Err(e)) => {
591 wrkflw_logging::error(&format!("Error removing network {}: {}", network_id, e))
592 }
593 Err(_) => wrkflw_logging::warning(&format!("Timeout removing network: {}", network_id)),
594 }
595
596 untrack_network(&network_id);
598 }
599
600 Ok(())
601}
602
603pub async fn create_job_network(docker: &Docker) -> Result<String, ContainerError> {
605 let network_name = format!("wrkflw-network-{}", uuid::Uuid::new_v4());
606
607 let options = CreateNetworkOptions {
608 name: network_name.clone(),
609 driver: "bridge".to_string(),
610 ..Default::default()
611 };
612
613 let network = docker
614 .create_network(options)
615 .await
616 .map_err(|e| ContainerError::NetworkCreation(e.to_string()))?;
617
618 let network_id = network.id.ok_or_else(|| {
620 ContainerError::NetworkOperation("Network created but no ID returned".to_string())
621 })?;
622
623 track_network(&network_id);
624 wrkflw_logging::info(&format!("Created Docker network: {}", network_id));
625
626 Ok(network_id)
627}
628
629#[async_trait]
630impl ContainerRuntime for DockerRuntime {
631 async fn run_container(
632 &self,
633 image: &str,
634 cmd: &[&str],
635 env_vars: &[(&str, &str)],
636 working_dir: &Path,
637 volumes: &[(&Path, &Path)],
638 ) -> Result<ContainerOutput, ContainerError> {
639 wrkflw_logging::info(&format!("Docker: Running container with image: {}", image));
641
642 let timeout_duration = std::time::Duration::from_secs(360); match tokio::time::timeout(
647 timeout_duration,
648 self.run_container_inner(image, cmd, env_vars, working_dir, volumes),
649 )
650 .await
651 {
652 Ok(result) => result,
653 Err(_) => {
654 wrkflw_logging::error("Docker operation timed out after 360 seconds");
655 Err(ContainerError::ContainerExecution(
656 "Operation timed out".to_string(),
657 ))
658 }
659 }
660 }
661
662 async fn pull_image(&self, image: &str) -> Result<(), ContainerError> {
663 let timeout_duration = std::time::Duration::from_secs(30);
665
666 match tokio::time::timeout(timeout_duration, self.pull_image_inner(image)).await {
667 Ok(result) => result,
668 Err(_) => {
669 wrkflw_logging::warning(&format!(
670 "Pull of image {} timed out, continuing with existing image",
671 image
672 ));
673 Ok(())
675 }
676 }
677 }
678
679 async fn build_image(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
680 let timeout_duration = std::time::Duration::from_secs(120); match tokio::time::timeout(timeout_duration, self.build_image_inner(dockerfile, tag)).await
684 {
685 Ok(result) => result,
686 Err(_) => {
687 wrkflw_logging::error(&format!(
688 "Building image {} timed out after 120 seconds",
689 tag
690 ));
691 Err(ContainerError::ImageBuild(
692 "Operation timed out".to_string(),
693 ))
694 }
695 }
696 }
697
698 async fn prepare_language_environment(
699 &self,
700 language: &str,
701 version: Option<&str>,
702 additional_packages: Option<Vec<String>>,
703 ) -> Result<String, ContainerError> {
704 let key = format!("{}-{}", language, version.unwrap_or("latest"));
706 if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
707 return Ok(customized_image);
708 }
709
710 let temp_dir = tempfile::tempdir().map_err(|e| {
712 ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
713 })?;
714
715 let dockerfile_path = temp_dir.path().join("Dockerfile");
716 let mut dockerfile_content = String::new();
717
718 match language {
720 "python" => {
721 let base_image =
722 version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
723 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
724 dockerfile_content.push_str(
725 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
726 );
727 dockerfile_content.push_str(" build-essential \\\n");
728 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
729
730 if let Some(packages) = additional_packages {
731 for package in packages {
732 dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
733 }
734 }
735 }
736 "node" => {
737 let base_image =
738 version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
739 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
740 dockerfile_content.push_str(
741 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
742 );
743 dockerfile_content.push_str(" build-essential \\\n");
744 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
745
746 if let Some(packages) = additional_packages {
747 for package in packages {
748 dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
749 }
750 }
751 }
752 "java" => {
753 let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
754 format!("eclipse-temurin:{}", v)
755 });
756 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
757 dockerfile_content.push_str(
758 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
759 );
760 dockerfile_content.push_str(" maven \\\n");
761 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
762 }
763 "go" => {
764 let base_image =
765 version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
766 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
767 dockerfile_content.push_str(
768 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
769 );
770 dockerfile_content.push_str(" git \\\n");
771 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
772
773 if let Some(packages) = additional_packages {
774 for package in packages {
775 dockerfile_content.push_str(&format!("RUN go install {}\n", package));
776 }
777 }
778 }
779 "dotnet" => {
780 let base_image = version
781 .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
782 format!("mcr.microsoft.com/dotnet/sdk:{}", v)
783 });
784 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
785
786 if let Some(packages) = additional_packages {
787 for package in packages {
788 dockerfile_content
789 .push_str(&format!("RUN dotnet tool install -g {}\n", package));
790 }
791 }
792 }
793 "rust" => {
794 let base_image =
795 version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
796 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
797 dockerfile_content.push_str(
798 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
799 );
800 dockerfile_content.push_str(" build-essential \\\n");
801 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
802
803 if let Some(packages) = additional_packages {
804 for package in packages {
805 dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
806 }
807 }
808 }
809 _ => {
810 return Err(ContainerError::ContainerStart(format!(
811 "Unsupported language: {}",
812 language
813 )));
814 }
815 }
816
817 std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
819 ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
820 })?;
821
822 let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
824 self.build_image(&dockerfile_path, &image_tag).await?;
825
826 Self::set_language_specific_image("", language, version, &image_tag);
828
829 Ok(image_tag)
830 }
831}
832
833impl DockerRuntime {
835 async fn run_container_inner(
836 &self,
837 image: &str,
838 cmd: &[&str],
839 env_vars: &[(&str, &str)],
840 working_dir: &Path,
841 volumes: &[(&Path, &Path)],
842 ) -> Result<ContainerOutput, ContainerError> {
843 if let Err(e) = self.pull_image_inner(image).await {
845 wrkflw_logging::warning(&format!(
846 "Failed to pull image {}: {}. Attempting to continue with existing image.",
847 image, e
848 ));
849 }
850
851 let mut env: Vec<String> = env_vars
853 .iter()
854 .map(|(k, v)| format!("{}={}", k, v))
855 .collect();
856
857 let mut binds = Vec::new();
858 for (host_path, container_path) in volumes {
859 binds.push(format!(
860 "{}:{}",
861 host_path.to_string_lossy(),
862 container_path.to_string_lossy()
863 ));
864 }
865
866 let cmd_vec: Vec<String> = cmd.iter().map(|&s| s.to_string()).collect();
868
869 wrkflw_logging::debug(&format!("Running command in Docker: {:?}", cmd_vec));
870 wrkflw_logging::debug(&format!("Environment: {:?}", env));
871 wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
872
873 let is_windows_image = image.contains("windows")
875 || image.contains("servercore")
876 || image.contains("nanoserver");
877 let is_macos_emu =
878 image.contains("act-") && (image.contains("catthehacker") || image.contains("nektos"));
879
880 if is_macos_emu {
882 env.push("RUNNER_OS=macOS".to_string());
884 env.push("RUNNER_ARCH=X64".to_string());
885 env.push("TMPDIR=/tmp".to_string());
886 env.push("HOME=/root".to_string());
887 env.push("GITHUB_WORKSPACE=/github/workspace".to_string());
888 env.push("PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin".to_string());
889 }
890
891 let options = Some(CreateContainerOptions {
893 name: format!("wrkflw-{}", uuid::Uuid::new_v4()),
894 platform: if is_windows_image {
895 Some("windows".to_string())
896 } else {
897 None
898 },
899 });
900
901 let host_config = if is_windows_image {
903 HostConfig {
904 binds: Some(binds),
905 isolation: Some(bollard::models::HostConfigIsolationEnum::PROCESS),
906 ..Default::default()
907 }
908 } else {
909 HostConfig {
910 binds: Some(binds),
911 ..Default::default()
912 }
913 };
914
915 let mut config = Config {
917 image: Some(image.to_string()),
918 cmd: Some(cmd_vec),
919 env: Some(env),
920 working_dir: Some(working_dir.to_string_lossy().to_string()),
921 host_config: Some(host_config),
922 user: if is_windows_image {
924 Some("ContainerAdministrator".to_string())
925 } else {
926 None },
928 entrypoint: if is_macos_emu {
930 Some(vec!["bash".to_string(), "-l".to_string(), "-c".to_string()])
932 } else {
933 None
934 },
935 ..Default::default()
936 };
937
938 if is_macos_emu {
940 let mut labels = HashMap::new();
942 labels.insert("wrkflw.platform".to_string(), "macos".to_string());
943 config.labels = Some(labels);
944 }
945
946 let create_result = tokio::time::timeout(
948 std::time::Duration::from_secs(15),
949 self.docker.create_container(options, config),
950 )
951 .await;
952
953 let container = match create_result {
954 Ok(Ok(container)) => container,
955 Ok(Err(e)) => return Err(ContainerError::ContainerStart(e.to_string())),
956 Err(_) => {
957 return Err(ContainerError::ContainerStart(
958 "Container creation timed out".to_string(),
959 ))
960 }
961 };
962
963 track_container(&container.id);
965
966 let start_result = tokio::time::timeout(
968 std::time::Duration::from_secs(15),
969 self.docker.start_container::<String>(&container.id, None),
970 )
971 .await;
972
973 match start_result {
974 Ok(Ok(_)) => {}
975 Ok(Err(e)) => {
976 let _ = self.docker.remove_container(&container.id, None).await;
978 untrack_container(&container.id);
979 return Err(ContainerError::ContainerExecution(e.to_string()));
980 }
981 Err(_) => {
982 let _ = self.docker.remove_container(&container.id, None).await;
984 untrack_container(&container.id);
985 return Err(ContainerError::ContainerExecution(
986 "Container start timed out".to_string(),
987 ));
988 }
989 }
990
991 let wait_result = tokio::time::timeout(
993 std::time::Duration::from_secs(300),
994 self.docker
995 .wait_container::<String>(&container.id, None)
996 .collect::<Vec<_>>(),
997 )
998 .await;
999
1000 let exit_code = match wait_result {
1001 Ok(results) => match results.first() {
1002 Some(Ok(exit)) => exit.status_code as i32,
1003 _ => -1,
1004 },
1005 Err(_) => {
1006 wrkflw_logging::warning("Container wait operation timed out, treating as failure");
1007 -1
1008 }
1009 };
1010
1011 let logs_result = tokio::time::timeout(
1013 std::time::Duration::from_secs(10),
1014 self.docker
1015 .logs::<String>(&container.id, None)
1016 .collect::<Vec<_>>(),
1017 )
1018 .await;
1019
1020 let mut stdout = String::new();
1021 let mut stderr = String::new();
1022
1023 if let Ok(logs) = logs_result {
1024 for log in logs.into_iter().flatten() {
1025 match log {
1026 bollard::container::LogOutput::StdOut { message } => {
1027 stdout.push_str(&String::from_utf8_lossy(&message));
1028 }
1029 bollard::container::LogOutput::StdErr { message } => {
1030 stderr.push_str(&String::from_utf8_lossy(&message));
1031 }
1032 _ => {}
1033 }
1034 }
1035 } else {
1036 wrkflw_logging::warning("Retrieving container logs timed out");
1037 }
1038
1039 if exit_code == 0 || !self.preserve_containers_on_failure {
1041 let _ = tokio::time::timeout(
1042 std::time::Duration::from_secs(10),
1043 self.docker.remove_container(&container.id, None),
1044 )
1045 .await;
1046 untrack_container(&container.id);
1047 } else {
1048 wrkflw_logging::info(&format!(
1050 "Preserving container {} for debugging (exit code: {}). Use 'docker exec -it {} bash' to inspect.",
1051 container.id, exit_code, container.id
1052 ));
1053 untrack_container(&container.id);
1055 }
1056
1057 if exit_code != 0 {
1059 wrkflw_logging::info(&format!(
1060 "Docker command failed with exit code: {}",
1061 exit_code
1062 ));
1063 wrkflw_logging::debug(&format!("Failed command: {:?}", cmd));
1064 wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
1065 wrkflw_logging::debug(&format!("STDERR: {}", stderr));
1066 }
1067
1068 Ok(ContainerOutput {
1069 stdout,
1070 stderr,
1071 exit_code,
1072 })
1073 }
1074
1075 async fn pull_image_inner(&self, image: &str) -> Result<(), ContainerError> {
1076 let options = bollard::image::CreateImageOptions {
1077 from_image: image,
1078 ..Default::default()
1079 };
1080
1081 let mut stream = self.docker.create_image(Some(options), None, None);
1082
1083 while let Some(result) = stream.next().await {
1084 if let Err(e) = result {
1085 return Err(ContainerError::ImagePull(e.to_string()));
1086 }
1087 }
1088
1089 Ok(())
1090 }
1091
1092 async fn build_image_inner(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
1093 let _context_dir = dockerfile.parent().unwrap_or(Path::new("."));
1094
1095 let tar_buffer = {
1096 let mut tar_builder = tar::Builder::new(Vec::new());
1097
1098 if let Ok(file) = std::fs::File::open(dockerfile) {
1100 let mut header = tar::Header::new_gnu();
1101 let metadata = file.metadata().map_err(|e| {
1102 ContainerError::ContainerExecution(format!(
1103 "Failed to get file metadata: {}",
1104 e
1105 ))
1106 })?;
1107 let modified_time = metadata
1108 .modified()
1109 .map_err(|e| {
1110 ContainerError::ContainerExecution(format!(
1111 "Failed to get file modification time: {}",
1112 e
1113 ))
1114 })?
1115 .elapsed()
1116 .map_err(|e| {
1117 ContainerError::ContainerExecution(format!(
1118 "Failed to get elapsed time since modification: {}",
1119 e
1120 ))
1121 })?
1122 .as_secs();
1123 header.set_size(metadata.len());
1124 header.set_mode(0o644);
1125 header.set_mtime(modified_time);
1126 header.set_cksum();
1127
1128 tar_builder
1129 .append_data(&mut header, "Dockerfile", file)
1130 .map_err(|e| ContainerError::ImageBuild(e.to_string()))?;
1131 } else {
1132 return Err(ContainerError::ImageBuild(format!(
1133 "Cannot open Dockerfile at {}",
1134 dockerfile.display()
1135 )));
1136 }
1137
1138 tar_builder
1139 .into_inner()
1140 .map_err(|e| ContainerError::ImageBuild(e.to_string()))?
1141 };
1142
1143 let options = bollard::image::BuildImageOptions {
1144 dockerfile: "Dockerfile",
1145 t: tag,
1146 q: false,
1147 nocache: false,
1148 rm: true,
1149 ..Default::default()
1150 };
1151
1152 let mut stream = self
1153 .docker
1154 .build_image(options, None, Some(tar_buffer.into()));
1155
1156 while let Some(result) = stream.next().await {
1157 match result {
1158 Ok(_) => {
1159 }
1161 Err(e) => {
1162 return Err(ContainerError::ImageBuild(e.to_string()));
1163 }
1164 }
1165 }
1166
1167 Ok(())
1168 }
1169}
1170
/// Test helper: returns a copy of the currently tracked container IDs, or an
/// empty list if the lock cannot be acquired.
#[cfg(test)]
pub fn get_tracked_containers() -> Vec<String> {
    match RUNNING_CONTAINERS.lock() {
        Ok(tracked) => tracked.to_vec(),
        Err(_) => Vec::new(),
    }
}
1180
/// Test helper: returns a copy of the currently tracked network IDs, or an
/// empty list if the lock cannot be acquired.
#[cfg(test)]
pub fn get_tracked_networks() -> Vec<String> {
    match CREATED_NETWORKS.lock() {
        Ok(tracked) => tracked.to_vec(),
        Err(_) => Vec::new(),
    }
}