use crate::api::{create_router, strip_api_version_prefix};
use crate::error::{DockerError, Result};
use crate::proxy::VsockConnector;
use arcbox_core::Runtime;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::net::UnixListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tower::{Layer, Service};
use tower_http::trace::TraceLayer;
#[derive(Debug, Clone)]
pub struct ServerConfig {
pub socket_path: PathBuf,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
socket_path: default_socket_path(),
}
}
}
fn default_socket_path() -> PathBuf {
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("/tmp"))
.join(".arcbox")
.join("docker.sock")
}
pub struct DockerApiServer {
config: ServerConfig,
runtime: Arc<Runtime>,
}
impl DockerApiServer {
#[must_use]
pub const fn new(config: ServerConfig, runtime: Arc<Runtime>) -> Self {
Self { config, runtime }
}
#[must_use]
pub fn socket_path(&self) -> &Path {
&self.config.socket_path
}
pub async fn run(&self, shutdown: CancellationToken) -> Result<()> {
let _ = std::fs::remove_file(&self.config.socket_path);
if let Some(parent) = self.config.socket_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let listener = UnixListener::bind(&self.config.socket_path)
.map_err(|e| crate::error::DockerError::Server(e.to_string()))?;
tracing::info!(
"Docker API server listening on {}",
self.config.socket_path.display()
);
tracing::info!("Docker API backend: smart proxy to guest dockerd");
self.run_native_http(listener, shutdown).await
}
}
impl DockerApiServer {
async fn run_native_http(
&self,
listener: UnixListener,
shutdown: CancellationToken,
) -> Result<()> {
let connector = Arc::new(VsockConnector::new(Arc::clone(&self.runtime)));
let version_layer = tower::util::MapRequestLayer::new(strip_api_version_prefix);
let app = version_layer.layer(
create_router(Arc::clone(&self.runtime), connector).layer(TraceLayer::new_for_http()),
);
let mut connections = JoinSet::new();
loop {
let stream = tokio::select! {
result = listener.accept() => {
let (stream, _) = result.map_err(|e| DockerError::Server(e.to_string()))?;
stream
}
() = shutdown.cancelled() => {
tracing::info!("Docker API server shutting down, waiting for {} in-flight connection(s)", connections.len());
break;
}
};
let tower_service = app.clone();
connections.spawn(async move {
let hyper_service =
hyper::service::service_fn(move |request: hyper::Request<Incoming>| {
tower_service.clone().call(request)
});
if let Err(err) = http1::Builder::new()
.serve_connection(TokioIo::new(stream), hyper_service)
.with_upgrades()
.await
{
let err_str = err.to_string().to_lowercase();
if !err_str.contains("shutting down")
&& !err_str.contains("connection reset")
&& !err_str.contains("broken pipe")
&& !err_str.contains("connection closed")
&& !err_str.contains("incomplete")
{
tracing::error!("Error serving connection: {}", err);
}
}
});
}
while connections.join_next().await.is_some() {}
Ok(())
}
}