use super::types::{DockerPullResult, DockerStatus};
use chrono::Utc;
use std::process::Stdio;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::process::Command;
use tokio::sync::{Mutex, RwLock};
/// Maximum age, in milliseconds, at which a cached `DockerStatus` is still
/// returned by non-forced availability checks.
const CACHE_TTL_MS: u64 = 30_000;
/// Timeout, in milliseconds, applied to the `docker info` and `docker images`
/// invocations.
const DEFAULT_TIMEOUT_MS: u64 = 5_000;
/// Probes the local `docker` CLI and caches the most recent availability
/// result.
pub struct DockerDetector {
/// Result of the most recent probe, or `None` until a probe has completed.
cached_status: Arc<RwLock<Option<DockerStatus>>>,
/// Instant associated with `cached_status`; compared against
/// `CACHE_TTL_MS` to decide whether the cached value is still fresh.
cached_at: Arc<RwLock<Instant>>,
/// Held by non-forced refreshes while their probe runs, so concurrent
/// non-forced callers wait for that result instead of probing again.
refresh_lock: Arc<Mutex<()>>,
}
impl Default for DockerDetector {
fn default() -> Self {
Self::new()
}
}
impl DockerDetector {
/// Creates a detector with an empty cache.
///
/// `cached_at` is seeded with an instant one hour in the past when the
/// platform clock can represent it, and with the current instant otherwise.
pub fn new() -> Self {
    // `Instant - Duration` panics when the result cannot be represented,
    // which can happen when the monotonic clock is younger than the
    // subtracted duration. Falling back to "now" is safe: while
    // `cached_status` is `None` the freshness check never reports a hit,
    // whatever `cached_at` holds.
    let seeded_at = Instant::now()
        .checked_sub(Duration::from_secs(3600))
        .unwrap_or_else(Instant::now);
    Self {
        cached_status: Arc::new(RwLock::new(None)),
        cached_at: Arc::new(RwLock::new(seeded_at)),
        refresh_lock: Arc::new(Mutex::new(())),
    }
}
async fn cached_status_if_fresh(&self, now: Instant) -> Option<DockerStatus> {
let cached = self.cached_status.read().await;
let cached_time = *self.cached_at.read().await;
cached.as_ref().and_then(|status| {
if now.duration_since(cached_time).as_millis() < CACHE_TTL_MS as u128 {
Some(status.clone())
} else {
None
}
})
}
/// Reports the current Docker status.
///
/// Delegates to `check_availability_with_runner`, using `docker info` as the
/// probe. `force_refresh` is passed through unchanged.
pub async fn check_availability(&self, force_refresh: bool) -> DockerStatus {
    let docker_info_probe =
        |checked_at: String| async move { self.run_docker_info(&checked_at).await };
    self.check_availability_with_runner(force_refresh, docker_info_probe)
        .await
}
/// Cache-aware availability check with an injectable probe.
///
/// * Non-forced (`force_refresh == false`): returns the cached status when it
///   is fresh. Otherwise waits for `refresh_lock`, re-checks the cache (a
///   caller that held the lock may have just refreshed it), and only then
///   invokes `runner`.
/// * Forced (`force_refresh == true`): skips both the cache and the lock and
///   invokes `runner` immediately.
///
/// `runner` receives the current UTC time formatted as RFC 3339. Its result
/// is stored in the cache together with an instant captured before the
/// probe ran, and is then returned.
async fn check_availability_with_runner<F, Fut>(
    &self,
    force_refresh: bool,
    runner: F,
) -> DockerStatus
where
    F: FnOnce(String) -> Fut,
    Fut: std::future::Future<Output = DockerStatus>,
{
    let entered_at = Instant::now();

    // `_refresh_guard` is a named binding (not `_`) so that, on the
    // non-forced path, the lock stays held until the cache has been written.
    let (stamp, _refresh_guard) = if force_refresh {
        (entered_at, None)
    } else {
        if let Some(hit) = self.cached_status_if_fresh(entered_at).await {
            return hit;
        }
        let guard = self.refresh_lock.lock().await;
        let locked_at = Instant::now();
        if let Some(hit) = self.cached_status_if_fresh(locked_at).await {
            return hit;
        }
        (locked_at, Some(guard))
    };

    let checked_at = Utc::now().to_rfc3339();
    let status = runner(checked_at).await;
    *self.cached_status.write().await = Some(status.clone());
    *self.cached_at.write().await = stamp;
    status
}
/// Runs `docker info --format {{json .}}` with a `DEFAULT_TIMEOUT_MS` timeout
/// and converts the outcome into a `DockerStatus` stamped with `checked_at`.
///
/// A successful exit yields an available status with the versions parsed
/// from stdout. A non-zero exit, a spawn/IO error, or a timeout each yield an
/// unavailable status whose `error` describes what went wrong.
async fn run_docker_info(&self, checked_at: &str) -> DockerStatus {
    // All failure outcomes differ only in their error text.
    let unavailable = |reason: String| DockerStatus {
        available: false,
        daemon_running: false,
        error: Some(reason),
        checked_at: checked_at.to_string(),
        ..Default::default()
    };

    let outcome = tokio::time::timeout(
        Duration::from_millis(DEFAULT_TIMEOUT_MS),
        docker_command()
            .args(["info", "--format", "{{json .}}"])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output(),
    )
    .await;

    let output = match outcome {
        Err(_) => return unavailable("Docker command timed out".to_string()),
        Ok(Err(e)) => return unavailable(format!("Failed to run docker: {}", e)),
        Ok(Ok(output)) => output,
    };

    if !output.status.success() {
        // Non-zero exit: surface whatever docker wrote to stderr.
        return unavailable(String::from_utf8_lossy(&output.stderr).to_string());
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    let (version, api_version) = self.parse_docker_info(&stdout);
    DockerStatus {
        available: true,
        daemon_running: true,
        version,
        api_version,
        error: None,
        checked_at: checked_at.to_string(),
    }
}
/// Extracts `(version, api_version)` from the JSON text in `stdout`.
///
/// `version` is the string at `ServerVersion`. `api_version` is the string at
/// `ClientInfo.ApiVersion`, falling back to the top-level `APIVersion`.
/// Missing or non-string fields become `None`; text that is not valid JSON
/// yields `(None, None)`.
fn parse_docker_info(&self, stdout: &str) -> (Option<String>, Option<String>) {
    let info: serde_json::Value = match serde_json::from_str(stdout.trim()) {
        Ok(parsed) => parsed,
        Err(_) => return (None, None),
    };

    let version = info
        .get("ServerVersion")
        .and_then(serde_json::Value::as_str)
        .map(str::to_owned);

    let client_api = info
        .get("ClientInfo")
        .and_then(|client| client.get("ApiVersion"))
        .and_then(serde_json::Value::as_str);
    let api_version = match client_api {
        Some(found) => Some(found),
        None => info.get("APIVersion").and_then(serde_json::Value::as_str),
    }
    .map(str::to_owned);

    (version, api_version)
}
/// Returns `true` when `docker images -q <image>` exits successfully within
/// `DEFAULT_TIMEOUT_MS` and prints something other than whitespace.
///
/// A timeout, a failure to run the command, or a non-zero exit all yield
/// `false`.
pub async fn is_image_available(&self, image: &str) -> bool {
    match tokio::time::timeout(
        Duration::from_millis(DEFAULT_TIMEOUT_MS),
        docker_command()
            .args(["images", "-q", image])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output(),
    )
    .await
    {
        Ok(Ok(output)) => {
            let listed_ids = String::from_utf8_lossy(&output.stdout);
            output.status.success() && !listed_ids.trim().is_empty()
        }
        Ok(Err(_)) | Err(_) => false,
    }
}
/// Runs `docker pull <image>` with a ten-minute timeout.
///
/// On a successful exit, `output` holds stdout followed by stderr (joined by
/// a newline when stderr is non-empty), trimmed. On a non-zero exit, `error`
/// holds stderr. A spawn/IO error or a timeout is reported through `error`
/// with a fixed message prefix.
pub async fn pull_image(&self, image: &str) -> DockerPullResult {
    // All failure outcomes differ only in their error text.
    let failed = |reason: String| DockerPullResult {
        ok: false,
        image: image.to_string(),
        output: None,
        error: Some(reason),
    };

    let outcome = tokio::time::timeout(
        Duration::from_secs(10 * 60),
        docker_command()
            .args(["pull", image])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output(),
    )
    .await;

    let output = match outcome {
        Err(_) => return failed("Docker pull timed out".to_string()),
        Ok(Err(e)) => return failed(format!("Failed to run docker pull: {}", e)),
        Ok(Ok(output)) => output,
    };

    let stderr = String::from_utf8_lossy(&output.stderr);
    if !output.status.success() {
        return failed(stderr.to_string());
    }

    let mut transcript = String::from_utf8_lossy(&output.stdout).into_owned();
    if !stderr.is_empty() {
        transcript.push('\n');
        transcript.push_str(&stderr);
    }

    DockerPullResult {
        ok: true,
        image: image.to_string(),
        output: Some(transcript.trim().to_string()),
        error: None,
    }
}
}
/// Builds the base `docker` command shared by every invocation in this file.
///
/// The child is killed if its handle is dropped (for example when a timeout
/// abandons the pending future). On Windows the process is additionally
/// created with the `CREATE_NO_WINDOW` flag.
fn docker_command() -> Command {
    let mut docker = Command::new("docker");
    docker.kill_on_drop(true);
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;
        // Win32 process creation flag: do not open a console window.
        const CREATE_NO_WINDOW: u32 = 0x0800_0000;
        docker.as_std_mut().creation_flags(CREATE_NO_WINDOW);
    }
    docker
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Notify;
#[tokio::test]
async fn check_availability_coalesces_concurrent_requests() {
let detector = DockerDetector::new();
let invocations = Arc::new(AtomicUsize::new(0));
let first_counter = invocations.clone();
let second_counter = invocations.clone();
let first = detector.check_availability_with_runner(false, move |checked_at| {
let counter = first_counter.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(50)).await;
DockerStatus {
available: true,
daemon_running: true,
checked_at,
..Default::default()
}
}
});
let second = detector.check_availability_with_runner(false, move |checked_at| {
let counter = second_counter.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
DockerStatus {
available: true,
daemon_running: true,
checked_at,
..Default::default()
}
}
});
let (left, right) = tokio::time::timeout(Duration::from_secs(1), async {
tokio::join!(first, second)
})
.await
.expect("concurrent availability checks should complete");
assert!(left.available);
assert!(right.available);
assert_eq!(invocations.load(Ordering::SeqCst), 1);
}
// Verifies that a forced refresh completes while a non-forced probe is still
// running, i.e. that the forced path does not wait on the refresh lock.
#[tokio::test]
async fn force_refresh_bypasses_in_flight_probe() {
let detector = Arc::new(DockerDetector::new());
// Counts how many probe runners were actually invoked.
let invocations = Arc::new(AtomicUsize::new(0));
// `background_started` tells the test body that the background runner is
// executing; `background_release` lets that runner finish.
let background_started = Arc::new(Notify::new());
let background_release = Arc::new(Notify::new());
let background_detector = detector.clone();
let background_counter = invocations.clone();
let background_started_signal = background_started.clone();
let background_release_signal = background_release.clone();
// Non-forced check whose runner parks until released, so the non-forced
// refresh stays in flight for the duration of the forced one.
let background = tokio::spawn(async move {
background_detector
.check_availability_with_runner(false, move |checked_at| {
let counter = background_counter.clone();
let started = background_started_signal.clone();
let release = background_release_signal.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
started.notify_waiters();
release.notified().await;
DockerStatus {
available: true,
daemon_running: true,
checked_at,
..Default::default()
}
}
})
.await
});
// Do not start the forced refresh until the background runner is running.
background_started.notified().await;
let refresh_detector = detector.clone();
let refresh_counter = invocations.clone();
// Forced check with a runner that returns immediately.
let refresh = tokio::spawn(async move {
refresh_detector
.check_availability_with_runner(true, move |checked_at| {
let counter = refresh_counter.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
DockerStatus {
available: true,
daemon_running: true,
checked_at,
..Default::default()
}
}
})
.await
});
// The background runner is still parked, so the forced refresh can only meet
// this 50 ms deadline if it did not queue behind the in-flight probe.
let refreshed = tokio::time::timeout(Duration::from_millis(50), refresh)
.await
.expect("force refresh should not wait for in-flight background probe")
.expect("force refresh task should complete");
assert!(refreshed.available);
// Unpark the background runner and let its check finish.
background_release.notify_waiters();
let background = background.await.expect("background task should complete");
assert!(background.available);
// Both runners ran: the forced refresh was not coalesced with the other.
assert_eq!(invocations.load(Ordering::SeqCst), 2);
}
}