1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use crate::channel::ProxyChannel;
use crate::{Error, Result};
use async_std::net::Incoming;
use async_std::stream::Stream;
use slog::Logger;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

/// WebSocket to TCP proxy server.
#[derive(Debug)]
pub struct ProxyServer<'a> {
    logger: Logger,
    real_server_addr: SocketAddr,
    incoming: Incoming<'a>,
}
impl<'a> ProxyServer<'a> {
    /// Makes a new `ProxyServer` instance.
    pub async fn new(
        logger: Logger,
        incoming: Incoming<'a>,
        real_server_addr: SocketAddr,
    ) -> Result<ProxyServer<'a>> {
        info!(logger, "Starts a WebSocket proxy server");
        Ok(ProxyServer {
            logger,
            real_server_addr,
            incoming,
        })
    }
}
impl<'a> Future for ProxyServer<'a> {
    type Output = Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            match Pin::new(&mut this.incoming).poll_next(cx) {
                Poll::Pending => {
                    break;
                }
                Poll::Ready(None) => {
                    warn!(
                        this.logger,
                        "TCP socket for the WebSocket proxy server has been closed"
                    );
                    return Poll::Ready(Ok(()));
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Err(track!(Error::from(e))));
                }
                Poll::Ready(Some(Ok(stream))) => {
                    let addr = stream.peer_addr()?;
                    debug!(this.logger, "New client arrived: {:?}", addr);

                    let logger = this.logger.new(o!("client_addr" => addr.to_string()));
                    let channel = ProxyChannel::new(logger.clone(), stream, this.real_server_addr);
                    async_std::task::spawn(async move {
                        match channel.await {
                            Err(e) => {
                                warn!(logger, "A proxy channel aborted: {}", e);
                            }
                            Ok(()) => {
                                info!(logger, "A proxy channel terminated normally");
                            }
                        }
                    });
                }
            }
        }
        Poll::Pending
    }
}