hyper_socket/
lib.rs

1//! This crate provides an instance of [`Connect`] which communicates over a local Unix
2//! Domain Socket rather than TCP.
3//!
4//! Numerous system daemons expose such sockets but use HTTP in order to unify their local and
5//! remote RPC APIs (such as [Consul](https://consul.io)). This connector is a mean to communicate
6//! with those services.
7//!
8//! NB: As sockets are named by a file path and not a DNS name, the hostname of any requests are not
9//! used for initiating a connection-- all requests, regardless of the intended destination, are
10//! routed to the same socket.
11
12use futures::prelude::*;
13use hyper::client::Client;
14use hyper::client::connect::{Connection, Connected};
15use hyper::http::Uri;
16use hyper::service::Service;
17use std::future::Future;
18use std::ops::{Deref, DerefMut};
19use std::path::Path;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use tokio::io::{AsyncRead, AsyncWrite, Result};
24use tokio::net::UnixStream;
25
26
27/// A connector to a local Unix Domain Socket which uses HTTP as the application-layer protocol.
28///
29/// ```rust
30/// use hyper::{Body, Client};
31/// use hyper_socket::UnixSocketConnector;
32///
33/// let connector: UnixSocketConnector = UnixSocketConnector::new("/run/consul.sock");
34/// let client: Client<_, Body> = Client::builder().build(connector);
35/// ```
36///
37/// For more information, please refer to the [module documentation][crate].
38#[derive(Clone, Debug)]
39pub struct UnixSocketConnector(Arc<Path>);
40
41impl UnixSocketConnector {
42    pub fn new<P: AsRef<Path>>(path: P) -> Self {
43        let path = Arc::from(path.as_ref());
44        UnixSocketConnector(path)
45    }
46
47    pub fn connect(&self) -> impl Future<Output=Result<UnixSocketConnection>> {
48        UnixStream::connect(Arc::clone(&self.0)).map_ok(UnixSocketConnection)
49    }
50
51    pub fn client<P: AsRef<Path>>(path: P) -> Client<Self> {
52        Client::builder().build(UnixSocketConnector::new(path))
53    }
54}
55
56impl Service<Uri> for UnixSocketConnector{
57    type Response = UnixSocketConnection;
58    type Error = tokio::io::Error;
59    type Future = Pin<Box<UnixSocketFuture>>;
60
61    fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<()>> {
62        Poll::Ready(Ok(()))
63    }
64
65    fn call(&mut self, _: Uri) -> Self::Future {
66        Box::pin(self.connect())
67    }
68}
69
70/// A wrapper around Tokio's [UnixStream][] type, implementing [Connection][].
71pub struct UnixSocketConnection(UnixStream);
72
73impl AsyncRead for UnixSocketConnection {
74    #[inline(always)]
75    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
76        Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
77    }
78}
79
80impl AsyncWrite for UnixSocketConnection {
81    #[inline(always)]
82    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])-> Poll<Result<usize>> {
83        Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
84    }
85
86    #[inline(always)]
87    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context)-> Poll<Result<()>> {
88        Pin::new(&mut self.get_mut().0).poll_flush(cx)
89    }
90
91    #[inline(always)]
92    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context)-> Poll<Result<()>> {
93        Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
94    }
95}
96
97impl Connection for UnixSocketConnection {
98    fn connected(&self) -> Connected {
99        Connected::new().proxy(true)
100    }
101}
102
103impl Deref for UnixSocketConnection {
104    type Target = UnixStream;
105
106    fn deref(&self) -> &UnixStream {
107        &self.0
108    }
109}
110
111impl DerefMut for UnixSocketConnection {
112    fn deref_mut(&mut self) -> &mut UnixStream {
113        &mut self.0
114    }
115}
116
117type UnixSocketFuture = dyn Future<Output=Result<UnixSocketConnection>> + Send;