use async_trait::async_trait;
use bollard::{
container::{Config, CreateContainerOptions},
models::HostConfig,
network::CreateNetworkOptions,
Docker,
};
use futures_util::StreamExt;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;
use wrkflw_logging;
use wrkflw_runtime::container::{ContainerError, ContainerOutput, ContainerRuntime};
use wrkflw_utils;
use wrkflw_utils::fd;
// Container ids started by this process; consulted by cleanup_containers()
// for emergency teardown.
static RUNNING_CONTAINERS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
// Network ids created by this process; consulted by cleanup_networks().
static CREATED_NETWORKS: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
// Registry mapping customization keys (e.g. "python:3.11" or
// "<base>:<customization>") to locally built image tags; written by the
// set_* helpers below and read by the get_*/find_* helpers.
#[allow(dead_code)]
static CUSTOMIZED_IMAGES: Lazy<Mutex<HashMap<String, String>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
/// Container runtime backed by the local Docker daemon (via bollard).
pub struct DockerRuntime {
    docker: Docker,
    // When true, containers whose command exits non-zero are kept around for
    // debugging instead of being removed (see run_container_inner).
    preserve_containers_on_failure: bool,
}
impl DockerRuntime {
    /// Creates a runtime that always removes containers after execution.
    pub fn new() -> Result<Self, ContainerError> {
        Self::new_with_config(false)
    }

    /// Connects to the local Docker daemon using bollard's platform defaults
    /// (DOCKER_HOST / unix socket / named pipe).
    ///
    /// When `preserve_containers_on_failure` is true, containers whose
    /// command exits non-zero are left in place for post-mortem debugging.
    pub fn new_with_config(preserve_containers_on_failure: bool) -> Result<Self, ContainerError> {
        let docker = Docker::connect_with_local_defaults().map_err(|e| {
            ContainerError::ContainerStart(format!("Failed to connect to Docker: {}", e))
        })?;
        Ok(DockerRuntime {
            docker,
            preserve_containers_on_failure,
        })
    }

    /// Returns the registered customized image for `base_image` +
    /// `customization`, or `None` if absent or the registry lock is poisoned.
    #[allow(dead_code)]
    pub fn get_customized_image(base_image: &str, customization: &str) -> Option<String> {
        let key = format!("{}:{}", base_image, customization);
        match CUSTOMIZED_IMAGES.lock() {
            Ok(images) => images.get(&key).cloned(),
            Err(e) => {
                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
                None
            }
        }
    }

    /// Registers `new_image` as the customized image for `base_image` +
    /// `customization`.
    #[allow(dead_code)]
    pub fn set_customized_image(base_image: &str, customization: &str, new_image: &str) {
        let key = format!("{}:{}", base_image, customization);
        match CUSTOMIZED_IMAGES.lock() {
            Ok(mut images) => {
                images.insert(key, new_image.to_string());
            }
            Err(e) => {
                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
            }
        }
    }

    /// Returns the first registry key that starts with `prefix`, if any.
    /// `_image` is unused but retained for API compatibility.
    #[allow(dead_code)]
    pub fn find_customized_image_key(_image: &str, prefix: &str) -> Option<String> {
        let image_keys = match CUSTOMIZED_IMAGES.lock() {
            Ok(keys) => keys,
            Err(e) => {
                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
                return None;
            }
        };
        image_keys
            .keys()
            .find(|key| key.starts_with(prefix))
            .cloned()
    }

    /// Builds the shared registry key for a language/version pair, mirroring
    /// upstream image naming (e.g. `python:3.11`, `golang:1.21`,
    /// `eclipse-temurin:17`).
    fn language_image_key(language: &str, version: Option<&str>) -> String {
        match (language, version) {
            ("python", Some(ver)) => format!("python:{}", ver),
            ("node", Some(ver)) => format!("node:{}", ver),
            ("java", Some(ver)) => format!("eclipse-temurin:{}", ver),
            ("go", Some(ver)) => format!("golang:{}", ver),
            ("dotnet", Some(ver)) => format!("mcr.microsoft.com/dotnet/sdk:{}", ver),
            ("rust", Some(ver)) => format!("rust:{}", ver),
            (lang, Some(ver)) => format!("{}:{}", lang, ver),
            (lang, None) => lang.to_string(),
        }
    }

