1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use crate::common::*;
use crate::model::io::*;
use crate::model::Result;
use crate::service::tcp::TcpService;
use async_std::net::{TcpListener, TcpStream, ToSocketAddrs};
use async_std::task;
use futures::{
    future::{BoxFuture, TryFutureExt},
    stream::FuturesUnordered,
};
use std::net::SocketAddr;

/// `Server` takes care of accepting TCP connections and passing them to `TcpService` to `handle()`.
///
/// Addresses to listen on are registered with `on`, `on_all`, `and` and `and_all`.
/// They are stored as pending resolution futures and are only awaited by
/// `resolve_ports` (which `serve` calls before binding).
pub struct Server<'a> {
    /// One boxed address-resolution future per registered `ToSocketAddrs` value;
    /// each yields the list of socket addresses that value resolved to.
    ports: Vec<BoxFuture<'a, Result<Vec<SocketAddr>>>>,
}

impl<'a> Server<'a> {
    /// Creates a server with no addresses registered yet.
    pub fn new() -> Self {
        Self { ports: Vec::new() }
    }

    /// Creates a server that will listen on whatever `ports` resolves to.
    ///
    /// Shorthand for `Server::new().and(ports)`.
    pub fn on<N>(ports: N) -> Self
    where
        N: ToSocketAddrs + 'a,
        N::Iter: Send,
    {
        Self::new().and(ports)
    }

    /// Creates a server that will listen on every item of `ports`.
    ///
    /// Shorthand for `Server::new().and_all(ports)`.
    pub fn on_all<I, N>(ports: I) -> Self
    where
        I: IntoIterator<Item = N>,
        N: ToSocketAddrs + 'a,
        N::Iter: Send,
    {
        Self::new().and_all(ports)
    }

    /// Registers every item of `ports` in iteration order, as if `and` had
    /// been called once per item.
    pub fn and_all<I, N>(mut self, ports: I) -> Self
    where
        I: IntoIterator<Item = N>,
        N: ToSocketAddrs + 'a,
        N::Iter: Send,
    {
        for port in ports {
            self = self.and(port);
        }
        self
    }

    /// Registers one more `ToSocketAddrs` value to listen on.
    ///
    /// Only the resolution future is stored here; it is awaited later by
    /// `resolve_ports`.
    pub fn and<N>(mut self, ports: N) -> Self
    where
        N: ToSocketAddrs + 'a,
        N::Iter: Send,
    {
        self.ports.push(Box::pin(Self::map_ports(ports)));
        self
    }

    /// Turns `addrs` into a future yielding the collected socket addresses,
    /// with the resolution error converted into the crate's error type.
    fn map_ports(addrs: impl ToSocketAddrs) -> impl Future<Output = Result<Vec<SocketAddr>>> {
        addrs
            .to_socket_addrs()
            .map_ok(|resolved| resolved.collect())
            .map_err(|e| e.into())
    }

    /// Awaits all registered resolution futures in registration order and
    /// returns the concatenated socket addresses.
    ///
    /// The registered futures are moved out of the server before being awaited,
    /// so each one is polled to completion at most once. Calling this method
    /// again without registering new addresses yields an empty list instead of
    /// polling already-completed futures.
    ///
    /// # Errors
    ///
    /// Returns the first resolution error encountered; the futures that were
    /// not yet awaited are dropped.
    pub async fn resolve_ports(&mut self) -> Result<Vec<SocketAddr>> {
        let mut result: Vec<SocketAddr> = Vec::new();
        // Take ownership of the futures: a future must not be polled again
        // after it has completed, which awaiting them in place would allow.
        for port in std::mem::take(&mut self.ports) {
            result.extend(port.await?);
        }
        Ok(result)
    }

    /// Resolves all registered addresses and serves `service` on each of them.
    ///
    /// # Errors
    ///
    /// Returns an error if address resolution fails or as soon as any listener
    /// fails (see `serve_ports`).
    pub async fn serve<S>(mut self: Server<'a>, service: S) -> Result<()>
    where
        S: TcpService<TcpStream>,
    {
        Self::serve_ports(service, self.resolve_ports().await?).await
    }

    /// Runs one listener per address in `addrs` concurrently.
    ///
    /// Completes with the first listener error, or with `Ok(())` once every
    /// listener has finished successfully.
    async fn serve_ports<S>(service: S, addrs: impl IntoIterator<Item = SocketAddr>) -> Result<()>
    where
        S: TcpService<TcpStream>,
    {
        // All listeners share the one service instance through a
        // reference-counted pointer; each gets its own clone of the pointer.
        let svc = std::rc::Rc::new(service);
        addrs
            .into_iter()
            .map(|a| Self::serve_port(svc.clone(), a))
            .collect::<FuturesUnordered<_>>()
            // Ignore listeners that ended cleanly and stop at the first failure.
            .skip_while(|r| futures::future::ready(r.is_ok()))
            .take(1)
            // At most one item arrives here: turn it into the final result.
            .fold(Ok(()), |acc, cur| match cur {
                Err(e) => futures::future::err(e),
                Ok(()) => futures::future::ready(acc),
            })
            .await
    }

    /// Binds `addr` and hands every incoming connection to `service`.
    ///
    /// Each accepted item is handled in its own spawned task. A failed accept
    /// is still passed to the service, as an error together with a default
    /// `Connection`. Returns `Ok(())` when the incoming stream ends.
    ///
    /// # Errors
    ///
    /// Returns an error if binding `addr` fails.
    async fn serve_port<S>(service: S, addr: SocketAddr) -> Result<()>
    where
        S: TcpService<TcpStream> + Clone,
    {
        trace!("Binding on {}", addr);
        let listener = TcpListener::bind(addr)
            .await
            .map_err(|e| format!("Unable to bind {}: {}", addr, e))?;
        let mut incoming = listener.incoming();
        info!("Listening on {:?}", listener.local_addr());
        while let Some(stream) = incoming.next().await {
            // Describe the connection by its endpoints when the accept
            // succeeded; otherwise fall back to the default description.
            let conn = match stream {
                Ok(ref accepted) => {
                    Connection::new(accepted.local_addr().ok(), accepted.peer_addr().ok())
                }
                Err(_) => Connection::default(),
            };
            let stream = stream.map_err(|e| e.into());
            let service = service.clone();
            spawn_task_and_swallow_log_errors(
                format!("TCP transmission {}", conn),
                service.handle(stream, conn),
            );
        }
        Ok(())
    }
}

/// Spawns `fut` as a background task, logging its outcome under `task_name`
/// and then discarding it.
///
/// A failure of `fut` is reported through `log_errors` and goes no further:
/// the spawned task always completes with `()` and does not panic because of
/// an `Err` result.
fn spawn_task_and_swallow_log_errors<F>(task_name: String, fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    task::spawn(async move {
        // The error has already been logged by `log_errors`; unwrapping it here
        // would turn an ordinary handler failure into a panic inside the task.
        let _ = log_errors(task_name, fut).await;
    })
}

/// Awaits `fut`, logs how it ended, and passes its result through unchanged.
///
/// An `Err` is logged at error level together with `task_name`; an `Ok` is
/// logged at info level as a successful completion.
async fn log_errors<F, T, E>(task_name: String, fut: F) -> F::Output
where
    F: Future<Output = std::result::Result<T, E>>,
    E: std::fmt::Display,
{
    let outcome = fut.await;
    // Only borrow the outcome for logging so it can be returned as-is.
    match &outcome {
        Ok(_) => {
            info!("{} completed successfully.", task_name);
        }
        Err(e) => {
            error!("Error in {}: {}", task_name, e);
        }
    }
    outcome
}