use bytes::Bytes;
use http_body_util::{BodyExt, Full, Limited};
use hyper::body::Incoming;
use hyper::client::conn::http1;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use serde::{de::DeserializeOwned, Serialize};
use super::error::PodmanError;
mod encode;
pub(crate) use encode::urlencoded;
/// Request body type used by every call in this module: one in-memory buffer.
/// (Despite the name, this is `Full<Bytes>`, not a boxed trait object.)
type BoxBody = Full<Bytes>;
/// Largest response body `read_body` will buffer (64 MiB) before failing.
const MAX_RESPONSE_BYTES: usize = 64 * 1024 * 1024;
/// Time budget `send` gives `connect` (socket open plus HTTP/1 handshake).
const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Time budget `read_body` gives to collecting a complete response body.
const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
/// Result alias used throughout this module; the error is always [`PodmanError`].
pub type Result<T> = std::result::Result<T, PodmanError>;
/// HTTP client for a Podman API endpoint reached through a local socket.
///
/// The client holds no open connection: `send` calls `connect` for every
/// request, so each request runs on its own freshly opened connection.
pub struct Client {
/// Location of the API endpoint: passed to `UnixStream::connect` on Unix and
/// to the named-pipe `ClientOptions::open` on Windows.
socket_path: String,
}
impl Client {
/// Creates a client for the Podman API socket located at `socket_path`.
///
/// Only the path is recorded here; no connection is attempted until a
/// request is actually sent.
pub fn new(socket_path: impl Into<String>) -> Self {
    let socket_path = socket_path.into();
    Self { socket_path }
}
/// Opens a new connection to `socket_path` and performs the HTTP/1 handshake.
///
/// Returns the request sender half. The connection half is moved onto a
/// spawned Tokio task that drives it to completion; whatever that task
/// finishes with (success or error) is deliberately discarded.
///
/// Exactly one of the two `#[cfg]` blocks below survives compilation and
/// becomes the function's tail expression.
///
/// # Errors
///
/// Fails when the socket/pipe cannot be opened or the handshake fails.
async fn connect(&self) -> Result<http1::SendRequest<BoxBody>> {
#[cfg(unix)]
{
let stream = tokio::net::UnixStream::connect(&self.socket_path).await?;
let io = TokioIo::new(stream);
let (sender, conn) = http1::handshake(io).await?;
// The connection future must be polled for the sender to make progress.
tokio::spawn(async move {
let _ = conn.await;
});
Ok(sender)
}
#[cfg(windows)]
{
// Opening a named pipe can fail transiently while the pipe is busy, so
// retry up to 20 times, sleeping 50 ms between attempts.
let pipe = {
let mut last_err = None;
let mut result = None;
for _ in 0..20 {
match tokio::net::windows::named_pipe::ClientOptions::new()
.open(&self.socket_path)
{
Ok(p) => {
result = Some(p);
break;
}
// OS error 231 is Windows' ERROR_PIPE_BUSY: remember it and retry.
Err(e) if e.raw_os_error() == Some(231) => {
last_err = Some(e);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
// Any other failure is not retried.
Err(e) => return Err(PodmanError::Connect(e)),
}
}
// Reaching here without a pipe means every attempt hit the busy branch,
// so `last_err` is set; the synthesized `TimedOut` error is a defensive
// fallback only.
result.ok_or_else(|| {
PodmanError::Connect(last_err.unwrap_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::TimedOut,
"named pipe busy after 20 retries",
)
}))
})?
};
let io = TokioIo::new(pipe);
let (sender, conn) = http1::handshake(io).await?;
// The connection future must be polled for the sender to make progress.
tokio::spawn(async move {
let _ = conn.await;
});
Ok(sender)
}
}
/// Builds an HTTP request for the API endpoint at `path`.
///
/// `path` is the absolute request path (including any query string) and must
/// start with `/`. `content_type`, when given, is sent as the `Content-Type`
/// header. The `Host` header is always `localhost`.
///
/// # Errors
///
/// Returns `PodmanError::Api` with `status: 0` when `path` does not start
/// with `/`, when the resulting URI does not parse, or when the request
/// cannot be assembled (for example, an invalid header value).
fn build_request(
    method: Method,
    path: &str,
    body: BoxBody,
    content_type: Option<&str>,
) -> Result<Request<BoxBody>> {
    // The path is appended directly after the authority, so without a leading
    // slash it would be absorbed into the host name ("http://localhostfoo/bar")
    // and the request would silently target a different endpoint.
    if !path.starts_with('/') {
        return Err(PodmanError::Api {
            status: 0,
            message: format!("invalid API path '{path}': must start with '/'"),
        });
    }
    let uri: hyper::Uri = format!("http://localhost{path}").parse().map_err(
        |e: hyper::http::uri::InvalidUri| PodmanError::Api {
            status: 0,
            message: format!("invalid API path '{path}': {e}"),
        },
    )?;
    let mut builder = Request::builder()
        .method(method)
        .uri(uri)
        .header(hyper::header::HOST, "localhost");
    if let Some(ct) = content_type {
        builder = builder.header(hyper::header::CONTENT_TYPE, ct);
    }
    builder.body(body).map_err(|e| PodmanError::Api {
        status: 0,
        message: e.to_string(),
    })
}
/// Opens a fresh connection and sends `req` over it, returning the response
/// with its body still unread.
///
/// Only the connection phase is bounded by `CONNECT_TIMEOUT`; waiting for the
/// response itself is not time-limited here.
///
/// # Errors
///
/// `PodmanError::Api` with `status: 0` when connecting times out, the error
/// from `connect` when connecting fails, or `PodmanError::Hyper` when the
/// request cannot be sent.
async fn send(&self, req: Request<BoxBody>) -> Result<Response<Incoming>> {
    let connecting = tokio::time::timeout(CONNECT_TIMEOUT, self.connect());
    let mut sender = match connecting.await {
        Ok(connected) => connected?,
        Err(_elapsed) => {
            return Err(PodmanError::Api {
                status: 0,
                message: format!(
                    "timed out after {}s connecting to the Podman socket",
                    CONNECT_TIMEOUT.as_secs()
                ),
            });
        }
    };
    match sender.send_request(req).await {
        Ok(response) => Ok(response),
        Err(e) => Err(PodmanError::Hyper(e)),
    }
}
/// Buffers the whole response body and returns it with the status code.
///
/// The body is capped at `MAX_RESPONSE_BYTES` and must arrive completely
/// within `READ_TIMEOUT`.
///
/// # Errors
///
/// `PodmanError::Api` with `status: 0` when the read times out or when
/// collecting the body fails (including exceeding the size cap).
async fn read_body(resp: Response<Incoming>) -> Result<(StatusCode, Vec<u8>)> {
    let status = resp.status();
    let capped = Limited::new(resp.into_body(), MAX_RESPONSE_BYTES);
    let collected = match tokio::time::timeout(READ_TIMEOUT, capped.collect()).await {
        Ok(Ok(collected)) => collected,
        Ok(Err(e)) => {
            return Err(PodmanError::Api {
                status: 0,
                message: format!("reading response body: {e}"),
            });
        }
        Err(_elapsed) => {
            return Err(PodmanError::Api {
                status: 0,
                message: format!(
                    "timed out after {}s reading the response body from the Podman socket",
                    READ_TIMEOUT.as_secs()
                ),
            });
        }
    };
    let bytes = collected.to_bytes().to_vec();
    Ok((status, bytes))
}
/// Converts a non-success HTTP status into a `PodmanError::Api`.
///
/// Returns `Ok(())` for any 2xx status. Otherwise the error message is the
/// first usable candidate from this list:
///
/// 1. the `message` field of a JSON error body,
/// 2. the `cause` field of a JSON error body,
/// 3. the raw body, decoded lossily as UTF-8,
/// 4. the status code's canonical reason phrase.
///
/// Blank candidates are skipped so the error never carries an empty message
/// when something more informative is available.
fn check_status(status: StatusCode, body: &[u8]) -> Result<()> {
    if status.is_success() {
        return Ok(());
    }
    #[derive(serde::Deserialize)]
    struct ApiError {
        cause: Option<String>,
        message: Option<String>,
    }
    /// Drops a field that is present but empty or whitespace-only.
    fn non_blank(text: Option<String>) -> Option<String> {
        text.filter(|t| !t.trim().is_empty())
    }
    let structured = serde_json::from_slice::<ApiError>(body)
        .ok()
        .and_then(|ApiError { cause, message }| {
            non_blank(message).or_else(|| non_blank(cause))
        });
    let message = structured.unwrap_or_else(|| {
        let raw = String::from_utf8_lossy(body);
        if raw.trim().is_empty() {
            // An empty body carries no detail; describe the status instead.
            status
                .canonical_reason()
                .unwrap_or("no error details in response")
                .to_owned()
        } else {
            raw.into_owned()
        }
    });
    Err(PodmanError::Api {
        status: status.as_u16(),
        message,
    })
}
/// Hands a successful response back untouched so the caller can stream its
/// body; for any other status, buffers the body and turns it into an error.
async fn stream_or_err(resp: Response<Incoming>) -> Result<Response<Incoming>> {
    if !resp.status().is_success() {
        let (status, body) = Self::read_body(resp).await?;
        return match Self::check_status(status, &body) {
            Err(err) => Err(err),
            Ok(()) => unreachable!("check_status returns Err for a non-success status"),
        };
    }
    Ok(resp)
}
/// Issues `GET /libpod/_ping` and succeeds when the reply has a 2xx status.
pub async fn ping(&self) -> Result<()> {
    let empty = Full::new(Bytes::new());
    let req = Self::build_request(Method::GET, "/libpod/_ping", empty, None)?;
    let (status, body) = Self::read_body(self.send(req).await?).await?;
    Self::check_status(status, &body)
}
/// Sends `GET path` and deserializes the JSON response body into `T`.
///
/// # Errors
///
/// Transport errors, `PodmanError::Api` for a non-2xx status, or
/// `PodmanError::Json` when the body does not deserialize into `T`.
pub async fn get_json<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
    let empty = Full::new(Bytes::new());
    let req = Self::build_request(Method::GET, path, empty, None)?;
    let (status, raw) = Self::read_body(self.send(req).await?).await?;
    Self::check_status(status, &raw)?;
    match serde_json::from_slice::<T>(&raw) {
        Ok(value) => Ok(value),
        Err(e) => Err(PodmanError::Json(e)),
    }
}
/// Sends `GET path` and returns the response with its body unread, for the
/// caller to stream. A non-2xx status is converted into an error instead.
pub async fn get_stream(&self, path: &str) -> Result<Response<Incoming>> {
    let empty = Full::new(Bytes::new());
    let req = Self::build_request(Method::GET, path, empty, None)?;
    let resp = self.send(req).await?;
    Self::stream_or_err(resp).await
}
/// Sends `body` as JSON via `POST path` and deserializes the JSON reply
/// into `T`.
///
/// # Errors
///
/// `PodmanError::Json` when `body` cannot be serialized or the reply cannot
/// be deserialized, `PodmanError::Api` for a non-2xx status, or a transport
/// error.
pub async fn post_json<B: Serialize, T: DeserializeOwned>(
    &self,
    path: &str,
    body: &B,
) -> Result<T> {
    let payload = match serde_json::to_vec(body) {
        Ok(encoded) => Bytes::from(encoded),
        Err(e) => return Err(PodmanError::Json(e)),
    };
    let req = Self::build_request(
        Method::POST,
        path,
        Full::new(payload),
        Some("application/json"),
    )?;
    let (status, raw) = Self::read_body(self.send(req).await?).await?;
    Self::check_status(status, &raw)?;
    serde_json::from_slice::<T>(&raw).map_err(PodmanError::Json)
}
/// Sends `body` as JSON via `POST path`, ignoring the reply body and
/// succeeding on any 2xx status.
pub async fn post_json_ok<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
    let encoded = serde_json::to_vec(body).map_err(PodmanError::Json)?;
    let payload = Full::new(Bytes::from(encoded));
    let req = Self::build_request(Method::POST, path, payload, Some("application/json"))?;
    let (status, reply) = Self::read_body(self.send(req).await?).await?;
    Self::check_status(status, &reply)
}
/// Sends `body` as JSON via `POST path` and returns the response with its
/// body unread, for the caller to stream. A non-2xx status becomes an error.
pub async fn post_json_stream<B: Serialize>(
    &self,
    path: &str,
    body: &B,
) -> Result<Response<Incoming>> {
    let encoded = serde_json::to_vec(body).map_err(PodmanError::Json)?;
    let payload = Full::new(Bytes::from(encoded));
    let req = Self::build_request(Method::POST, path, payload, Some("application/json"))?;
    let resp = self.send(req).await?;
    Self::stream_or_err(resp).await
}
/// Sends a body-less `POST path` and succeeds on any 2xx status.
///
/// `304 Not Modified` is also treated as success (presumably the API's way of
/// saying the requested state already holds — confirm against the endpoints
/// this is used with).
pub async fn post_empty_ok(&self, path: &str) -> Result<()> {
    let empty = Full::new(Bytes::new());
    let req = Self::build_request(Method::POST, path, empty, None)?;
    let (status, reply) = Self::read_body(self.send(req).await?).await?;
    if status != StatusCode::NOT_MODIFIED {
        Self::check_status(status, &reply)
    } else {
        Ok(())
    }
}
/// Sends a body-less `POST path` and returns the response with its body
/// unread, for the caller to stream. A non-2xx status becomes an error.
pub async fn post_empty_stream(&self, path: &str) -> Result<Response<Incoming>> {
    let empty = Full::new(Bytes::new());
    let req = Self::build_request(Method::POST, path, empty, None)?;
    let resp = self.send(req).await?;
    Self::stream_or_err(resp).await
}
/// Sends a body-less `POST path` and deserializes the JSON reply into `T`.
///
/// # Errors
///
/// Transport errors, `PodmanError::Api` for a non-2xx status, or
/// `PodmanError::Json` when the reply does not deserialize into `T`.
pub async fn post_empty_json<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
    let empty = Full::new(Bytes::new());
    let req = Self::build_request(Method::POST, path, empty, None)?;
    let (status, raw) = Self::read_body(self.send(req).await?).await?;
    Self::check_status(status, &raw)?;
    match serde_json::from_slice::<T>(&raw) {
        Ok(value) => Ok(value),
        Err(e) => Err(PodmanError::Json(e)),
    }
}
/// Sends `bytes` via `POST path` with the given `content_type` and returns
/// the response with its body unread, for the caller to stream. A non-2xx
/// status becomes an error.
pub async fn post_bytes_stream(
    &self,
    path: &str,
    bytes: Bytes,
    content_type: &str,
) -> Result<Response<Incoming>> {
    let payload = Full::new(bytes);
    let req = Self::build_request(Method::POST, path, payload, Some(content_type))?;
    let resp = self.send(req).await?;
    Self::stream_or_err(resp).await
}
/// Sends `bytes` via `POST path` with the given `content_type` and
/// deserializes the JSON reply into `T`.
///
/// # Errors
///
/// Transport errors, `PodmanError::Api` for a non-2xx status, or
/// `PodmanError::Json` when the reply does not deserialize into `T`.
pub async fn post_bytes_json<T: DeserializeOwned>(
    &self,
    path: &str,
    bytes: Bytes,
    content_type: &str,
) -> Result<T> {
    let payload = Full::new(bytes);
    let req = Self::build_request(Method::POST, path, payload, Some(content_type))?;
    let (status, raw) = Self::read_body(self.send(req).await?).await?;
    Self::check_status(status, &raw)?;
    serde_json::from_slice::<T>(&raw).map_err(PodmanError::Json)
}
/// Sends `bytes` via `PUT path` with the given `content_type`, ignoring the
/// reply body and succeeding on any 2xx status.
pub async fn put_bytes_ok(&self, path: &str, bytes: Bytes, content_type: &str) -> Result<()> {
    let upload = Full::new(bytes);
    let req = Self::build_request(Method::PUT, path, upload, Some(content_type))?;
    let (status, reply) = Self::read_body(self.send(req).await?).await?;
    Self::check_status(status, &reply)
}
/// Sends `DELETE path` and succeeds on any 2xx status.
///
/// `404 Not Found` is also treated as success, so deleting something that is
/// already absent is not an error.
pub async fn delete_ok(&self, path: &str) -> Result<()> {
    let empty = Full::new(Bytes::new());
    let req = Self::build_request(Method::DELETE, path, empty, None)?;
    let (status, reply) = Self::read_body(self.send(req).await?).await?;
    if status != StatusCode::NOT_FOUND {
        Self::check_status(status, &reply)
    } else {
        Ok(())
    }
}
}
/// Unit tests for the pure helpers: status-to-error mapping, request
/// construction and the constructor. None of them opens a socket.
#[cfg(test)]
mod tests {
    use hyper::StatusCode;

