#![cfg(unix)]
use std::pin::Pin;
use std::task::{Context, Poll};
use hyper::rt::{Read, Write};
use hyper_util::client::legacy::connect::{Connected, Connection};
use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
#[derive(Clone)]
pub struct UnixConnector {
path: String,
}
impl UnixConnector {
pub fn new(path: impl Into<String>) -> Self {
Self { path: path.into() }
}
}
pub struct UnixConnection(TokioIo<UnixStream>);
impl Connection for UnixConnection {
fn connected(&self) -> Connected {
Connected::new()
}
}
impl Read for UnixConnection {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: hyper::rt::ReadBufCursor<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl Write for UnixConnection {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}
impl tower_service::Service<http::Uri> for UnixConnector {
type Response = UnixConnection;
type Error = std::io::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _uri: http::Uri) -> Self::Future {
let path = self.path.clone();
Box::pin(async move {
let stream = UnixStream::connect(&path).await?;
Ok(UnixConnection(TokioIo::new(stream)))
})
}
}
pub fn is_unix_addr(addr: &str) -> bool {
addr.starts_with("unix:") || addr.starts_with("/")
}
pub fn parse_unix_path(addr: &str) -> &str {
addr.strip_prefix("unix:").unwrap_or(addr)
}
pub fn build_unix_client(
socket_path: &str,
) -> hyper_util::client::legacy::Client<UnixConnector, crate::Body> {
let connector = UnixConnector::new(socket_path);
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
.build(connector)
}