1use crate::channel::ProxyChannel;
2use crate::{Error, Result};
3use async_std::net::Incoming;
4use async_std::stream::Stream;
5use slog::Logger;
6use std::future::Future;
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::task::Context;
10use std::task::Poll;
11
12#[derive(Debug)]
14pub struct ProxyServer<'a> {
15 logger: Logger,
16 real_server_addr: SocketAddr,
17 incoming: Incoming<'a>,
18}
19impl<'a> ProxyServer<'a> {
20 pub async fn new(
22 logger: Logger,
23 incoming: Incoming<'a>,
24 real_server_addr: SocketAddr,
25 ) -> Result<ProxyServer<'a>> {
26 info!(logger, "Starts a WebSocket proxy server");
27 Ok(ProxyServer {
28 logger,
29 real_server_addr,
30 incoming,
31 })
32 }
33}
34impl<'a> Future for ProxyServer<'a> {
35 type Output = Result<()>;
36
37 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
38 let this = self.get_mut();
39 loop {
40 match Pin::new(&mut this.incoming).poll_next(cx) {
41 Poll::Pending => {
42 break;
43 }
44 Poll::Ready(None) => {
45 warn!(
46 this.logger,
47 "TCP socket for the WebSocket proxy server has been closed"
48 );
49 return Poll::Ready(Ok(()));
50 }
51 Poll::Ready(Some(Err(e))) => {
52 return Poll::Ready(Err(track!(Error::from(e))));
53 }
54 Poll::Ready(Some(Ok(stream))) => {
55 let addr = stream.peer_addr()?;
56 debug!(this.logger, "New client arrived: {:?}", addr);
57
58 let logger = this.logger.new(o!("client_addr" => addr.to_string()));
59 let channel = ProxyChannel::new(logger.clone(), stream, this.real_server_addr);
60 async_std::task::spawn(async move {
61 match channel.await {
62 Err(e) => {
63 warn!(logger, "A proxy channel aborted: {}", e);
64 }
65 Ok(()) => {
66 info!(logger, "A proxy channel terminated normally");
67 }
68 }
69 });
70 }
71 }
72 }
73 Poll::Pending
74 }
75}