    use super::Client;

    #[test]
    fn check_status_ok_on_200() {
        Client::check_status(StatusCode::OK, b"").unwrap();
    }

    #[test]
    fn check_status_ok_on_201() {
        Client::check_status(StatusCode::CREATED, b"").unwrap();
    }

    #[test]
    fn check_status_error_on_404() {
        let err = Client::check_status(StatusCode::NOT_FOUND, b"not found").unwrap_err();
        assert!(err.is_status(404));
        assert!(err.to_string().contains("not found"));
    }

    #[test]
    fn check_status_parses_podman_json_error() {
        let body = br#"{"message":"container not found","cause":"no such container"}"#;
        let err = Client::check_status(StatusCode::NOT_FOUND, body).unwrap_err();
        assert!(err.is_status(404));
        assert!(err.to_string().contains("container not found"));
    }

    #[test]
    fn check_status_falls_back_to_cause_when_no_message() {
        let body = br#"{"cause":"volume in use"}"#;
        let err = Client::check_status(StatusCode::CONFLICT, body).unwrap_err();
        assert!(err.is_status(409));
        assert!(err.to_string().contains("volume in use"));
    }

    #[test]
    fn check_status_falls_back_to_raw_body_on_non_json() {
        let err = Client::check_status(StatusCode::INTERNAL_SERVER_ERROR, b"plain text error")
            .unwrap_err();
        assert!(err.is_status(500));
        assert!(err.to_string().contains("plain text error"));
    }

    #[test]
    fn build_request_valid_path() {
        use bytes::Bytes;
        use http_body_util::Full;
        use hyper::Method;

        let req = Client::build_request(
            Method::GET,
            "/libpod/_ping",
            Full::new(Bytes::new()),
            None,
        )
        .unwrap();
        // Check what was actually built, not merely that building succeeded.
        assert_eq!(req.method(), &Method::GET);
        assert_eq!(req.uri().host(), Some("localhost"));
        assert_eq!(req.uri().path(), "/libpod/_ping");
        assert_eq!(
            req.headers()
                .get(hyper::header::HOST)
                .and_then(|value| value.to_str().ok()),
            Some("localhost")
        );
        assert!(req.headers().get(hyper::header::CONTENT_TYPE).is_none());
    }

    #[test]
    fn client_new_stores_socket_path() {
        // The private field is visible here because this is a child module.
        let from_str = Client::new("/run/user/1000/podman/podman.sock");
        assert_eq!(from_str.socket_path, "/run/user/1000/podman/podman.sock");

        let from_string = Client::new(String::from("/tmp/podman.sock"));
        assert_eq!(from_string.socket_path, "/tmp/podman.sock");
    }
}