1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use futures::{Future, future};
use tokio_service::Service;
use tokio_core::reactor::Handle;
use tokio_proto::{TcpClient};
use tokio_proto::util::client_proxy::ClientProxy;
use tokio_proto::streaming::{Message};
use std::io;
use std::net::SocketAddr;
use config::Config;
use response::{ResponseStream};
use codec::{NsqMessage, NsqResponseMessage, ClientTypeMap};
use protocol::{NsqProtocol, RequestMessage};
#[derive(Clone)]
pub struct Consumer {
inner: ClientTypeMap<ClientProxy<NsqMessage, NsqResponseMessage, io::Error>>,
}
impl Consumer {
pub fn connect(addr: &SocketAddr, handle: &Handle, config: Config) -> Box<Future<Item = Consumer, Error = io::Error>> {
let protocol = NsqProtocol::new(config);
let ret = TcpClient::new(protocol)
.connect(addr, handle)
.map(|client_proxy| {
let type_map = ClientTypeMap { inner: client_proxy };
Consumer { inner: type_map }
});
Box::new(ret)
}
#[allow(unused_variables)]
pub fn subscribe(&self, topic: String, channel: String) -> Box<Future<Item = ResponseStream, Error = io::Error>> {
let mut request = RequestMessage::new();
request.create_sub_command(topic, channel);
let service = self.inner.clone();
let resp = service.inner.call(Message::WithoutBody(request))
.map_err(|e| {e.into()})
.and_then(move |resp| {
let mut request = RequestMessage::new();
request.create_rdy_command();
let rdy = service.inner.call(Message::WithoutBody(request))
.map_err(|e| {e.into()});
rdy
})
.map(move |resp| {
match resp {
Message::WithoutBody(str) => {
panic!("Not implemented: {}", str)
},
Message::WithBody(head, body) => {
ResponseStream { inner: body }
}
}
});
Box::new(resp)
}
#[allow(unused_variables)]
pub fn fin(&self, message_id: String) -> Box<Future<Item = (), Error = io::Error>> {
let mut request = RequestMessage::new();
request.create_fin_command(message_id);
let service = self.inner.clone();
let resp = service.inner.call(Message::WithoutBody(request))
.map_err(|e| e.into())
.and_then(|resp| future::ok(()));
Box::new(resp)
}
#[allow(unused_variables)]
pub fn nop(&self) -> Box<Future<Item = (), Error = io::Error>> {
let mut request = RequestMessage::new();
request.create_nop_command();
let service = self.inner.clone();
let resp = service.inner.call(Message::WithoutBody(request))
.map_err(|e| e.into())
.and_then(|resp| future::ok(()));
Box::new(resp)
}
}
impl<T> Service for ClientTypeMap<T>
where T: Service<Request = RequestMessage, Response = NsqResponseMessage, Error = io::Error>,
T::Future: 'static
{
type Request = RequestMessage;
type Response = NsqResponseMessage;
type Error = io::Error;
type Future = Box<Future<Item = NsqResponseMessage, Error = io::Error>>;
fn call(&self, req: RequestMessage) -> Self::Future {
Box::new(self.inner.call(req))
}
}