1use futures::{Future, future};
2
3use tokio_service::Service;
4use tokio_core::reactor::Handle;
5use tokio_proto::{TcpClient};
6use tokio_proto::util::client_proxy::ClientProxy;
7use tokio_proto::streaming::{Message};
8
9use std::io;
10use std::net::SocketAddr;
11
12use config::Config;
13use response::{ResponseStream};
14use codec::{NsqMessage, NsqResponseMessage, ClientTypeMap};
15use protocol::{NsqProtocol, RequestMessage};
16
17#[derive(Clone)]
18pub struct Consumer {
19 inner: ClientTypeMap<ClientProxy<NsqMessage, NsqResponseMessage, io::Error>>,
20}
21
22impl Consumer {
23 pub fn connect(addr: &SocketAddr, handle: &Handle, config: Config) -> Box<Future<Item = Consumer, Error = io::Error>> {
25 let protocol = NsqProtocol::new(config);
26 let ret = TcpClient::new(protocol)
27 .connect(addr, handle)
28 .map(|client_proxy| {
29 let type_map = ClientTypeMap { inner: client_proxy };
30 Consumer { inner: type_map }
31 });
32
33 Box::new(ret)
34 }
35
36 #[allow(unused_variables)]
37 pub fn subscribe(&self, topic: String, channel: String) -> Box<Future<Item = ResponseStream, Error = io::Error>> {
38 let mut request = RequestMessage::new();
39 request.create_sub_command(topic, channel);
40
41 let service = self.inner.clone();
42 let resp = service.inner.call(Message::WithoutBody(request))
43 .map_err(|e| {e.into()})
44 .and_then(move |resp| {
45 let mut request = RequestMessage::new();
46 request.create_rdy_command();
47 let rdy = service.inner.call(Message::WithoutBody(request))
48 .map_err(|e| {e.into()});
49 rdy
50 })
51 .map(move |resp| {
52 match resp {
53 Message::WithoutBody(str) => {
54 panic!("Not implemented: {}", str)
55 },
56 Message::WithBody(head, body) => {
57 ResponseStream { inner: body }
58 }
59 }
60 });
61
62 Box::new(resp)
63 }
64
65 #[allow(unused_variables)]
66 pub fn fin(&self, message_id: String) -> Box<Future<Item = (), Error = io::Error>> {
67 let mut request = RequestMessage::new();
68 request.create_fin_command(message_id);
69
70 let service = self.inner.clone();
71 let resp = service.inner.call(Message::WithoutBody(request))
72 .map_err(|e| e.into())
73 .and_then(|resp| future::ok(()));
74
75 Box::new(resp)
76 }
77
78 #[allow(unused_variables)]
79 pub fn nop(&self) -> Box<Future<Item = (), Error = io::Error>> {
80 let mut request = RequestMessage::new();
81 request.create_nop_command();
82
83 let service = self.inner.clone();
84 let resp = service.inner.call(Message::WithoutBody(request))
85 .map_err(|e| e.into())
86 .and_then(|resp| future::ok(()));
87
88 Box::new(resp)
89 }
90}
91
92impl<T> Service for ClientTypeMap<T>
93 where T: Service<Request = RequestMessage, Response = NsqResponseMessage, Error = io::Error>,
94 T::Future: 'static
95{
96 type Request = RequestMessage;
97 type Response = NsqResponseMessage;
98 type Error = io::Error;
99 type Future = Box<Future<Item = NsqResponseMessage, Error = io::Error>>;
100
101 fn call(&self, req: RequestMessage) -> Self::Future {
102 Box::new(self.inner.call(req))
103 }
104}