    /// Looks up a previously built customized image for `language`/`version`.
    /// `_base_image` is unused but retained for API compatibility.
    pub fn get_language_specific_image(
        _base_image: &str,
        language: &str,
        version: Option<&str>,
    ) -> Option<String> {
        let key = Self::language_image_key(language, version);
        match CUSTOMIZED_IMAGES.lock() {
            Ok(images) => images.get(&key).cloned(),
            Err(e) => {
                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
                None
            }
        }
    }

    /// Records `new_image` as the customized image for `language`/`version`.
    /// `_base_image` is unused but retained for API compatibility.
    pub fn set_language_specific_image(
        _base_image: &str,
        language: &str,
        version: Option<&str>,
        new_image: &str,
    ) {
        let key = Self::language_image_key(language, version);
        match CUSTOMIZED_IMAGES.lock() {
            Ok(mut images) => {
                images.insert(key, new_image.to_string());
            }
            Err(e) => {
                wrkflw_logging::error(&format!("Failed to acquire lock: {}", e));
            }
        }
    }

    /// Renders the Dockerfile used to build a customized tool image for
    /// `language`. Returns an error for unsupported languages.
    ///
    /// Each language entry defines: the default base image (overridable by
    /// `version`), the apt packages baked in, and the per-package install
    /// command applied to `additional_packages` (None = extras ignored, as
    /// for Java).
    fn dockerfile_content_for(
        language: &str,
        version: Option<&str>,
        additional_packages: Option<Vec<String>>,
    ) -> Result<String, ContainerError> {
        let (base_image, apt_packages, install_cmd): (String, &[&str], Option<&str>) =
            match language {
                "python" => (
                    version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v)),
                    &["build-essential"],
                    Some("pip install"),
                ),
                "node" => (
                    version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v)),
                    &["build-essential"],
                    Some("npm install -g"),
                ),
                "java" => (
                    version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
                        format!("eclipse-temurin:{}", v)
                    }),
                    &["maven"],
                    None,
                ),
                "go" => (
                    version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v)),
                    &["git"],
                    Some("go install"),
                ),
                "dotnet" => (
                    version.map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
                        format!("mcr.microsoft.com/dotnet/sdk:{}", v)
                    }),
                    &[],
                    Some("dotnet tool install -g"),
                ),
                "rust" => (
                    version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v)),
                    &["build-essential"],
                    Some("cargo install"),
                ),
                _ => {
                    return Err(ContainerError::ContainerStart(format!(
                        "Unsupported language: {}",
                        language
                    )));
                }
            };
        let mut content = format!("FROM {}\n\n", base_image);
        if !apt_packages.is_empty() {
            content.push_str(
                "RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
            );
            for pkg in apt_packages {
                content.push_str(&format!("    {} \\\n", pkg));
            }
            content.push_str("    && rm -rf /var/lib/apt/lists/*\n");
        }
        if let (Some(cmd), Some(packages)) = (install_cmd, additional_packages) {
            for package in packages {
                content.push_str(&format!("RUN {} {}\n", cmd, package));
            }
        }
        Ok(content)
    }

    /// Ensures a customized image exists for `language`/`version`, building
    /// one on the fly when necessary, and returns its tag.
    #[allow(dead_code)]
    pub async fn prepare_language_environment(
        &self,
        language: &str,
        version: Option<&str>,
        additional_packages: Option<Vec<String>>,
    ) -> Result<String, ContainerError> {
        // Reuse an image we already built for this language/version.
        if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
            return Ok(customized_image);
        }
        let temp_dir = tempfile::tempdir().map_err(|e| {
            ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
        })?;
        let dockerfile_path = temp_dir.path().join("Dockerfile");
        let dockerfile_content =
            Self::dockerfile_content_for(language, version, additional_packages)?;
        std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
            ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
        })?;
        let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
        self.build_image(&dockerfile_path, &image_tag).await?;
        Self::set_language_specific_image("", language, version, &image_tag);
        Ok(image_tag)
    }
}
/// Reports whether a usable Docker daemon is reachable.
///
/// The entire probe runs on a helper thread under a hard 3-second budget so
/// the caller can never block on a wedged Docker socket. Inside the thread:
/// 1. on Linux/macOS a cheap CLI probe (`docker version`) runs first;
/// 2. then a throwaway single-threaded Tokio runtime pings the daemon over
///    the API, with its own nested 2s/1s timeouts.
/// stderr is redirected to null for the duration so Docker's own error
/// output does not leak into ours.
pub fn is_available() -> bool {
    let overall_timeout = std::time::Duration::from_secs(3);
    let handle = std::thread::spawn(move || {
        match fd::with_stderr_to_null(|| {
            if cfg!(target_os = "linux") || cfg!(target_os = "macos") {
                // CLI probe: succeeds only when the daemon answers the
                // version query.
                let process = std::process::Command::new("docker")
                    .arg("version")
                    .arg("--format")
                    .arg("{{.Server.Version}}")
                    .stdout(std::process::Stdio::null())
                    .stderr(std::process::Stdio::null())
                    .spawn();
                match process {
                    Ok(mut child) => {
                        // NOTE(review): thread::scope is used here purely as a
                        // scoped block; no scoped threads are spawned.
                        let status = std::thread::scope(|_| {
                            // Poll up to ~1s (10 x 100ms) for the CLI to exit.
                            for _ in 0..10 {
                                match child.try_wait() {
                                    Ok(Some(status)) => return status.success(),
                                    Ok(None) => {
                                        std::thread::sleep(std::time::Duration::from_millis(100))
                                    }
                                    Err(_) => return false,
                                }
                            }
                            // Still running after the polling window: kill it
                            // and treat Docker as unavailable.
                            let _ = child.kill();
                            false
                        });
                        if !status {
                            return false;
                        }
                    }
                    Err(_) => {
                        wrkflw_logging::debug("Docker CLI is not available");
                        return false;
                    }
                }
            }
            // API probe: build a throwaway current-thread runtime because
            // this closure runs on a plain OS thread with no async context.
            let runtime = match tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
            {
                Ok(rt) => rt,
                Err(e) => {
                    wrkflw_logging::error(&format!(
                        "Failed to create runtime for Docker availability check: {}",
                        e
                    ));
                    return false;
                }
            };
            runtime.block_on(async {
                // Outer 2s budget around connect + ping; inner 1s budget on
                // the ping itself.
                match tokio::time::timeout(std::time::Duration::from_secs(2), async {
                    match Docker::connect_with_local_defaults() {
                        Ok(docker) => {
                            match tokio::time::timeout(
                                std::time::Duration::from_secs(1),
                                docker.ping(),
                            )
                            .await
                            {
                                Ok(Ok(_)) => true,
                                Ok(Err(e)) => {
                                    wrkflw_logging::debug(&format!(
                                        "Docker daemon ping failed: {}",
                                        e
                                    ));
                                    false
                                }
                                Err(_) => {
                                    wrkflw_logging::debug(
                                        "Docker daemon ping timed out after 1 second",
                                    );
                                    false
                                }
                            }
                        }
                        Err(e) => {
                            wrkflw_logging::debug(&format!(
                                "Docker daemon connection failed: {}",
                                e
                            ));
                            false
                        }
                    }
                })
                .await
                {
                    Ok(result) => result,
                    Err(_) => {
                        wrkflw_logging::debug("Docker availability check timed out");
                        false
                    }
                }
            })
        }) {
            Ok(result) => result,
            Err(_) => {
                wrkflw_logging::debug(
                    "Failed to redirect stderr when checking Docker availability",
                );
                false
            }
        }
    });
    // Poll the worker instead of join()ing unconditionally so the 3-second
    // overall budget holds even if the thread itself hangs.
    let start = std::time::Instant::now();
    while start.elapsed() < overall_timeout {
        if handle.is_finished() {
            return match handle.join() {
                Ok(result) => result,
                Err(_) => {
                    wrkflw_logging::warning("Docker availability check thread panicked");
                    false
                }
            };
        }
        std::thread::sleep(std::time::Duration::from_millis(50));
    }
    wrkflw_logging::warning(
        "Docker availability check timed out, assuming Docker is not available",
    );
    false
}
/// Registers a container id so global cleanup can stop and remove it later.
pub fn track_container(id: &str) {
    match RUNNING_CONTAINERS.lock() {
        Ok(mut tracked) => tracked.push(id.to_owned()),
        // Tracking is best-effort; a poisoned lock is silently skipped,
        // matching the other registry helpers.
        Err(_) => {}
    }
}
/// Drops a container id from the cleanup registry (e.g. after removal).
pub fn untrack_container(id: &str) {
    match RUNNING_CONTAINERS.lock() {
        Ok(mut tracked) => tracked.retain(|tracked_id| tracked_id != id),
        Err(_) => {}
    }
}
/// Registers a network id so global cleanup can remove it later.
pub fn track_network(id: &str) {
    match CREATED_NETWORKS.lock() {
        Ok(mut tracked) => tracked.push(id.to_owned()),
        Err(_) => {}
    }
}
/// Drops a network id from the cleanup registry (e.g. after removal).
pub fn untrack_network(id: &str) {
    match CREATED_NETWORKS.lock() {
        Ok(mut tracked) => tracked.retain(|tracked_id| tracked_id != id),
        Err(_) => {}
    }
}
/// Best-effort removal of every tracked container and network, bounded by a
/// five-second overall budget so shutdown can never hang on the daemon.
pub async fn cleanup_resources(docker: &Docker) {
    let deadline = std::time::Duration::from_secs(5);
    // Containers and networks are cleaned concurrently; failures are logged
    // rather than propagated.
    let work = async {
        let (container_result, network_result) =
            tokio::join!(cleanup_containers(docker), cleanup_networks(docker));
        if let Err(e) = container_result {
            wrkflw_logging::error(&format!("Error during container cleanup: {}", e));
        }
        if let Err(e) = network_result {
            wrkflw_logging::error(&format!("Error during network cleanup: {}", e));
        }
    };
    if tokio::time::timeout(deadline, work).await.is_ok() {
        wrkflw_logging::debug("Docker cleanup completed within timeout");
    } else {
        wrkflw_logging::warning(
            "Docker cleanup timed out, some resources may not have been removed",
        );
    }
}
/// Stops and removes every container tracked in `RUNNING_CONTAINERS`,
/// bounding each Docker call with a one-second timeout so an unresponsive
/// daemon cannot stall shutdown.
///
/// Cleanup is best-effort: stop/remove failures are logged and the container
/// is untracked regardless, so we never retry forever. The `Result` is kept
/// for API compatibility; this implementation always returns `Ok(())`.
pub async fn cleanup_containers(docker: &Docker) -> Result<(), String> {
    // `try_lock` is non-blocking, so the async timeout the old code wrapped
    // around it could never fire; if the lock is contended we simply skip.
    let containers_to_cleanup = match RUNNING_CONTAINERS.try_lock() {
        Ok(containers) => containers.clone(),
        Err(_) => {
            wrkflw_logging::error("Could not acquire container lock for cleanup");
            vec![]
        }
    };
    if containers_to_cleanup.is_empty() {
        return Ok(());
    }
    wrkflw_logging::info(&format!(
        "Cleaning up {} containers",
        containers_to_cleanup.len()
    ));
    for container_id in containers_to_cleanup {
        // Stop first (1s budget)...
        match tokio::time::timeout(
            std::time::Duration::from_millis(1000),
            docker.stop_container(&container_id, None),
        )
        .await
        {
            Ok(Ok(_)) => wrkflw_logging::debug(&format!("Stopped container: {}", container_id)),
            Ok(Err(e)) => wrkflw_logging::warning(&format!(
                "Error stopping container {}: {}",
                container_id, e
            )),
            Err(_) => {
                wrkflw_logging::warning(&format!("Timeout stopping container: {}", container_id))
            }
        }
        // ...then remove (1s budget).
        match tokio::time::timeout(
            std::time::Duration::from_millis(1000),
            docker.remove_container(&container_id, None),
        )
        .await
        {
            Ok(Ok(_)) => wrkflw_logging::debug(&format!("Removed container: {}", container_id)),
            Ok(Err(e)) => wrkflw_logging::warning(&format!(
                "Error removing container {}: {}",
                container_id, e
            )),
            Err(_) => {
                wrkflw_logging::warning(&format!("Timeout removing container: {}", container_id))
            }
        }
        untrack_container(&container_id);
    }
    Ok(())
}
/// Removes every network tracked in `CREATED_NETWORKS`, bounding each Docker
/// call with a one-second timeout.
///
/// Best-effort like `cleanup_containers`: failures are logged and the
/// network is untracked regardless. Always returns `Ok(())`; the `Result`
/// is kept for API compatibility.
pub async fn cleanup_networks(docker: &Docker) -> Result<(), String> {
    // `try_lock` is non-blocking, so the async timeout the old code wrapped
    // around it could never fire; if the lock is contended we simply skip.
    let networks_to_cleanup = match CREATED_NETWORKS.try_lock() {
        Ok(networks) => networks.clone(),
        Err(_) => {
            wrkflw_logging::error("Could not acquire network lock for cleanup");
            vec![]
        }
    };
    if networks_to_cleanup.is_empty() {
        return Ok(());
    }
    wrkflw_logging::info(&format!(
        "Cleaning up {} networks",
        networks_to_cleanup.len()
    ));
    for network_id in networks_to_cleanup {
        match tokio::time::timeout(
            std::time::Duration::from_millis(1000),
            docker.remove_network(&network_id),
        )
        .await
        {
            Ok(Ok(_)) => {
                wrkflw_logging::info(&format!("Successfully removed network: {}", network_id))
            }
            Ok(Err(e)) => {
                wrkflw_logging::error(&format!("Error removing network {}: {}", network_id, e))
            }
            Err(_) => wrkflw_logging::warning(&format!("Timeout removing network: {}", network_id)),
        }
        untrack_network(&network_id);
    }
    Ok(())
}
/// Creates a uniquely named bridge network for a job and registers it for
/// later cleanup. Returns the Docker-assigned network id.
pub async fn create_job_network(docker: &Docker) -> Result<String, ContainerError> {
    let network_name = format!("wrkflw-network-{}", uuid::Uuid::new_v4());
    let create_options = CreateNetworkOptions {
        name: network_name.clone(),
        driver: "bridge".to_string(),
        ..Default::default()
    };
    let response = docker
        .create_network(create_options)
        .await
        .map_err(|e| ContainerError::NetworkCreation(e.to_string()))?;
    match response.id {
        Some(network_id) => {
            track_network(&network_id);
            wrkflw_logging::info(&format!("Created Docker network: {}", network_id));
            Ok(network_id)
        }
        None => Err(ContainerError::NetworkOperation(
            "Network created but no ID returned".to_string(),
        )),
    }
}
#[async_trait]
impl ContainerRuntime for DockerRuntime {
async fn run_container(
&self,
image: &str,
cmd: &[&str],
env_vars: &[(&str, &str)],
working_dir: &Path,
volumes: &[(&Path, &Path)],
) -> Result<ContainerOutput, ContainerError> {
wrkflw_logging::info(&format!("Docker: Running container with image: {}", image));
let timeout_duration = std::time::Duration::from_secs(360);
match tokio::time::timeout(
timeout_duration,
self.run_container_inner(image, cmd, env_vars, working_dir, volumes),
)
.await
{
Ok(result) => result,
Err(_) => {
wrkflw_logging::error("Docker operation timed out after 360 seconds");
Err(ContainerError::ContainerExecution(
"Operation timed out".to_string(),
))
}
}
}
async fn pull_image(&self, image: &str) -> Result<(), ContainerError> {
let timeout_duration = std::time::Duration::from_secs(30);
match tokio::time::timeout(timeout_duration, self.pull_image_inner(image)).await {
Ok(result) => result,
Err(_) => {
wrkflw_logging::warning(&format!(
"Pull of image {} timed out, continuing with existing image",
image
));
Ok(())
}
}
}
async fn build_image(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
let timeout_duration = std::time::Duration::from_secs(120);
match tokio::time::timeout(timeout_duration, self.build_image_inner(dockerfile, tag)).await
{
Ok(result) => result,
Err(_) => {
wrkflw_logging::error(&format!(
"Building image {} timed out after 120 seconds",
tag
));
Err(ContainerError::ImageBuild(
"Operation timed out".to_string(),
))
}
}
}
async fn prepare_language_environment(
&self,
language: &str,
version: Option<&str>,
additional_packages: Option<Vec<String>>,
) -> Result<String, ContainerError> {
let key = format!("{}-{}", language, version.unwrap_or("latest"));
if let Some(customized_image) = Self::get_language_specific_image("", language, version) {
return Ok(customized_image);
}
let temp_dir = tempfile::tempdir().map_err(|e| {
ContainerError::ContainerStart(format!("Failed to create temp directory: {}", e))
})?;
let dockerfile_path = temp_dir.path().join("Dockerfile");
let mut dockerfile_content = String::new();
match language {
"python" => {
let base_image =
version.map_or("python:3.11-slim".to_string(), |v| format!("python:{}", v));
dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
dockerfile_content.push_str(
"RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
);
dockerfile_content.push_str(" build-essential \\\n");
dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
if let Some(packages) = additional_packages {
for package in packages {
dockerfile_content.push_str(&format!("RUN pip install {}\n", package));
}
}
}
"node" => {
let base_image =
version.map_or("node:20-slim".to_string(), |v| format!("node:{}", v));
dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
dockerfile_content.push_str(
"RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
);
dockerfile_content.push_str(" build-essential \\\n");
dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
if let Some(packages) = additional_packages {
for package in packages {
dockerfile_content.push_str(&format!("RUN npm install -g {}\n", package));
}
}
}
"java" => {
let base_image = version.map_or("eclipse-temurin:17-jdk".to_string(), |v| {
format!("eclipse-temurin:{}", v)
});
dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
dockerfile_content.push_str(
"RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
);
dockerfile_content.push_str(" maven \\\n");
dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
}
"go" => {
let base_image =
version.map_or("golang:1.21-slim".to_string(), |v| format!("golang:{}", v));
dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
dockerfile_content.push_str(
"RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
);
dockerfile_content.push_str(" git \\\n");
dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
if let Some(packages) = additional_packages {
for package in packages {
dockerfile_content.push_str(&format!("RUN go install {}\n", package));
}
}
}
"dotnet" => {
let base_image = version
.map_or("mcr.microsoft.com/dotnet/sdk:7.0".to_string(), |v| {
format!("mcr.microsoft.com/dotnet/sdk:{}", v)
});
dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
if let Some(packages) = additional_packages {
for package in packages {
dockerfile_content
.push_str(&format!("RUN dotnet tool install -g {}\n", package));
}
}
}
"rust" => {
let base_image =
version.map_or("rust:latest".to_string(), |v| format!("rust:{}", v));
dockerfile_content.push_str(&format!("FROM {}\n\n", base_image));
dockerfile_content.push_str(
"RUN apt-get update && apt-get install -y --no-install-recommends \\\n",
);
dockerfile_content.push_str(" build-essential \\\n");
dockerfile_content.push_str(" && rm -rf /var/lib/apt/lists/*\n");
if let Some(packages) = additional_packages {
for package in packages {
dockerfile_content.push_str(&format!("RUN cargo install {}\n", package));
}
}
}
_ => {
return Err(ContainerError::ContainerStart(format!(
"Unsupported language: {}",
language
)));
}
}
std::fs::write(&dockerfile_path, dockerfile_content).map_err(|e| {
ContainerError::ContainerStart(format!("Failed to write Dockerfile: {}", e))
})?;
let image_tag = format!("wrkflw-{}-{}", language, version.unwrap_or("latest"));
self.build_image(&dockerfile_path, &image_tag).await?;
Self::set_language_specific_image("", language, version, &image_tag);
Ok(image_tag)
}
}
impl DockerRuntime {
    /// Creates, starts, waits on, and collects output from a single
    /// container. Every Docker API call is individually time-boxed so a
    /// stuck daemon cannot hang the executor indefinitely.
    async fn run_container_inner(
        &self,
        image: &str,
        cmd: &[&str],
        env_vars: &[(&str, &str)],
        working_dir: &Path,
        volumes: &[(&Path, &Path)],
    ) -> Result<ContainerOutput, ContainerError> {
        // Best-effort pull: a cached local image may still work.
        if let Err(e) = self.pull_image_inner(image).await {
            wrkflw_logging::warning(&format!(
                "Failed to pull image {}: {}. Attempting to continue with existing image.",
                image, e
            ));
        }
        let mut env: Vec<String> = env_vars
            .iter()
            .map(|(k, v)| format!("{}={}", k, v))
            .collect();
        // Bind mounts in Docker's "host:container" syntax.
        let binds: Vec<String> = volumes
            .iter()
            .map(|(host_path, container_path)| {
                format!(
                    "{}:{}",
                    host_path.to_string_lossy(),
                    container_path.to_string_lossy()
                )
            })
            .collect();
        let cmd_vec: Vec<String> = cmd.iter().map(|&s| s.to_string()).collect();
        wrkflw_logging::debug(&format!("Running command in Docker: {:?}", cmd_vec));
        wrkflw_logging::debug(&format!("Environment: {:?}", env));
        wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
        // Name-based heuristics: Windows images need a platform hint and
        // process isolation; nektos/act-style images emulate a macOS runner.
        let is_windows_image = image.contains("windows")
            || image.contains("servercore")
            || image.contains("nanoserver");
        let is_macos_emu =
            image.contains("act-") && (image.contains("catthehacker") || image.contains("nektos"));
        if is_macos_emu {
            // Approximate the GitHub Actions macOS runner environment.
            env.push("RUNNER_OS=macOS".to_string());
            env.push("RUNNER_ARCH=X64".to_string());
            env.push("TMPDIR=/tmp".to_string());
            env.push("HOME=/root".to_string());
            env.push("GITHUB_WORKSPACE=/github/workspace".to_string());
            env.push("PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin".to_string());
        }
        let options = Some(CreateContainerOptions {
            name: format!("wrkflw-{}", uuid::Uuid::new_v4()),
            platform: if is_windows_image {
                Some("windows".to_string())
            } else {
                None
            },
        });
        let host_config = if is_windows_image {
            HostConfig {
                binds: Some(binds),
                isolation: Some(bollard::models::HostConfigIsolationEnum::PROCESS),
                ..Default::default()
            }
        } else {
            HostConfig {
                binds: Some(binds),
                ..Default::default()
            }
        };
        let mut config = Config {
            image: Some(image.to_string()),
            cmd: Some(cmd_vec),
            env: Some(env),
            working_dir: Some(working_dir.to_string_lossy().to_string()),
            host_config: Some(host_config),
            user: if is_windows_image {
                Some("ContainerAdministrator".to_string())
            } else {
                None
            },
            entrypoint: if is_macos_emu {
                // Login shell so profile-provided PATH entries apply.
                Some(vec!["bash".to_string(), "-l".to_string(), "-c".to_string()])
            } else {
                None
            },
            ..Default::default()
        };
        if is_macos_emu {
            let mut labels = HashMap::new();
            labels.insert("wrkflw.platform".to_string(), "macos".to_string());
            config.labels = Some(labels);
        }
        // Create the container (15s budget).
        let create_result = tokio::time::timeout(
            std::time::Duration::from_secs(15),
            self.docker.create_container(options, config),
        )
        .await;
        let container = match create_result {
            Ok(Ok(container)) => container,
            Ok(Err(e)) => return Err(ContainerError::ContainerStart(e.to_string())),
            Err(_) => {
                return Err(ContainerError::ContainerStart(
                    "Container creation timed out".to_string(),
                ))
            }
        };
        // Register for emergency cleanup before starting.
        track_container(&container.id);
        // Start it (15s budget); on failure remove the half-created container.
        let start_result = tokio::time::timeout(
            std::time::Duration::from_secs(15),
            self.docker.start_container::<String>(&container.id, None),
        )
        .await;
        match start_result {
            Ok(Ok(_)) => {}
            Ok(Err(e)) => {
                let _ = self.docker.remove_container(&container.id, None).await;
                untrack_container(&container.id);
                return Err(ContainerError::ContainerExecution(e.to_string()));
            }
            Err(_) => {
                let _ = self.docker.remove_container(&container.id, None).await;
                untrack_container(&container.id);
                return Err(ContainerError::ContainerExecution(
                    "Container start timed out".to_string(),
                ));
            }
        }
        // Wait for the command to finish (5 min budget); -1 signals a wait
        // failure or timeout to the caller.
        let wait_result = tokio::time::timeout(
            std::time::Duration::from_secs(300),
            self.docker
                .wait_container::<String>(&container.id, None)
                .collect::<Vec<_>>(),
        )
        .await;
        let exit_code = match wait_result {
            Ok(results) => match results.first() {
                Some(Ok(exit)) => exit.status_code as i32,
                _ => -1,
            },
            Err(_) => {
                wrkflw_logging::warning("Container wait operation timed out, treating as failure");
                -1
            }
        };
        // Fetch the logs (10s budget) and split into stdout/stderr.
        let logs_result = tokio::time::timeout(
            std::time::Duration::from_secs(10),
            self.docker
                .logs::<String>(&container.id, None)
                .collect::<Vec<_>>(),
        )
        .await;
        let mut stdout = String::new();
        let mut stderr = String::new();
        if let Ok(logs) = logs_result {
            for log in logs.into_iter().flatten() {
                match log {
                    bollard::container::LogOutput::StdOut { message } => {
                        stdout.push_str(&String::from_utf8_lossy(&message));
                    }
                    bollard::container::LogOutput::StdErr { message } => {
                        stderr.push_str(&String::from_utf8_lossy(&message));
                    }
                    _ => {}
                }
            }
        } else {
            wrkflw_logging::warning("Retrieving container logs timed out");
        }
        // Remove the container unless the caller asked to preserve failures
        // for debugging. Preserved containers are untracked too, so the
        // emergency cleanup path will not delete them either.
        if exit_code == 0 || !self.preserve_containers_on_failure {
            let _ = tokio::time::timeout(
                std::time::Duration::from_secs(10),
                self.docker.remove_container(&container.id, None),
            )
            .await;
            untrack_container(&container.id);
        } else {
            wrkflw_logging::info(&format!(
                "Preserving container {} for debugging (exit code: {}). Use 'docker exec -it {} bash' to inspect.",
                container.id, exit_code, container.id
            ));
            untrack_container(&container.id);
        }
        if exit_code != 0 {
            wrkflw_logging::info(&format!(
                "Docker command failed with exit code: {}",
                exit_code
            ));
            wrkflw_logging::debug(&format!("Failed command: {:?}", cmd));
            wrkflw_logging::debug(&format!("Working directory: {}", working_dir.display()));
            wrkflw_logging::debug(&format!("STDERR: {}", stderr));
        }
        Ok(ContainerOutput {
            stdout,
            stderr,
            exit_code,
        })
    }

