Skip to main content

gatel_core/proxy/
unix_upstream.rs

1//! Unix domain socket upstream support.
2//!
3//! Allows proxying requests to backends listening on Unix sockets instead of
4//! TCP. The upstream address is specified as `unix:/path/to/socket` in the
5//! configuration.
6//!
7//! This module is only available on Unix platforms.
8
9#![cfg(unix)]
10
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use hyper::rt::{Read, Write};
15use hyper_util::client::legacy::connect::{Connected, Connection};
16use hyper_util::rt::TokioIo;
17use tokio::net::UnixStream;
18
19/// A connector that establishes connections over Unix domain sockets.
20#[derive(Clone)]
21pub struct UnixConnector {
22    path: String,
23}
24
25impl UnixConnector {
26    /// Create a connector for the given socket path.
27    pub fn new(path: impl Into<String>) -> Self {
28        Self { path: path.into() }
29    }
30}
31
32/// Wrapper around `TokioIo<UnixStream>` that implements `Connection`.
33pub struct UnixConnection(TokioIo<UnixStream>);
34
35impl Connection for UnixConnection {
36    fn connected(&self) -> Connected {
37        Connected::new()
38    }
39}
40
41impl Read for UnixConnection {
42    fn poll_read(
43        mut self: Pin<&mut Self>,
44        cx: &mut Context<'_>,
45        buf: hyper::rt::ReadBufCursor<'_>,
46    ) -> Poll<std::io::Result<()>> {
47        Pin::new(&mut self.0).poll_read(cx, buf)
48    }
49}
50
51impl Write for UnixConnection {
52    fn poll_write(
53        mut self: Pin<&mut Self>,
54        cx: &mut Context<'_>,
55        buf: &[u8],
56    ) -> Poll<std::io::Result<usize>> {
57        Pin::new(&mut self.0).poll_write(cx, buf)
58    }
59
60    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
61        Pin::new(&mut self.0).poll_flush(cx)
62    }
63
64    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
65        Pin::new(&mut self.0).poll_shutdown(cx)
66    }
67}
68
69impl tower_service::Service<http::Uri> for UnixConnector {
70    type Response = UnixConnection;
71    type Error = std::io::Error;
72    type Future =
73        Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
74
75    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76        Poll::Ready(Ok(()))
77    }
78
79    fn call(&mut self, _uri: http::Uri) -> Self::Future {
80        let path = self.path.clone();
81        Box::pin(async move {
82            let stream = UnixStream::connect(&path).await?;
83            Ok(UnixConnection(TokioIo::new(stream)))
84        })
85    }
86}
87
88/// Returns `true` if the address looks like a Unix socket path.
89pub fn is_unix_addr(addr: &str) -> bool {
90    addr.starts_with("unix:") || addr.starts_with("/")
91}
92
93/// Extract the socket path from a `unix:/path/to/sock` address.
94pub fn parse_unix_path(addr: &str) -> &str {
95    addr.strip_prefix("unix:").unwrap_or(addr)
96}
97
98/// Build an HTTP client that connects over a Unix domain socket.
99pub fn build_unix_client(
100    socket_path: &str,
101) -> hyper_util::client::legacy::Client<UnixConnector, crate::Body> {
102    let connector = UnixConnector::new(socket_path);
103    hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
104        .build(connector)
105}