nsqueue/
consumer.rs

1use futures::{Future, future};
2
3use tokio_service::Service;
4use tokio_core::reactor::Handle;
5use tokio_proto::{TcpClient};
6use tokio_proto::util::client_proxy::ClientProxy;
7use tokio_proto::streaming::{Message};
8
9use std::io;
10use std::net::SocketAddr;
11
12use config::Config;
13use response::{ResponseStream};
14use codec::{NsqMessage, NsqResponseMessage, ClientTypeMap};
15use protocol::{NsqProtocol, RequestMessage};
16
17#[derive(Clone)]
18pub struct Consumer {
19    inner: ClientTypeMap<ClientProxy<NsqMessage, NsqResponseMessage, io::Error>>,
20}
21
22impl Consumer {
23    /// Establish a connection and send protocol version.
24    pub fn connect(addr: &SocketAddr, handle: &Handle, config: Config) -> Box<Future<Item = Consumer, Error = io::Error>> {
25        let protocol = NsqProtocol::new(config);
26        let ret = TcpClient::new(protocol)
27            .connect(addr, handle)
28            .map(|client_proxy| {
29                let type_map = ClientTypeMap { inner: client_proxy };
30                Consumer { inner: type_map }
31            });
32
33        Box::new(ret)
34    } 
35
36    #[allow(unused_variables)]
37    pub fn subscribe(&self, topic: String, channel: String) -> Box<Future<Item = ResponseStream, Error = io::Error>> {
38        let mut request = RequestMessage::new();
39        request.create_sub_command(topic, channel);        
40        
41        let service = self.inner.clone();
42        let resp = service.inner.call(Message::WithoutBody(request))
43            .map_err(|e| {e.into()})
44            .and_then(move |resp| {
45                let mut request = RequestMessage::new();
46                request.create_rdy_command();
47                let rdy = service.inner.call(Message::WithoutBody(request))
48                    .map_err(|e| {e.into()});
49                rdy    
50            })
51            .map(move |resp| {                                  
52                match resp {
53                    Message::WithoutBody(str) => {
54                        panic!("Not implemented: {}", str)
55                    },
56                    Message::WithBody(head, body) => {                
57                        ResponseStream { inner: body }
58                    }
59                }
60            });
61
62        Box::new(resp)
63    } 
64
65    #[allow(unused_variables)]
66    pub fn fin(&self, message_id: String) -> Box<Future<Item = (), Error = io::Error>> {
67        let mut request = RequestMessage::new();
68        request.create_fin_command(message_id);        
69        
70        let service = self.inner.clone();
71        let resp = service.inner.call(Message::WithoutBody(request))
72            .map_err(|e| e.into())
73            .and_then(|resp| future::ok(()));
74
75        Box::new(resp)
76    }    
77
78    #[allow(unused_variables)]
79    pub fn nop(&self) -> Box<Future<Item = (), Error = io::Error>> {
80        let mut request = RequestMessage::new();
81        request.create_nop_command();        
82        
83        let service = self.inner.clone();
84        let resp = service.inner.call(Message::WithoutBody(request))
85            .map_err(|e| e.into())
86            .and_then(|resp| future::ok(()));
87
88        Box::new(resp)
89    }    
90}
91
92impl<T> Service for ClientTypeMap<T>
93    where T: Service<Request = RequestMessage, Response = NsqResponseMessage, Error = io::Error>,
94          T::Future: 'static
95{
96    type Request = RequestMessage;
97    type Response = NsqResponseMessage;
98    type Error = io::Error;
99    type Future = Box<Future<Item = NsqResponseMessage, Error = io::Error>>;
100
101    fn call(&self, req: RequestMessage) -> Self::Future {
102        Box::new(self.inner.call(req))
103    }
104}