    /// Pulls `image` from its registry, streaming progress until completion.
    async fn pull_image_inner(&self, image: &str) -> Result<(), ContainerError> {
        let options = bollard::image::CreateImageOptions {
            from_image: image,
            ..Default::default()
        };
        let mut stream = self.docker.create_image(Some(options), None, None);
        // Drain the progress stream; any error item aborts the pull.
        while let Some(result) = stream.next().await {
            if let Err(e) = result {
                return Err(ContainerError::ImagePull(e.to_string()));
            }
        }
        Ok(())
    }

    /// Packages the single Dockerfile into an in-memory tar archive and asks
    /// the daemon to build it under `tag`.
    async fn build_image_inner(&self, dockerfile: &Path, tag: &str) -> Result<(), ContainerError> {
        let tar_buffer = {
            let mut tar_builder = tar::Builder::new(Vec::new());
            let file = std::fs::File::open(dockerfile).map_err(|_| {
                ContainerError::ImageBuild(format!(
                    "Cannot open Dockerfile at {}",
                    dockerfile.display()
                ))
            })?;
            let metadata = file.metadata().map_err(|e| {
                ContainerError::ContainerExecution(format!("Failed to get file metadata: {}", e))
            })?;
            // BUG FIX: tar mtime is seconds since the Unix epoch. The previous
            // code used SystemTime::elapsed() (seconds *since* modification),
            // stamping every archive entry with a near-1970 timestamp.
            let modified_time = metadata
                .modified()
                .map_err(|e| {
                    ContainerError::ContainerExecution(format!(
                        "Failed to get file modification time: {}",
                        e
                    ))
                })?
                .duration_since(std::time::UNIX_EPOCH)
                .map_err(|e| {
                    ContainerError::ContainerExecution(format!(
                        "Failed to compute Dockerfile mtime: {}",
                        e
                    ))
                })?
                .as_secs();
            let mut header = tar::Header::new_gnu();
            header.set_size(metadata.len());
            header.set_mode(0o644);
            header.set_mtime(modified_time);
            header.set_cksum();
            tar_builder
                .append_data(&mut header, "Dockerfile", file)
                .map_err(|e| ContainerError::ImageBuild(e.to_string()))?;
            tar_builder
                .into_inner()
                .map_err(|e| ContainerError::ImageBuild(e.to_string()))?
        };
        let options = bollard::image::BuildImageOptions {
            dockerfile: "Dockerfile",
            t: tag,
            q: false,
            nocache: false,
            rm: true,
            ..Default::default()
        };
        let mut stream = self
            .docker
            .build_image(options, None, Some(tar_buffer.into()));
        // Drain the build stream; any error item aborts the build.
        while let Some(result) = stream.next().await {
            result.map_err(|e| ContainerError::ImageBuild(e.to_string()))?;
        }
        Ok(())
    }
}
/// Test helper: snapshot of the currently tracked container ids.
#[cfg(test)]
pub fn get_tracked_containers() -> Vec<String> {
    RUNNING_CONTAINERS
        .lock()
        .map(|tracked| tracked.clone())
        .unwrap_or_default()
}
/// Test helper: snapshot of the currently tracked network ids.
#[cfg(test)]
pub fn get_tracked_networks() -> Vec<String> {
    CREATED_NETWORKS
        .lock()
        .map(|tracked| tracked.clone())
        .unwrap_or_default()
}