1use async_trait::async_trait;
2use bollard::{
3 container::{Config, CreateContainerOptions},
4 models::HostConfig,
5 network::CreateNetworkOptions,
6 Docker,
7};
8use futures_util::StreamExt;
9use once_cell::sync::Lazy;
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::Mutex;
13use wrkflw_logging;
14use wrkflw_runtime::container::{ContainerError, ContainerOutput, ContainerRuntime};
15use wrkflw_utils;
16use wrkflw_utils::fd;
17
18static RUNNING_CONTAINERS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
19static CREATED_NETWORKS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
20#[allow(dead_code)]
22static CUSTOMIZED_IMAGES: Lazy<Mutex<HashMap<String, String>>> =
23 Lazy::new(|| Mutex::new(HashMap::new()));
24
/// Container runtime backed by a Docker daemon connection.
pub struct DockerRuntime {
    /// Client handle used for every Docker API call made by this runtime.
    docker: Docker,
    /// When `true`, containers that exit with a non-zero code are left in place
    /// instead of being removed after the run.
    preserve_containers_on_failure: bool,
}
29
30impl DockerRuntime {
31 pub fn new() -> Result<Self, ContainerError> {
32 Self::new_with_config(false)
33 }
34
35 pub fn new_with_config(preserve_containers_on_failure: bool) -> Result<Self, ContainerError> {
36 let docker = Docker::connect_with_local_defaults().map_err(|e| {
37 ContainerError::ContainerStart(format!("Failed to connect to Docker: {}", e))
38 })?;
39
40 Ok(DockerRuntime {
41 docker,
42 preserve_containers_on_failure,
43 })
44 }
45
46 #[allow(dead_code)]
48 pub fn get_customized_image(base_image: &str, customization: &str) -> Option<String> {
49 let key = format!("{}:{}", base_image, customization);
50 match CUSTOMIZED_IMAGES.lock() {
51 Ok(images) => images.get(&key).cloned(),
52 Err(e) => {
53 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
54 None
55 }
56 }
57 }
58
59 #[allow(dead_code)]
60 pub fn set_customized_image(base_image: &str, customization: &str, new_image: &str) {
61 let key = format!("{}:{}", base_image, customization);
62 if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
63 images.insert(key, new_image.to_string());
64 }) {
65 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
66 }
67 }
68
69 #[allow(dead_code)]
71 pub fn find_customized_image_key(image: &str, prefix: &str) -> Option<String> {
72 let image_keys = match CUSTOMIZED_IMAGES.lock() {
73 Ok(keys) => keys,
74 Err(e) => {
75 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
76 return None;
77 }
78 };
79
80 for (key, _) in image_keys.iter() {
82 if key.starts_with(prefix) {
83 return Some(key.clone());
84 }
85 }
86
87 None
88 }
89
90 pub fn get_language_specific_image(
92 base_image: &str,
93 language: &str,
94 version: Option<&str>,
95 ) -> Option<String> {
96 let key = match (language, version) {
97 ("python", Some(ver)) => format!("python:{}", ver),
98 ("node", Some(ver)) => format!("node:{}", ver),
99 ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
100 ("go", Some(ver)) => format!("golang:{}", ver),
101 ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
102 ("rust", Some(ver)) => format!("rust:{}", ver),
103 (lang, Some(ver)) => format!("{}:{}", lang, ver),
104 (lang, None) => lang.to_string(),
105 };
106
107 match CUSTOMIZED_IMAGES.lock() {
108 Ok(images) => images.get(&key).cloned(),
109 Err(e) => {
110 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
111 None
112 }
113 }
114 }
115
116 pub fn set_language_specific_image(
118 base_image: &str,
119 language: &str,
120 version: Option<&str>,
121 new_image: &str,
122 ) {
123 let key = match (language, version) {
124 ("python", Some(ver)) => format!("python:{}", ver),
125 ("node", Some(ver)) => format!("node:{}", ver),
126 ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
127 ("go", Some(ver)) => format!("golang:{}", ver),
128 ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
129 ("rust", Some(ver)) => format!("rust:{}", ver),
130 (lang, Some(ver)) => format!("{}:{}", lang, ver),
131 (lang, None) => lang.to_string(),
132 };
133
134 if let Err(e) = CUSTOMIZED_IMAGES.lock().map(|mut images| {
135 images.insert(key, new_image.to_string());
136 }) {
137 wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
138 }
139 }
140
141 #[allow(dead_code)]
143 pub async fn prepare_language_environment(
144 &self,
145 language: &str,
146 version: Option<&str>,
147 additional_packages: Option<Vec<String>>,
148 ) -> Result<String, ContainerError> {
149 let key = format!("{}-{}", language, version.unwrap_or("latest"));
151 if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
152 return Ok(customized_image);
153 }
154
155 let temp_dir = tempfile::tempdir().map_err(|e| {
157 ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
158 })?;
159
160 let dockerfile_path = temp_dir.path().join("Dockerfile");
161 let mut dockerfile_content = String::new();
162
163 match language {
165 "python" => {
166 let base_image =
167 version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
168 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
169 dockerfile_content.push_str(
170 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
171 );
172 dockerfile_content.push_str(" build-essential \\\n");
173 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
174
175 if let Some(packages) = additional_packages {
176 for package in packages {
177 dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
178 }
179 }
180 }
181 "node" => {
182 let base_image =
183 version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
184 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
185 dockerfile_content.push_str(
186 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
187 );
188 dockerfile_content.push_str(" build-essential \\\n");
189 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
190
191 if let Some(packages) = additional_packages {
192 for package in packages {
193 dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
194 }
195 }
196 }
197 "java" => {
198 let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
199 format!("eclipse-temurin:{}", v)
200 });
201 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
202 dockerfile_content.push_str(
203 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
204 );
205 dockerfile_content.push_str(" maven \\\n");
206 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
207 }
208 "go" => {
209 let base_image =
210 version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
211 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
212 dockerfile_content.push_str(
213 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
214 );
215 dockerfile_content.push_str(" git \\\n");
216 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
217
218 if let Some(packages) = additional_packages {
219 for package in packages {
220 dockerfile_content.push_str(&format!("RUN go install {}\n", package));
221 }
222 }
223 }
224 "dotnet" => {
225 let base_image = version
226 .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
227 format!("mcr.microsoft.com/dotnet/sdk:{}", v)
228 });
229 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
230
231 if let Some(packages) = additional_packages {
232 for package in packages {
233 dockerfile_content
234 .push_str(&format!("RUN dotnet tool install -g {}\n", package));
235 }
236 }
237 }
238 "rust" => {
239 let base_image =
240 version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
241 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
242 dockerfile_content.push_str(
243 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
244 );
245 dockerfile_content.push_str(" build-essential \\\n");
246 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
247
248 if let Some(packages) = additional_packages {
249 for package in packages {
250 dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
251 }
252 }
253 }
254 _ => {
255 return Err(ContainerError::ContainerStart(format!(
256 "Unsupported language: {}",
257 language
258 )));
259 }
260 }
261
262 std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
264 ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
265 })?;
266
267 let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
269 self.build_image(&dockerfile_path, &image_tag).await?;
270
271 Self::set_language_specific_image("", language, version, &image_tag);
273
274 Ok(image_tag)
275 }
276}
277
/// Reports whether Docker appears usable from this process.
///
/// The probe runs on a dedicated thread and has two stages:
/// 1. On Linux and macOS only, run `docker version --format {{.Server.Version}}`
///    and require a successful exit within roughly one second.
/// 2. Connect to the daemon with bollard's local defaults and ping it.
///
/// The calling thread polls the probe thread for up to three seconds. Any
/// failure or timeout yields `false`; on the overall timeout the probe thread is
/// not joined and keeps running in the background.
pub fn is_available() -> bool {
    // Upper bound on how long the caller waits for the probe thread.
    let overall_timeout = std::time::Duration::from_secs(3);

    let handle = std::thread::spawn(move || {
        // NOTE(review): `with_stderr_to_null` presumably silences stderr while the
        // closure runs; its `Err` case is reported below as a redirect failure.
        match fd::with_stderr_to_null(|| {
            if cfg!(target_os = "linux") || cfg!(target_os = "macos") {
                // Stage 1: CLI probe with all output discarded.
                let process = std::process::Command::new("docker")
                    .arg("version")
                    .arg("--format")
                    .arg("{{.Server.Version}}")
                    .stdout(std::process::Stdio::null())
                    .stderr(std::process::Stdio::null())
                    .spawn();

                match process {
                    Ok(mut child) => {
                        // No threads are spawned in this scope; the closure only
                        // lets `return` leave the polling loop with a value.
                        let status = std::thread::scope(|_| {
                            // Poll up to 10 times, sleeping 100 ms between polls.
                            for _ in 0..10 {
                                match child.try_wait() {
                                    Ok(Some(status)) => return status.success(),
                                    Ok(None) => {
                                        std::thread::sleep(std::time::Duration::from_millis(100))
                                    }
                                    Err(_) => return false,
                                }
                            }
                            // Still running after the polling budget: kill it
                            // (best effort) and treat the CLI as unavailable.
                            let _ = child.kill();
                            false
                        });

                        if !status {
                            return false;
                        }
                    }
                    Err(_) => {
                        wrkflw_logging::debug("Docker CLI is not available");
                        return false;
                    }
                }
            }

            // Stage 2: daemon ping on a private single-threaded Tokio runtime.
            let runtime = match tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
            {
                Ok(rt) => rt,
                Err(e) => {
                    wrkflw_logging::error(&format!(
                        "Failed to create runtime for Docker availability check: {}",
                        e
                    ));
                    return false;
                }
            };

            runtime.block_on(async {
                // Outer budget: 2 s for connect + ping together.
                match tokio::time::timeout(std::time::Duration::from_secs(2), async {
                    match Docker::connect_with_local_defaults() {
                        Ok(docker) => {
                            // Inner budget: 1 s for the ping alone.
                            match tokio::time::timeout(
                                std::time::Duration::from_secs(1),
                                docker.ping(),
                            )
                            .await
                            {
                                Ok(Ok(_)) => true,
                                Ok(Err(e)) => {
                                    wrkflw_logging::debug(&format!(
                                        "Docker daemon ping failed: {}",
                                        e
                                    ));
                                    false
                                }
                                Err(_) => {
                                    wrkflw_logging::debug(
                                        "Docker daemon ping timed out after 1 second",
                                    );
                                    false
                                }
                            }
                        }
                        Err(e) => {
                            wrkflw_logging::debug(&format!(
                                "Docker daemon connection failed: {}",
                                e
                            ));
                            false
                        }
                    }
                })
                .await
                {
                    Ok(result) => result,
                    Err(_) => {
                        wrkflw_logging::debug("Docker availability check timed out");
                        false
                    }
                }
            })
        }) {
            Ok(result) => result,
            Err(_) => {
                wrkflw_logging::debug(
                    "Failed to redirect stderr when checking Docker availability",
                );
                false
            }
        }
    });

    let start = std::time::Instant::now();

    // Poll the probe thread every 50 ms until it finishes or the budget runs out.
    while start.elapsed() < overall_timeout {
        if handle.is_finished() {
            return match handle.join() {
                Ok(result) => result,
                Err(_) => {
                    wrkflw_logging::warning("Docker availability check thread panicked");
                    false
                }
            };
        }
        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    // Budget exhausted: the handle is dropped without joining.
    wrkflw_logging::warning(
        "Docker availability check timed out, assuming Docker is not available",
    );
    false
}
419
420pub fn track_container(id: &str) {
422 if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
423 containers.push(id.to_string());
424 }
425}
426
427pub fn untrack_container(id: &str) {
429 if let Ok(mut containers) = RUNNING_CONTAINERS.lock() {
430 containers.retain(|c| c != id);
431 }
432}
433
434pub fn track_network(id: &str) {
436 if let Ok(mut networks) = CREATED_NETWORKS.lock() {
437 networks.push(id.to_string());
438 }
439}
440
441pub fn untrack_network(id: &str) {
443 if let Ok(mut networks) = CREATED_NETWORKS.lock() {
444 networks.retain(|n| n != id);
445 }
446}
447
448pub async fn cleanup_resources(docker: &Docker) {
450 let cleanup_timeout = std::time::Duration::from_secs(5);
452
453 match tokio::time::timeout(cleanup_timeout, async {
454 let (container_result, network_result) =
456 tokio::join!(cleanup_containers(docker), cleanup_networks(docker));
457
458 if let Err(e) = container_result {
459 wrkflw_logging::error(&format!("Error during container cleanup: {}", e));
460 }
461
462 if let Err(e) = network_result {
463 wrkflw_logging::error(&format!("Error during network cleanup: {}", e));
464 }
465 })
466 .await
467 {
468 Ok(_) => wrkflw_logging::debug("Docker cleanup completed within timeout"),
469 Err(_) => wrkflw_logging::warning(
470 "Docker cleanup timed out, some resources may not have been removed",
471 ),
472 }
473}
474
475pub async fn cleanup_containers(docker: &Docker) -> Result<(), String> {
477 let containers_to_cleanup =
479 match tokio::time::timeout(std::time::Duration::from_millis(500), async {
480 match RUNNING_CONTAINERS.try_lock() {
481 Ok(containers) => containers.clone(),
482 Err(_) => {
483 wrkflw_logging::error("Could not acquire container lock for cleanup");
484 vec![]
485 }
486 }
487 })
488 .await
489 {
490 Ok(containers) => containers,
491 Err(_) => {
492 wrkflw_logging::error("Timeout while trying to get containers for cleanup");
493 vec![]
494 }
495 };
496
497 if containers_to_cleanup.is_empty() {
498 return Ok(());
499 }
500
501 wrkflw_logging::info(&format!(
502 "Cleaning up {} containers",
503 containers_to_cleanup.len()
504 ));
505
506 for container_id in containers_to_cleanup {
508 match tokio::time::timeout(
510 std::time::Duration::from_millis(1000),
511 docker.stop_container(&container_id, None),
512 )
513 .await
514 {
515 Ok(Ok(_)) => wrkflw_logging::debug(&format!("Stopped container: {}", container_id)),
516 Ok(Err(e)) => wrkflw_logging::warning(&format!(
517 "Error stopping container {}: {}",
518 container_id, e
519 )),
520 Err(_) => {
521 wrkflw_logging::warning(&format!("Timeout stopping container: {}", container_id))
522 }
523 }
524
525 match tokio::time::timeout(
527 std::time::Duration::from_millis(1000),
528 docker.remove_container(&container_id, None),
529 )
530 .await
531 {
532 Ok(Ok(_)) => wrkflw_logging::debug(&format!("Removed container: {}", container_id)),
533 Ok(Err(e)) => wrkflw_logging::warning(&format!(
534 "Error removing container {}: {}",
535 container_id, e
536 )),
537 Err(_) => {
538 wrkflw_logging::warning(&format!("Timeout removing container: {}", container_id))
539 }
540 }
541
542 untrack_container(&container_id);
544 }
545
546 Ok(())
547}
548
549pub async fn cleanup_networks(docker: &Docker) -> Result<(), String> {
551 let networks_to_cleanup =
553 match tokio::time::timeout(std::time::Duration::from_millis(500), async {
554 match CREATED_NETWORKS.try_lock() {
555 Ok(networks) => networks.clone(),
556 Err(_) => {
557 wrkflw_logging::error("Could not acquire network lock for cleanup");
558 vec![]
559 }
560 }
561 })
562 .await
563 {
564 Ok(networks) => networks,
565 Err(_) => {
566 wrkflw_logging::error("Timeout while trying to get networks for cleanup");
567 vec![]
568 }
569 };
570
571 if networks_to_cleanup.is_empty() {
572 return Ok(());
573 }
574
575 wrkflw_logging::info(&format!(
576 "Cleaning up {} networks",
577 networks_to_cleanup.len()
578 ));
579
580 for network_id in networks_to_cleanup {
581 match tokio::time::timeout(
582 std::time::Duration::from_millis(1000),
583 docker.remove_network(&network_id),
584 )
585 .await
586 {
587 Ok(Ok(_)) => {
588 wrkflw_logging::info(&format!("Successfully removed network: {}", network_id))
589 }
590 Ok(Err(e)) => {
591 wrkflw_logging::error(&format!("Error removing network {}: {}", network_id, e))
592 }
593 Err(_) => wrkflw_logging::warning(&format!("Timeout removing network: {}", network_id)),
594 }
595
596 untrack_network(&network_id);
598 }
599
600 Ok(())
601}
602
603pub async fn create_job_network(docker: &Docker) -> Result<String, ContainerError> {
605 let network_name = format!("wrkflw-network-{}", uuid::Uuid::new_v4());
606
607 let options = CreateNetworkOptions {
608 name: network_name.clone(),
609 driver: "bridge".to_string(),
610 ..Default::default()
611 };
612
613 let network = docker
614 .create_network(options)
615 .await
616 .map_err(|e| ContainerError::NetworkCreation(e.to_string()))?;
617
618 let network_id = network.id.ok_or_else(|| {
620 ContainerError::NetworkOperation("Network created but no ID returned".to_string())
621 })?;
622
623 track_network(&network_id);
624 wrkflw_logging::info(&format!("Created Docker network: {}", network_id));
625
626 Ok(network_id)
627}
628
629#[async_trait]
630impl ContainerRuntime for DockerRuntime {
631 async fn run_container(
632 &self,
633 image: &str,
634 cmd: &[&str],
635 env_vars: &[(&str, &str)],
636 working_dir: &Path,
637 volumes: &[(&Path, &Path)],
638 ) -> Result<ContainerOutput, ContainerError> {
639 wrkflw_logging::info(&format!("Docker: Running container with image: {}", image));
641
642 let timeout_duration = std::time::Duration::from_secs(360); match tokio::time::timeout(
647 timeout_duration,
648 self.run_container_inner(image, cmd, env_vars, working_dir, volumes),
649 )
650 .await
651 {
652 Ok(result) => result,
653 Err(_) => {
654 wrkflw_logging::error("Docker operation timed out after 360 seconds");
655 Err(ContainerError::ContainerExecution(
656 "Operation timed out".to_string(),
657 ))
658 }
659 }
660 }
661
662 async fn pull_image(&self, image: &str) -> Result<(), ContainerError> {
663 let timeout_duration = std::time::Duration::from_secs(30);
665
666 match tokio::time::timeout(timeout_duration, self.pull_image_inner(image)).await {
667 Ok(result) => result,
668 Err(_) => {
669 wrkflw_logging::warning(&format!(
670 "Pull of image {} timed out, continuing with existing image",
671 image
672 ));
673 Ok(())
675 }
676 }
677 }
678
679 async fn build_image(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
680 let timeout_duration = std::time::Duration::from_secs(120); match tokio::time::timeout(timeout_duration, self.build_image_inner(dockerfile, tag)).await
684 {
685 Ok(result) => result,
686 Err(_) => {
687 wrkflw_logging::error(&format!(
688 "Building image {} timed out after 120 seconds",
689 tag
690 ));
691 Err(ContainerError::ImageBuild(
692 "Operation timed out".to_string(),
693 ))
694 }
695 }
696 }
697
698 async fn prepare_language_environment(
699 &self,
700 language: &str,
701 version: Option<&str>,
702 additional_packages: Option<Vec<String>>,
703 ) -> Result<String, ContainerError> {
704 let key = format!("{}-{}", language, version.unwrap_or("latest"));
706 if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
707 return Ok(customized_image);
708 }
709
710 let temp_dir = tempfile::tempdir().map_err(|e| {
712 ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
713 })?;
714
715 let dockerfile_path = temp_dir.path().join("Dockerfile");
716 let mut dockerfile_content = String::new();
717
718 match language {
720 "python" => {
721 let base_image =
722 version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
723 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
724 dockerfile_content.push_str(
725 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
726 );
727 dockerfile_content.push_str(" build-essential \\\n");
728 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
729
730 if let Some(packages) = additional_packages {
731 for package in packages {
732 dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
733 }
734 }
735 }
736 "node" => {
737 let base_image =
738 version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
739 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
740 dockerfile_content.push_str(
741 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
742 );
743 dockerfile_content.push_str(" build-essential \\\n");
744 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
745
746 if let Some(packages) = additional_packages {
747 for package in packages {
748 dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
749 }
750 }
751 }
752 "java" => {
753 let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
754 format!("eclipse-temurin:{}", v)
755 });
756 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
757 dockerfile_content.push_str(
758 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
759 );
760 dockerfile_content.push_str(" maven \\\n");
761 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
762 }
763 "go" => {
764 let base_image =
765 version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
766 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
767 dockerfile_content.push_str(
768 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
769 );
770 dockerfile_content.push_str(" git \\\n");
771 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
772
773 if let Some(packages) = additional_packages {
774 for package in packages {
775 dockerfile_content.push_str(&format!("RUN go install {}\n", package));
776 }
777 }
778 }
779 "dotnet" => {
780 let base_image = version
781 .map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
782 format!("mcr.microsoft.com/dotnet/sdk:{}", v)
783 });
784 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
785
786 if let Some(packages) = additional_packages {
787 for package in packages {
788 dockerfile_content
789 .push_str(&format!("RUN dotnet tool install -g {}\n", package));
790 }
791 }
792 }
793 "rust" => {
794 let base_image =
795 version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
796 dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
797 dockerfile_content.push_str(
798 "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
799 );
800 dockerfile_content.push_str(" build-essential \\\n");
801 dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
802
803 if let Some(packages) = additional_packages {
804 for package in packages {
805 dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
806 }
807 }
808 }
809 _ => {
810 return Err(ContainerError::ContainerStart(format!(
811 "Unsupported language: {}",
812 language
813 )));
814 }
815 }
816
817 std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
819 ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
820 })?;
821
822 let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
824 self.build_image(&dockerfile_path, &image_tag).await?;
825
826 Self::set_language_specific_image("", language, version, &image_tag);
828
829 Ok(image_tag)
830 }
831}
832
833impl DockerRuntime {
835 async fn run_container_inner(
836 &self,
837 image: &str,
838 cmd: &[&str],
839 env_vars: &[(&str, &str)],
840 working_dir: &Path,
841 volumes: &[(&Path, &Path)],
842 ) -> Result<ContainerOutput, ContainerError> {
843 let mut env: Vec<String> = env_vars
845 .iter()
846 .map(|(k, v)| format!("{}={}", k, v))
847 .collect();
848
849 let mut binds = Vec::new();
850 for (host_path, container_path) in volumes {
851 binds.push(format!(
852 "{}:{}",
853 host_path.to_string_lossy(),
854 container_path.to_string_lossy()
855 ));
856 }
857
858 let cmd_vec: Vec<String> = cmd.iter().map(|&s| s.to_string()).collect();
860
861 wrkflw_logging::debug(&format!("Running command in Docker: {:?}", cmd_vec));
862 wrkflw_logging::debug(&format!("Environment: {:?}", env));
863 wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
864
865 let is_windows_image = image.contains("windows")
867 || image.contains("servercore")
868 || image.contains("nanoserver");
869 let is_macos_emu =
870 image.contains("act-") && (image.contains("catthehacker") || image.contains("nektos"));
871
872 if is_macos_emu {
874 env.push("RUNNER_OS=macOS".to_string());
876 env.push("RUNNER_ARCH=X64".to_string());
877 env.push("TMPDIR=/tmp".to_string());
878 env.push("HOME=/root".to_string());
879 env.push("GITHUB_WORKSPACE=/github/workspace".to_string());
880 env.push("PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin".to_string());
881 }
882
883 let options = Some(CreateContainerOptions {
885 name: format!("wrkflw-{}", uuid::Uuid::new_v4()),
886 platform: if is_windows_image {
887 Some("windows".to_string())
888 } else {
889 None
890 },
891 });
892
893 let host_config = if is_windows_image {
895 HostConfig {
896 binds: Some(binds),
897 isolation: Some(bollard::models::HostConfigIsolationEnum::PROCESS),
898 ..Default::default()
899 }
900 } else {
901 HostConfig {
902 binds: Some(binds),
903 ..Default::default()
904 }
905 };
906
907 let mut config = Config {
909 image: Some(image.to_string()),
910 cmd: Some(cmd_vec),
911 env: Some(env),
912 working_dir: Some(working_dir.to_string_lossy().to_string()),
913 host_config: Some(host_config),
914 user: if is_windows_image {
916 Some("ContainerAdministrator".to_string())
917 } else {
918 None },
920 entrypoint: if is_macos_emu {
922 Some(vec!["bash".to_string(), "-l".to_string(), "-c".to_string()])
924 } else {
925 None
926 },
927 ..Default::default()
928 };
929
930 if is_macos_emu {
932 let mut labels = HashMap::new();
934 labels.insert("wrkflw.platform".to_string(), "macos".to_string());
935 config.labels = Some(labels);
936 }
937
938 let create_result = tokio::time::timeout(
940 std::time::Duration::from_secs(15),
941 self.docker.create_container(options, config),
942 )
943 .await;
944
945 let container = match create_result {
946 Ok(Ok(container)) => container,
947 Ok(Err(e)) => return Err(ContainerError::ContainerStart(e.to_string())),
948 Err(_) => {
949 return Err(ContainerError::ContainerStart(
950 "Container creation timed out".to_string(),
951 ))
952 }
953 };
954
955 track_container(&container.id);
957
958 let start_result = tokio::time::timeout(
960 std::time::Duration::from_secs(15),
961 self.docker.start_container::<String>(&container.id, None),
962 )
963 .await;
964
965 match start_result {
966 Ok(Ok(_)) => {}
967 Ok(Err(e)) => {
968 let _ = self.docker.remove_container(&container.id, None).await;
970 untrack_container(&container.id);
971 return Err(ContainerError::ContainerExecution(e.to_string()));
972 }
973 Err(_) => {
974 let _ = self.docker.remove_container(&container.id, None).await;
976 untrack_container(&container.id);
977 return Err(ContainerError::ContainerExecution(
978 "Container start timed out".to_string(),
979 ));
980 }
981 }
982
983 let wait_result = tokio::time::timeout(
985 std::time::Duration::from_secs(300),
986 self.docker
987 .wait_container::<String>(&container.id, None)
988 .collect::<Vec<_>>(),
989 )
990 .await;
991
992 let exit_code = match wait_result {
993 Ok(results) => match results.first() {
994 Some(Ok(exit)) => exit.status_code as i32,
995 _ => -1,
996 },
997 Err(_) => {
998 wrkflw_logging::warning("Container wait operation timed out, treating as failure");
999 -1
1000 }
1001 };
1002
1003 let logs_result = tokio::time::timeout(
1005 std::time::Duration::from_secs(10),
1006 self.docker
1007 .logs::<String>(&container.id, None)
1008 .collect::<Vec<_>>(),
1009 )
1010 .await;
1011
1012 let mut stdout = String::new();
1013 let mut stderr = String::new();
1014
1015 if let Ok(logs) = logs_result {
1016 for log in logs.into_iter().flatten() {
1017 match log {
1018 bollard::container::LogOutput::StdOut { message } => {
1019 stdout.push_str(&String::from_utf8_lossy(&message));
1020 }
1021 bollard::container::LogOutput::StdErr { message } => {
1022 stderr.push_str(&String::from_utf8_lossy(&message));
1023 }
1024 _ => {}
1025 }
1026 }
1027 } else {
1028 wrkflw_logging::warning("Retrieving container logs timed out");
1029 }
1030
1031 if exit_code == 0 || !self.preserve_containers_on_failure {
1033 let _ = tokio::time::timeout(
1034 std::time::Duration::from_secs(10),
1035 self.docker.remove_container(&container.id, None),
1036 )
1037 .await;
1038 untrack_container(&container.id);
1039 } else {
1040 wrkflw_logging::info(&format!(
1042 "Preserving container {} for debugging (exit code: {}). Use 'docker exec -it {} bash' to inspect.",
1043 container.id, exit_code, container.id
1044 ));
1045 untrack_container(&container.id);
1047 }
1048
1049 if exit_code != 0 {
1051 wrkflw_logging::info(&format!(
1052 "Docker command failed with exit code: {}",
1053 exit_code
1054 ));
1055 wrkflw_logging::debug(&format!("Failed command: {:?}", cmd));
1056 wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
1057 wrkflw_logging::debug(&format!("STDERR: {}", stderr));
1058 }
1059
1060 Ok(ContainerOutput {
1061 stdout,
1062 stderr,
1063 exit_code,
1064 })
1065 }
1066
1067 async fn pull_image_inner(&self, image: &str) -> Result<(), ContainerError> {
1068 let options = bollard::image::CreateImageOptions {
1069 from_image: image,
1070 ..Default::default()
1071 };
1072
1073 let mut stream = self.docker.create_image(Some(options), None, None);
1074
1075 while let Some(result) = stream.next().await {
1076 if let Err(e) = result {
1077 return Err(ContainerError::ImagePull(e.to_string()));
1078 }
1079 }
1080
1081 Ok(())
1082 }
1083
1084 async fn build_image_inner(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
1085 let _context_dir = dockerfile.parent().unwrap_or(Path::new("."));
1086
1087 let tar_buffer = {
1088 let mut tar_builder = tar::Builder::new(Vec::new());
1089
1090 if let Ok(file) = std::fs::File::open(dockerfile) {
1092 let mut header = tar::Header::new_gnu();
1093 let metadata = file.metadata().map_err(|e| {
1094 ContainerError::ContainerExecution(format!(
1095 "Failed to get file metadata: {}",
1096 e
1097 ))
1098 })?;
1099 let modified_time = metadata
1100 .modified()
1101 .map_err(|e| {
1102 ContainerError::ContainerExecution(format!(
1103 "Failed to get file modification time: {}",
1104 e
1105 ))
1106 })?
1107 .elapsed()
1108 .map_err(|e| {
1109 ContainerError::ContainerExecution(format!(
1110 "Failed to get elapsed time since modification: {}",
1111 e
1112 ))
1113 })?
1114 .as_secs();
1115 header.set_size(metadata.len());
1116 header.set_mode(0o644);
1117 header.set_mtime(modified_time);
1118 header.set_cksum();
1119
1120 tar_builder
1121 .append_data(&mut header, "Dockerfile", file)
1122 .map_err(|e| ContainerError::ImageBuild(e.to_string()))?;
1123 } else {
1124 return Err(ContainerError::ImageBuild(format!(
1125 "Cannot open Dockerfile at {}",
1126 dockerfile.display()
1127 )));
1128 }
1129
1130 tar_builder
1131 .into_inner()
1132 .map_err(|e| ContainerError::ImageBuild(e.to_string()))?
1133 };
1134
1135 let options = bollard::image::BuildImageOptions {
1136 dockerfile: "Dockerfile",
1137 t: tag,
1138 q: false,
1139 nocache: false,
1140 rm: true,
1141 ..Default::default()
1142 };
1143
1144 let mut stream = self
1145 .docker
1146 .build_image(options, None, Some(tar_buffer.into()));
1147
1148 while let Some(result) = stream.next().await {
1149 match result {
1150 Ok(_) => {
1151 }
1153 Err(e) => {
1154 return Err(ContainerError::ImageBuild(e.to_string()));
1155 }
1156 }
1157 }
1158
1159 Ok(())
1160 }
1161}
1162
/// Test helper: snapshot of the tracked container IDs (empty if the lock is poisoned).
#[cfg(test)]
pub fn get_tracked_containers() -> Vec<String> {
    RUNNING_CONTAINERS
        .lock()
        .map(|tracked| tracked.to_vec())
        .unwrap_or_default()
}
1172
/// Test helper: snapshot of the tracked network IDs (empty if the lock is poisoned).
#[cfg(test)]
pub fn get_tracked_networks() -> Vec<String> {
    CREATED_NETWORKS
        .lock()
        .map(|tracked| tracked.to_vec())
        .unwrap_or_default()
}