use anyhow::{Context, Result};
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::{Method, Request};
use hyper_util::rt::TokioIo;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::path::{Path, PathBuf};
use tokio::net::UnixStream;
pub const DEFAULT_SOCKET_PATH: &str = "/var/run/arcbox.sock";
pub struct DaemonClient {
socket_path: PathBuf,
}
impl DaemonClient {
pub fn new() -> Self {
Self {
socket_path: PathBuf::from(DEFAULT_SOCKET_PATH),
}
}
pub fn with_socket(path: impl AsRef<Path>) -> Self {
Self {
socket_path: path.as_ref().to_path_buf(),
}
}
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
pub async fn is_running(&self) -> bool {
self.ping().await.is_ok()
}
pub async fn ping(&self) -> Result<()> {
let _: serde_json::Value = self.get("/_ping").await?;
Ok(())
}
pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
let body = self.request(Method::GET, path, None::<()>).await?;
serde_json::from_slice(&body).context("failed to parse response")
}
pub async fn get_raw(&self, path: &str) -> Result<Vec<u8>> {
let body = self.request(Method::GET, path, None::<()>).await?;
Ok(body.to_vec())
}
pub async fn post<T: DeserializeOwned, B: Serialize>(
&self,
path: &str,
body: Option<B>,
) -> Result<T> {
let body = self.request(Method::POST, path, body).await?;
serde_json::from_slice(&body).context("failed to parse response")
}
pub async fn post_empty<B: Serialize>(&self, path: &str, body: Option<B>) -> Result<()> {
self.request(Method::POST, path, body).await?;
Ok(())
}
pub async fn post_raw<B: Serialize>(&self, path: &str, body: Option<B>) -> Result<Vec<u8>> {
let body = self.request(Method::POST, path, body).await?;
Ok(body.to_vec())
}
pub async fn delete(&self, path: &str) -> Result<()> {
self.request(Method::DELETE, path, None::<()>).await?;
Ok(())
}
pub async fn stream_logs<F>(&self, path: &str, mut callback: F) -> Result<()>
where
F: FnMut(&[u8]),
{
self.stream_logs_with_cancel(path, &mut callback, tokio_util::sync::CancellationToken::new()).await
}
pub async fn stream_logs_with_cancel<F>(
&self,
path: &str,
callback: &mut F,
cancel_token: tokio_util::sync::CancellationToken,
) -> Result<()>
where
F: FnMut(&[u8]),
{
use std::io::{stdout, Write};
let stream = UnixStream::connect(&self.socket_path)
.await
.with_context(|| {
format!(
"failed to connect to daemon at {}",
self.socket_path.display()
)
})?;
let io = TokioIo::new(stream);
let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
.await
.context("HTTP handshake failed")?;
tokio::spawn(async move {
if let Err(e) = conn.await {
tracing::debug!("Log stream connection closed: {}", e);
}
});
let request = Request::builder()
.method(Method::GET)
.uri(format!("http://localhost{}", path))
.header("Host", "localhost")
.body(Full::new(Bytes::new()))
.context("failed to build request")?;
let response = sender
.send_request(request)
.await
.context("failed to send request")?;
let status = response.status();
if !status.is_success() {
anyhow::bail!("daemon returned error {}", status);
}
let mut body = response.into_body();
let mut buffer = Vec::with_capacity(4096);
let mut frames_processed = 0u64;
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
tracing::debug!("Log stream cancelled after {} frames", frames_processed);
break;
}
frame_result = body.frame() => {
match frame_result {
Some(Ok(frame)) => {
if let Some(data) = frame.data_ref() {
buffer.extend_from_slice(data);
while let Some((stream_type, content)) = extract_log_frame(&buffer) {
let frame_size = 8 + content.len();
let _ = stream_type; callback(content);
frames_processed += 1;
buffer.drain(..frame_size);
}
if frames_processed % 10 == 0 {
stdout().flush().ok();
}
}
}
Some(Err(e)) => {
tracing::debug!("Error reading log frame: {}", e);
break;
}
None => {
break;
}
}
}
}
}
while let Some((_, content)) = extract_log_frame(&buffer) {
let frame_size = 8 + content.len();
callback(content);
buffer.drain(..frame_size);
}
if !buffer.is_empty() {
if buffer.len() < 8 || buffer[0] > 2 {
callback(&buffer);
}
}
stdout().flush().ok();
tracing::debug!("Log stream completed after {} frames", frames_processed);
Ok(())
}
async fn request<B: Serialize>(
&self,
method: Method,
path: &str,
body: Option<B>,
) -> Result<Bytes> {
let stream = UnixStream::connect(&self.socket_path)
.await
.with_context(|| {
format!(
"failed to connect to daemon at {}",
self.socket_path.display()
)
})?;
let io = TokioIo::new(stream);
let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
.await
.context("HTTP handshake failed")?;
tokio::spawn(async move {
if let Err(e) = conn.await {
tracing::error!("Connection error: {}", e);
}
});
let request = if let Some(body) = body {
let body_bytes = serde_json::to_vec(&body).context("failed to serialize body")?;
Request::builder()
.method(method)
.uri(format!("http://localhost{}", path))
.header("Host", "localhost")
.header("Content-Type", "application/json")
.header("Content-Length", body_bytes.len())
.body(Full::new(Bytes::from(body_bytes)))
.context("failed to build request")?
} else {
Request::builder()
.method(method)
.uri(format!("http://localhost{}", path))
.header("Host", "localhost")
.body(Full::new(Bytes::new()))
.context("failed to build request")?
};
let response = sender
.send_request(request)
.await
.context("failed to send request")?;
let status = response.status();
let body = response
.into_body()
.collect()
.await
.context("failed to read response")?
.to_bytes();
if !status.is_success() {
let error_msg = String::from_utf8_lossy(&body);
anyhow::bail!("daemon returned error {}: {}", status, error_msg);
}
Ok(body)
}
}
impl Default for DaemonClient {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ContainerSummary {
pub id: String,
pub names: Vec<String>,
pub image: String,
pub image_id: String,
pub command: String,
pub created: i64,
pub state: String,
pub status: String,
}
#[derive(Debug, Clone, Default, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct CreateContainerRequest {
pub image: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub cmd: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub entrypoint: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub env: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub working_dir: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
pub tty: bool,
pub open_stdin: bool,
pub attach_stdin: bool,
pub attach_stdout: bool,
pub attach_stderr: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub host_config: Option<HostConfig>,
}
#[derive(Debug, Clone, Default, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct HostConfig {
#[serde(skip_serializing_if = "Vec::is_empty")]
pub binds: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub port_bindings: Option<std::collections::HashMap<String, Vec<PortBinding>>>,
pub auto_remove: bool,
}
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct PortBinding {
pub host_ip: String,
pub host_port: String,
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct CreateContainerResponse {
pub id: String,
pub warnings: Vec<String>,
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ContainerInspect {
pub id: String,
pub name: String,
pub state: ContainerState,
pub config: ContainerConfig,
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ContainerState {
pub status: String,
pub running: bool,
pub paused: bool,
pub exit_code: i32,
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ContainerConfig {
pub image: String,
pub cmd: Option<Vec<String>>,
pub tty: bool,
}
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct ExecCreateRequest {
pub attach_stdin: bool,
pub attach_stdout: bool,
pub attach_stderr: bool,
pub tty: bool,
pub cmd: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub env: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub working_dir: Option<String>,
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ExecCreateResponse {
pub id: String,
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ContainerWaitResponse {
pub status_code: i64,
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ImageSummary {
pub id: String,
pub repo_tags: Vec<String>,
pub created: i64,
pub size: i64,
}
pub async fn get_client() -> Result<DaemonClient> {
let client = DaemonClient::new();
if !client.is_running().await {
anyhow::bail!(
"Cannot connect to ArcBox daemon at {}\n\
Is the daemon running? Start it with: arcbox daemon",
client.socket_path().display()
);
}
Ok(client)
}
pub fn truncate(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
s.to_string()
} else {
format!("{}...", &s[..max_len.saturating_sub(3)])
}
}
pub fn short_id(id: &str) -> &str {
if id.len() > 12 {
&id[..12]
} else {
id
}
}
pub fn relative_time(timestamp: i64) -> String {
let now = chrono::Utc::now().timestamp();
let diff = now - timestamp;
if diff < 60 {
format!("{} seconds ago", diff)
} else if diff < 3600 {
format!("{} minutes ago", diff / 60)
} else if diff < 86400 {
format!("{} hours ago", diff / 3600)
} else {
format!("{} days ago", diff / 86400)
}
}
fn extract_log_frame(buffer: &[u8]) -> Option<(u8, &[u8])> {
if buffer.len() < 8 {
return None;
}
let stream_type = buffer[0];
let size = u32::from_be_bytes([buffer[4], buffer[5], buffer[6], buffer[7]]) as usize;
let frame_end = 8 + size;
if buffer.len() < frame_end {
return None;
}
Some((stream_type, &buffer[8..frame_end]))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_short_id() {
assert_eq!(short_id("abc123def456789"), "abc123def456");
assert_eq!(short_id("short"), "short");
}
#[test]
fn test_truncate() {
assert_eq!(truncate("hello world", 20), "hello world");
assert_eq!(truncate("hello world this is long", 15), "hello world ...");
}
}