1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use futures::{future, Future, Sink, Stream};
use futures::sync::{mpsc, oneshot};
use tokio_core::reactor::Handle;
use error;
use resp;
use super::connect::{connect, ClientConnection};
pub fn paired_connect(addr: &SocketAddr,
handle: &Handle)
-> Box<Future<Item = PairedConnection, Error = error::Error>> {
let handle = handle.clone();
let paired_con = connect(addr, &handle)
.map(move |connection| {
let ClientConnection { sender, receiver } = connection;
let (out_tx, out_rx) = mpsc::unbounded();
let sender = out_rx.fold(sender, |sender, msg| sender.send(msg).map_err(|_| ()));
let resp_queue: Arc<Mutex<VecDeque<oneshot::Sender<resp::RespValue>>>> =
Arc::new(Mutex::new(VecDeque::new()));
let receiver_queue = resp_queue.clone();
let receiver = receiver.for_each(move |msg| {
let mut queue = receiver_queue.lock().expect("Lock is tainted");
let dest = queue.pop_front().expect("Queue is empty");
match dest.send(msg) {
Ok(()) => Ok(()),
Err(_) => Ok(())
}
});
handle.spawn(sender.map(|_| ()));
handle.spawn(receiver.map_err(|_| ()));
PairedConnection {
out_tx: out_tx,
resp_queue: resp_queue,
}
})
.map_err(|e| e.into());
Box::new(paired_con)
}
/// A connection on which every request sent is paired with one response.
///
/// Created by `paired_connect`. Replies are matched to requests purely by
/// queue position (first in, first out).
pub struct PairedConnection {
    /// Sending half of the unbounded channel carrying outgoing requests;
    /// `paired_connect` spawns the task that drains the receiving half into
    /// the connection.
    out_tx: mpsc::UnboundedSender<resp::RespValue>,
    /// Oneshot senders for requests still awaiting a reply, oldest first.
    /// Shared with the task spawned by `paired_connect` that pops one entry
    /// per incoming message.
    resp_queue: Arc<Mutex<VecDeque<oneshot::Sender<resp::RespValue>>>>,
}
/// Boxed future yielding a `T` or an `error::Error`; the return type of
/// `PairedConnection::send`.
type SendBox<T> = Box<Future<Item = T, Error = error::Error>>;
impl PairedConnection {
    /// Queues `msg` for sending and returns a future that resolves to the
    /// matching reply, decoded into `T` via `resp::FromResp`.
    ///
    /// If the outgoing channel has been closed (its receiving half was
    /// dropped), nothing is queued and the returned future resolves to an
    /// error instead of this call panicking.
    ///
    /// # Errors
    ///
    /// The returned future fails when the reply cannot be converted into `T`,
    /// or when the reply channel is cancelled: the request could not be
    /// handed off, or the oneshot sender was dropped before a reply was
    /// delivered.
    ///
    /// # Panics
    ///
    /// Panics if the response-queue mutex is poisoned.
    pub fn send<R, T: resp::FromResp + 'static>(&self, msg: R) -> SendBox<T>
        where R: Into<resp::RespValue>
    {
        let (tx, rx) = oneshot::channel();
        {
            // The lock is held across both the hand-off and the enqueue so
            // that concurrent callers register their reply slots in exactly
            // the order their requests enter the outgoing channel, and so the
            // dispatcher cannot pop before the slot is registered.
            let mut queue = self.resp_queue.lock().expect("Tainted queue");
            if self.out_tx.unbounded_send(msg.into()).is_ok() {
                queue.push_back(tx);
            } else {
                // The request never left, so no reply will come. Dropping the
                // sender without registering it makes `rx` resolve to a
                // cancellation error and keeps the queue aligned.
                drop(tx);
            }
        }
        let future = rx.then(|v| match v {
            Ok(v) => future::result(T::from_resp(v)),
            Err(e) => future::err(e.into()),
        });
        Box::new(future)
    }

    /// Queues `msg` for sending and discards the reply future.
    ///
    /// The reply slot is still registered by `send`, so the eventual reply is
    /// consumed in order; only the caller's handle to it is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the response-queue mutex is poisoned.
    pub fn send_and_forget<R>(&self, msg: R)
        where R: Into<resp::RespValue>
    {
        // `String` only pins the type parameter; the future is dropped unpolled.
        let _: SendBox<String> = self.send(msg);
    }
}