shadowsocks_service/local/http/
server.rs1use std::{io, net::SocketAddr, sync::Arc, time::Duration};
6
7use hyper::{body, server::conn::http1, service};
8use log::{error, info, trace};
9use shadowsocks::{config::ServerAddr, net::TcpListener};
10use tokio::{
11 io::{AsyncRead, AsyncWrite},
12 time,
13};
14
15use crate::local::{
16 context::ServiceContext, loadbalancing::PingBalancer, net::tcp::listener::create_standard_tcp_listener,
17};
18
19use super::{http_client::HttpClient, http_service::HttpService, tokio_rt::TokioIo};
20
21pub struct HttpBuilder {
23 context: Arc<ServiceContext>,
24 client_config: ServerAddr,
25 balancer: PingBalancer,
26 #[cfg(target_os = "macos")]
27 launchd_tcp_socket_name: Option<String>,
28}
29
30impl HttpBuilder {
31 pub fn new(client_config: ServerAddr, balancer: PingBalancer) -> Self {
33 let context = ServiceContext::new();
34 Self::with_context(Arc::new(context), client_config, balancer)
35 }
36
37 pub fn with_context(context: Arc<ServiceContext>, client_config: ServerAddr, balancer: PingBalancer) -> Self {
39 Self {
40 context,
41 client_config,
42 balancer,
43 #[cfg(target_os = "macos")]
44 launchd_tcp_socket_name: None,
45 }
46 }
47
48 #[cfg(target_os = "macos")]
49 pub fn set_launchd_tcp_socket_name(&mut self, n: String) {
50 self.launchd_tcp_socket_name = Some(n);
51 }
52
53 pub async fn build(self) -> io::Result<Http> {
55 cfg_if::cfg_if! {
56 if #[cfg(target_os = "macos")] {
57 let listener = match self.launchd_tcp_socket_name {
58 Some(launchd_socket_name) => {
59 use tokio::net::TcpListener as TokioTcpListener;
60 use crate::net::launch_activate_socket::get_launch_activate_tcp_listener;
61
62 let std_listener = get_launch_activate_tcp_listener(&launchd_socket_name, true)?;
63 let tokio_listener = TokioTcpListener::from_std(std_listener)?;
64 TcpListener::from_listener(tokio_listener, self.context.accept_opts())?
65 } _ => {
66 create_standard_tcp_listener(&self.context, &self.client_config).await?
67 }
68 };
69 } else {
70 let listener = create_standard_tcp_listener(&self.context, &self.client_config).await?;
71 }
72 }
73
74 Ok(Http {
77 context: self.context,
78 listener,
79 balancer: self.balancer,
80 })
81 }
82}
83
84pub struct Http {
86 context: Arc<ServiceContext>,
87 listener: TcpListener,
88 balancer: PingBalancer,
89}
90
91impl Http {
92 pub fn local_addr(&self) -> io::Result<SocketAddr> {
94 self.listener.local_addr()
95 }
96
97 pub async fn run(self) -> io::Result<()> {
99 info!(
103 "shadowsocks HTTP listening on {}",
104 self.listener.local_addr().expect("http local_addr")
105 );
106
107 let handler = HttpConnectionHandler::new(self.context, self.balancer);
108
109 loop {
110 let (stream, peer_addr) = match self.listener.accept().await {
111 Ok(s) => s,
112 Err(err) => {
113 error!("failed to accept HTTP clients, err: {}", err);
114 time::sleep(Duration::from_secs(1)).await;
115 continue;
116 }
117 };
118
119 trace!("HTTP accepted client from {}", peer_addr);
120 let handler = handler.clone();
121 tokio::spawn(async move {
122 if let Err(err) = handler.serve_connection(stream, peer_addr).await {
123 error!("HTTP connection {} handler failed with error: {}", peer_addr, err);
124 }
125 });
126 }
127 }
128}
129
130#[derive(Clone)]
134pub struct HttpConnectionHandler {
135 context: Arc<ServiceContext>,
136 balancer: PingBalancer,
137 http_client: HttpClient<body::Incoming>,
138}
139
140impl HttpConnectionHandler {
141 pub fn new(context: Arc<ServiceContext>, balancer: PingBalancer) -> Self {
143 Self {
144 context,
145 balancer,
146 http_client: HttpClient::new(),
147 }
148 }
149
150 pub async fn serve_connection<S>(self, stream: S, peer_addr: SocketAddr) -> hyper::Result<()>
152 where
153 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
154 {
155 let Self {
156 context,
157 balancer,
158 http_client,
159 } = self;
160
161 let io = TokioIo::new(stream);
162
163 http1::Builder::new()
166 .keep_alive(true)
167 .title_case_headers(true)
168 .preserve_header_case(true)
169 .serve_connection(
170 io,
171 service::service_fn(move |req| {
172 HttpService::new(context.clone(), peer_addr, http_client.clone(), balancer.clone())
173 .serve_connection(req)
174 }),
175 )
176 .with_upgrades()
177 .await
178 }
179}