shadowsocks_service/local/http/
server.rs

1//! Shadowsocks Local HTTP proxy server
2//!
3//! https://www.ietf.org/rfc/rfc2068.txt
4
5use std::{io, net::SocketAddr, sync::Arc, time::Duration};
6
7use hyper::{body, server::conn::http1, service};
8use log::{error, info, trace};
9use shadowsocks::{config::ServerAddr, net::TcpListener};
10use tokio::{
11    io::{AsyncRead, AsyncWrite},
12    time,
13};
14
15use crate::local::{
16    context::ServiceContext, loadbalancing::PingBalancer, net::tcp::listener::create_standard_tcp_listener,
17};
18
19use super::{http_client::HttpClient, http_service::HttpService, tokio_rt::TokioIo};
20
/// HTTP Local server builder
///
/// Collects everything needed to create the listening socket and is consumed
/// by `build` to produce an [`Http`] server instance.
pub struct HttpBuilder {
    /// Shared service context; used when creating the listener and then handed to the built server
    context: Arc<ServiceContext>,
    /// Address passed to `create_standard_tcp_listener` when the listener is created
    client_config: ServerAddr,
    /// Balancer handed over unchanged to the built server
    balancer: PingBalancer,
    /// Optional launchd socket name (macOS only); when set, `build` obtains the
    /// listener via launchd socket activation instead of creating a standard one
    #[cfg(target_os = "macos")]
    launchd_tcp_socket_name: Option<String>,
}
29
30impl HttpBuilder {
31    /// Create a new HTTP Local server builder
32    pub fn new(client_config: ServerAddr, balancer: PingBalancer) -> Self {
33        let context = ServiceContext::new();
34        Self::with_context(Arc::new(context), client_config, balancer)
35    }
36
37    /// Create with an existed context
38    pub fn with_context(context: Arc<ServiceContext>, client_config: ServerAddr, balancer: PingBalancer) -> Self {
39        Self {
40            context,
41            client_config,
42            balancer,
43            #[cfg(target_os = "macos")]
44            launchd_tcp_socket_name: None,
45        }
46    }
47
48    #[cfg(target_os = "macos")]
49    pub fn set_launchd_tcp_socket_name(&mut self, n: String) {
50        self.launchd_tcp_socket_name = Some(n);
51    }
52
53    /// Build HTTP server instance
54    pub async fn build(self) -> io::Result<Http> {
55        cfg_if::cfg_if! {
56            if #[cfg(target_os = "macos")] {
57                let listener = match self.launchd_tcp_socket_name {
58                    Some(launchd_socket_name) => {
59                        use tokio::net::TcpListener as TokioTcpListener;
60                        use crate::net::launch_activate_socket::get_launch_activate_tcp_listener;
61
62                        let std_listener = get_launch_activate_tcp_listener(&launchd_socket_name, true)?;
63                        let tokio_listener = TokioTcpListener::from_std(std_listener)?;
64                        TcpListener::from_listener(tokio_listener, self.context.accept_opts())?
65                    } _ => {
66                        create_standard_tcp_listener(&self.context, &self.client_config).await?
67                    }
68                };
69            } else {
70                let listener = create_standard_tcp_listener(&self.context, &self.client_config).await?;
71            }
72        }
73
74        // let proxy_client_cache = Arc::new(ProxyClientCache::new(self.context.clone()));
75
76        Ok(Http {
77            context: self.context,
78            listener,
79            balancer: self.balancer,
80        })
81    }
82}
83
/// HTTP Local server
///
/// Owns the listening socket; `run` accepts clients from it and serves each
/// connection on its own spawned task.
pub struct Http {
    /// Shared service context, passed to the connection handler created in `run`
    context: Arc<ServiceContext>,
    /// Listener that `run` accepts HTTP clients from
    listener: TcpListener,
    /// Balancer passed to the connection handler created in `run`
    balancer: PingBalancer,
}
90
91impl Http {
92    /// Server's local address
93    pub fn local_addr(&self) -> io::Result<SocketAddr> {
94        self.listener.local_addr()
95    }
96
97    /// Run server
98    pub async fn run(self) -> io::Result<()> {
99        // https://www.ietf.org/rfc/rfc2068.txt
100        // HTTP Proxy is based on HTTP/1.1
101
102        info!(
103            "shadowsocks HTTP listening on {}",
104            self.listener.local_addr().expect("http local_addr")
105        );
106
107        let handler = HttpConnectionHandler::new(self.context, self.balancer);
108
109        loop {
110            let (stream, peer_addr) = match self.listener.accept().await {
111                Ok(s) => s,
112                Err(err) => {
113                    error!("failed to accept HTTP clients, err: {}", err);
114                    time::sleep(Duration::from_secs(1)).await;
115                    continue;
116                }
117            };
118
119            trace!("HTTP accepted client from {}", peer_addr);
120            let handler = handler.clone();
121            tokio::spawn(async move {
122                if let Err(err) = handler.serve_connection(stream, peer_addr).await {
123                    error!("HTTP connection {} handler failed with error: {}", peer_addr, err);
124                }
125            });
126        }
127    }
128}
129
/// HTTP Proxy handler for `accept()`ed HTTP clients
///
/// It should be created once and then cloned for every individual TCP connection
#[derive(Clone)]
pub struct HttpConnectionHandler {
    /// Shared service context, cloned into the `HttpService` created for each request
    context: Arc<ServiceContext>,
    /// Balancer, cloned into the `HttpService` created for each request
    balancer: PingBalancer,
    /// HTTP client, cloned into the `HttpService` created for each request
    http_client: HttpClient<body::Incoming>,
}
139
140impl HttpConnectionHandler {
141    /// Create a new Handler
142    pub fn new(context: Arc<ServiceContext>, balancer: PingBalancer) -> Self {
143        Self {
144            context,
145            balancer,
146            http_client: HttpClient::new(),
147        }
148    }
149
150    /// Handle a TCP HTTP connection
151    pub async fn serve_connection<S>(self, stream: S, peer_addr: SocketAddr) -> hyper::Result<()>
152    where
153        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
154    {
155        let Self {
156            context,
157            balancer,
158            http_client,
159        } = self;
160
161        let io = TokioIo::new(stream);
162
163        // NOTE: Some stupid clients requires HTTP header keys to be case-sensitive.
164        // For example: Nintendo Switch
165        http1::Builder::new()
166            .keep_alive(true)
167            .title_case_headers(true)
168            .preserve_header_case(true)
169            .serve_connection(
170                io,
171                service::service_fn(move |req| {
172                    HttpService::new(context.clone(), peer_addr, http_client.clone(), balancer.clone())
173                        .serve_connection(req)
174                }),
175            )
176            .with_upgrades()
177            .await
178    }
179}