1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use futures::stream;
use model::server::{SamotopListener, SamotopPort, SamotopServer};
use server::builder::SamotopBuilder;
use service::mail::ConsoleMail;
use service::session::StatefulSessionService;
use service::tcp::SamotopService;
use service::TcpService;
use std::net::ToSocketAddrs;
use tokio;
use tokio::io;
use tokio::net::TcpListener;
use tokio::prelude::*;
/// Creates a [`SamotopBuilder`] preconfigured with the default service stack.
///
/// The stack is a `ConsoleMail` service named `"Samotop STARTTLS"`, wrapped in a
/// `StatefulSessionService`, wrapped in a `SamotopService` whose second
/// constructor argument is left at its `Default` value. The builder is created
/// for the address `"localhost:12345"`.
pub fn builder() -> SamotopBuilder<SamotopService<StatefulSessionService<ConsoleMail>>> {
    // Layers are constructed innermost first: mail -> session -> TCP.
    let tcp_svc = SamotopService::new(
        StatefulSessionService::new(ConsoleMail::new("Samotop STARTTLS")),
        Default::default(),
    );
    SamotopBuilder::new("localhost:12345", tcp_svc)
}
/// Resolves the server's address into a stream of ports, one per socket address.
///
/// The address lookup (`to_socket_addrs`) is executed immediately when this
/// function is called; only its result is wrapped into the returned stream.
/// Every resolved socket address is paired with its own clone of the server's
/// service.
///
/// # Errors
///
/// If the lookup fails, the stream yields a single `io::Error` that keeps the
/// original error kind and carries a message naming the address that could not
/// be resolved.
pub(crate) fn resolve<S>(
    server: SamotopServer<S>,
) -> impl Stream<Item = SamotopPort<S>, Error = io::Error>
where
    S: Clone,
{
    let SamotopServer { addr, service } = server;
    stream::once(addr.to_socket_addrs())
        // Turn the iterator of resolved addresses into an inner stream.
        .map(stream::iter_ok)
        // Add the unresolved address to the error for easier diagnosis.
        .map_err(move |e| {
            io::Error::new(
                e.kind(),
                format!("Cannot resolve socket address {}: {}", addr, e),
            )
        })
        // Stream of streams -> flat stream of socket addresses.
        .flatten()
        .map(move |addr| SamotopPort {
            addr,
            service: service.clone(),
        })
}
/// Binds a TCP listener on the port's address and pairs it with the port's service.
///
/// The bind is wrapped in `future::lazy`, so it is attempted only once the
/// returned future is polled. On success the listening address is logged at
/// info level and a `SamotopListener` is produced. On failure the error is
/// logged at error level and the future resolves to `Err(())`.
pub(crate) fn bind<S>(port: SamotopPort<S>) -> impl Future<Item = SamotopListener<S>, Error = ()> {
    let SamotopPort {
        addr: local,
        service,
    } = port;
    // A `Result` converts into a future on its own, so the closure can return it directly.
    let bound = future::lazy(move || TcpListener::bind(&local));
    bound
        .map_err(move |e| error!("bind error on {}: {}", local, e))
        .map(move |listener| {
            info!("listening on {}", local);
            SamotopListener { listener, service }
        })
}
pub(crate) fn serve<S, Fut>(server: SamotopServer<S>) -> impl Future<Item = (), Error = ()>
where
S: Clone + Send + 'static,
S: TcpService<Future = Fut>,
Fut: Future<Item = (), Error = ()> + Send + 'static,
{
resolve(server)
.map(|port| {
let SamotopPort { addr, service } = port;
let service = stream::repeat(service);
SamotopPort { addr, service }
})
.map_err(|e| error!("{}", e))
.for_each(|port| tokio::spawn(bind(port).and_then(accept)))
}
pub(crate) fn accept<S, TcpSvc, Fut>(
listener: SamotopListener<S>,
) -> impl Future<Item = (), Error = ()>
where
S: Stream<Item = TcpSvc, Error = io::Error>,
TcpSvc: TcpService<Future = Fut>,
Fut: Future<Item = (), Error = ()> + Send + 'static,
{
let SamotopListener { listener, service } = listener;
let local = listener.local_addr().ok();
listener
.incoming()
.zip(service)
.for_each(|(tcp, handler)| {
tokio::spawn(handler.handle(tcp));
Ok(())
})
.then(move |result| match result {
Ok(_) => Ok(info!("done accepting on {:?}", local)),
Err(e) => Err(error!("done accepting on {:?} with error: {:?}", local, e)),
})
}