1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use async_zmq_types::{IntoInnerSocket, PairConfig, SockConfig, SubConfig, UnPair};
use futures::{future::lazy, Future};
use crate::{
error::Error,
prelude::Build,
socket::{
types::{Pair, Sub},
Socket,
},
SESSION,
};
impl<'a, T> Build<T> for SockConfig<'a, T>
where
T: UnPair + IntoInnerSocket + From<Socket> + 'static,
{
fn build(self) -> Box<dyn Future<Item = T, Error = Error> + Send> {
let res = self.do_build();
let fut = lazy(move || res)
.from_err()
.and_then(|sock| {
let session = SESSION.local_session();
session.init(sock).map(move |socket| (socket, session))
})
.map(|(sock, sess)| Socket::from_sock_and_session(sock, sess))
.map(T::from);
Box::new(fut)
}
}
impl<'a> Build<Sub> for SubConfig<'a> {
fn build(self) -> Box<dyn Future<Item = Sub, Error = Error> + Send> {
let sock = self.do_build();
let fut = lazy(move || sock)
.from_err()
.and_then(|sock| {
let session = SESSION.local_session();
session.init(sock).map(move |socket| (socket, session))
})
.map(|(sock, sess)| Socket::from_sock_and_session(sock, sess))
.map(Sub::from);
Box::new(fut)
}
}
impl<'a> Build<Pair> for PairConfig<'a> {
fn build(self) -> Box<dyn Future<Item = Pair, Error = Error> + Send> {
let sock = self.do_build();
let fut = lazy(move || sock)
.from_err()
.and_then(|sock| {
let session = SESSION.local_session();
session.init(sock).map(move |sock| (sock, session))
})
.map(|(sock, sess)| Socket::from_sock_and_session(sock, sess))
.map(Pair::from);
Box::new(fut)
}
}