gatel_core/proxy/
unix_upstream.rs1#![cfg(unix)]
10
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use hyper::rt::{Read, Write};
15use hyper_util::client::legacy::connect::{Connected, Connection};
16use hyper_util::rt::TokioIo;
17use tokio::net::UnixStream;
18
19#[derive(Clone)]
21pub struct UnixConnector {
22 path: String,
23}
24
25impl UnixConnector {
26 pub fn new(path: impl Into<String>) -> Self {
28 Self { path: path.into() }
29 }
30}
31
32pub struct UnixConnection(TokioIo<UnixStream>);
34
35impl Connection for UnixConnection {
36 fn connected(&self) -> Connected {
37 Connected::new()
38 }
39}
40
41impl Read for UnixConnection {
42 fn poll_read(
43 mut self: Pin<&mut Self>,
44 cx: &mut Context<'_>,
45 buf: hyper::rt::ReadBufCursor<'_>,
46 ) -> Poll<std::io::Result<()>> {
47 Pin::new(&mut self.0).poll_read(cx, buf)
48 }
49}
50
51impl Write for UnixConnection {
52 fn poll_write(
53 mut self: Pin<&mut Self>,
54 cx: &mut Context<'_>,
55 buf: &[u8],
56 ) -> Poll<std::io::Result<usize>> {
57 Pin::new(&mut self.0).poll_write(cx, buf)
58 }
59
60 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
61 Pin::new(&mut self.0).poll_flush(cx)
62 }
63
64 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
65 Pin::new(&mut self.0).poll_shutdown(cx)
66 }
67}
68
69impl tower_service::Service<http::Uri> for UnixConnector {
70 type Response = UnixConnection;
71 type Error = std::io::Error;
72 type Future =
73 Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
74
75 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76 Poll::Ready(Ok(()))
77 }
78
79 fn call(&mut self, _uri: http::Uri) -> Self::Future {
80 let path = self.path.clone();
81 Box::pin(async move {
82 let stream = UnixStream::connect(&path).await?;
83 Ok(UnixConnection(TokioIo::new(stream)))
84 })
85 }
86}
87
88pub fn is_unix_addr(addr: &str) -> bool {
90 addr.starts_with("unix:") || addr.starts_with("/")
91}
92
93pub fn parse_unix_path(addr: &str) -> &str {
95 addr.strip_prefix("unix:").unwrap_or(addr)
96}
97
98pub fn build_unix_client(
100 socket_path: &str,
101) -> hyper_util::client::legacy::Client<UnixConnector, crate::Body> {
102 let connector = UnixConnector::new(socket_path);
103 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
104 .build(connector)
105}