use std::net::Ipv4Addr;
use std::path::Path;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio_util::sync::CancellationToken;
use crate::dns_server::GuestDnsServer;
const DOCKER_SOCK: &str = "/var/run/docker.sock";
const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
pub async fn reconcile_and_watch(dns: &GuestDnsServer, cancel: CancellationToken) {
loop {
while !Path::new(DOCKER_SOCK).exists() {
tokio::select! {
() = cancel.cancelled() => return,
() = tokio::time::sleep(RETRY_DELAY) => {}
}
}
match run_once(dns, &cancel).await {
Ok(()) => return, Err(e) => {
tracing::warn!(error = %e, "docker event listener disconnected, retrying");
tokio::select! {
() = cancel.cancelled() => return,
() = tokio::time::sleep(RETRY_DELAY) => {}
}
}
}
}
}
async fn run_once(dns: &GuestDnsServer, cancel: &CancellationToken) -> anyhow::Result<()> {
if let Err(e) = reconcile_existing(dns).await {
tracing::warn!(error = %e, "initial container reconciliation failed");
}
let stream = UnixStream::connect(DOCKER_SOCK).await?;
let (reader, mut writer) = tokio::io::split(stream);
let request = "GET /events?filters=%7B%22type%22%3A%5B%22container%22%5D%7D HTTP/1.1\r\n\
Host: localhost\r\n\
Connection: keep-alive\r\n\r\n";
writer.write_all(request.as_bytes()).await?;
let mut lines = BufReader::new(reader).lines();
loop {
let Some(line) = read_line_or_cancel(&mut lines, cancel).await? else {
return Ok(());
};
if line.is_empty() {
break; }
}
loop {
let Some(line) = read_line_or_cancel(&mut lines, cancel).await? else {
return Ok(());
};
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
continue;
}
if let Err(e) = handle_event(dns, trimmed).await {
tracing::debug!(error = %e, "failed to process docker event");
}
}
}
async fn read_line_or_cancel(
lines: &mut tokio::io::Lines<BufReader<tokio::io::ReadHalf<UnixStream>>>,
cancel: &CancellationToken,
) -> anyhow::Result<Option<String>> {
tokio::select! {
() = cancel.cancelled() => Ok(None),
result = lines.next_line() => {
match result? {
Some(line) => Ok(Some(line)),
None => anyhow::bail!("event stream ended"),
}
}
}
}
async fn reconcile_existing(dns: &GuestDnsServer) -> anyhow::Result<()> {
let containers = docker_get("/containers/json").await?;
let arr = containers
.as_array()
.ok_or_else(|| anyhow::anyhow!("expected array"))?;
for c in arr {
let Some(id) = c["Id"].as_str() else {
continue;
};
let short_id = &id[..12.min(id.len())];
if let Err(e) = register_container_by_id(dns, short_id).await {
tracing::debug!(id = short_id, error = %e, "failed to register container");
}
}
tracing::info!(count = arr.len(), "reconciled existing containers");
Ok(())
}
async fn handle_event(dns: &GuestDnsServer, json_str: &str) -> anyhow::Result<()> {
let event: serde_json::Value = serde_json::from_str(json_str)?;
let action = event["Action"].as_str().unwrap_or_default();
let id_full = event["Actor"]["ID"].as_str().unwrap_or_default();
let id = &id_full[..12.min(id_full.len())];
match action {
"start" => {
register_container_by_id(dns, id).await?;
}
"die" | "destroy" => {
let attrs = &event["Actor"]["Attributes"];
let name = attrs["name"].as_str().unwrap_or_default();
if !name.is_empty() {
let compose = attrs.as_object().and_then(crate::dns::extract_compose_info);
for alias in crate::dns::collect_aliases(name, compose.as_ref()) {
dns.deregister_container(&alias, name).await;
}
}
}
"rename" => {
let attrs = &event["Actor"]["Attributes"];
let old_name = attrs["oldName"]
.as_str()
.unwrap_or_default()
.trim_start_matches('/');
if !old_name.is_empty() {
let compose = attrs.as_object().and_then(crate::dns::extract_compose_info);
for alias in crate::dns::collect_aliases(old_name, compose.as_ref()) {
dns.deregister_container(&alias, old_name).await;
}
}
register_container_by_id(dns, id).await?;
}
_ => {}
}
Ok(())
}
async fn register_container_by_id(dns: &GuestDnsServer, id: &str) -> anyhow::Result<()> {
let info = docker_get(&format!("/containers/{id}/json")).await?;
let name = info["Name"]
.as_str()
.unwrap_or_default()
.trim_start_matches('/');
if name.is_empty() {
return Ok(());
}
let networks = &info["NetworkSettings"]["Networks"];
let ip = networks
.as_object()
.and_then(|nets| {
nets.values().find_map(|net| {
let ip_str = net["IPAddress"].as_str()?;
ip_str.parse::<Ipv4Addr>().ok()
})
})
.ok_or_else(|| anyhow::anyhow!("no IP for container {name}"))?;
let compose = info["Config"]["Labels"]
.as_object()
.and_then(crate::dns::extract_compose_info);
let aliases = crate::dns::collect_aliases(name, compose.as_ref());
for alias in &aliases {
dns.register_container(alias, name, ip).await;
}
tracing::debug!(name, %ip, aliases = ?aliases, "registered container DNS");
Ok(())
}
async fn docker_get(path: &str) -> anyhow::Result<serde_json::Value> {
let stream = UnixStream::connect(DOCKER_SOCK).await?;
let (reader, mut writer) = tokio::io::split(stream);
let request = format!("GET {path} HTTP/1.0\r\nHost: localhost\r\n\r\n");
writer.write_all(request.as_bytes()).await?;
writer.shutdown().await?;
let mut buf = Vec::new();
tokio::io::AsyncReadExt::read_to_end(&mut BufReader::new(reader), &mut buf).await?;
let body_start = buf
.windows(4)
.position(|w| w == b"\r\n\r\n")
.map_or(0, |p| p + 4);
let body = &buf[body_start..];
Ok(serde_json::from_slice(body)?)
}