1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#![cfg(unix)]
use std::{hash::BuildHasher, time::Duration};
use hpx::{Client, Proxy};
use http::Method;
use http_body_util::Full;
use hyper::{Request, Response, body::Incoming, service::service_fn};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder,
};
use tokio::{net::UnixListener, task};
fn random_sock_path() -> std::path::PathBuf {
let mut buf = std::env::temp_dir();
// libstd uses system random to create each one
let rng = std::collections::hash_map::RandomState::new();
let n = rng.hash_one("uds-sock");
buf.push(format!("test-uds-sock-{}", n));
buf
}
#[tokio::test]
async fn test_unix_socket() {
let sock_path = random_sock_path();
let listener = UnixListener::bind(&sock_path).unwrap();
let server = async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let io = TokioIo::new(stream);
let service = service_fn(|_req: Request<Incoming>| async {
Ok::<_, hyper::Error>(Response::new(Full::new(&b"hello unix"[..])))
});
task::spawn(async move {
if let Err(e) = hyper::server::conn::http1::Builder::new()
.serve_connection(io, service)
.await
{
eprintln!("server error: {:?}", e);
}
});
}
};
tokio::spawn(server);
let client = Client::builder()
.proxy(Proxy::unix(sock_path).unwrap())
.timeout(Duration::from_secs(10))
.build()
.unwrap();
let resp = client.get("http://localhost/").send().await.unwrap();
let body = resp.text().await.unwrap();
assert_eq!(body, "hello unix");
}
#[tokio::test]
async fn test_proxy_unix_socket() {
let sock_path = random_sock_path();
let listener = UnixListener::bind(&sock_path).unwrap();
let server = async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let io = TokioIo::new(stream);
let service = service_fn(|req: Request<Incoming>| {
async move {
if Method::CONNECT == req.method() {
// Received an HTTP request like:
// ```
// CONNECT www.domain.com:443 HTTP/1.1
// Host: www.domain.com:443
// Proxy-Connection: Keep-Alive
// ```
//
// When HTTP method is CONNECT we should return an empty body,
// then we can eventually upgrade the connection and talk a new protocol.
//
// Note: only after client received an empty body with STATUS_OK can the
// connection be upgraded, so we can't return a response inside
// `on_upgrade` future.
let authority = req.uri().authority().cloned().unwrap();
tokio::task::spawn({
let req = req;
async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
tracing::info!("upgraded connection to: {}", authority);
if let Ok(mut io) =
tokio::net::TcpStream::connect(authority.to_string())
.await
{
let _ = tokio::io::copy_bidirectional(
&mut TokioIo::new(upgraded),
&mut io,
)
.await;
}
}
Err(e) => tracing::warn!("upgrade error: {}", e),
}
}
});
Ok::<_, hyper::Error>(Response::new(Full::new(&b""[..])))
} else {
Ok::<_, hyper::Error>(Response::new(Full::new(
&b"unsupported request method"[..],
)))
}
}
});
task::spawn(async move {
if let Err(e) = Builder::new(TokioExecutor::new())
.serve_connection_with_upgrades(io, service)
.await
{
eprintln!("server error: {:?}", e);
}
});
}
};
tokio::spawn(server);
let client = Client::builder()
.proxy(Proxy::unix(sock_path).unwrap())
.timeout(Duration::from_secs(30))
.build()
.unwrap();
let resp = client.get("https://example.com").send().await.unwrap();
assert!(resp.status().is_success(), "Expected successful response");
}