1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#![deny(missing_docs)]
use std::net::ToSocketAddrs;
use byteorder::{BigEndian, ByteOrder};
use futures::{Future, Stream};
use tokio_core::net::TcpStream as TokioTcp;
use tokio_io::AsyncRead;
use tokio_io::codec::{FramedWrite, length_delimited};
use tokio_io::io::write_all;
use config::NsqConfig;
use errors::{NsqResult, NsqError};
use message::{Handler, MessageBuilder, MessageReply, NsqResponder};
use nsq_conn::NsqConn;
pub struct Consumer {
channel: String,
config: NsqConfig,
conn: Option<NsqConn>,
handler: Option<Box<Handler>>,
topic: String,
}
impl Consumer {
pub fn new(topic: &str, channel: &str, config: NsqConfig) -> Consumer {
Consumer {
channel: channel.to_owned(),
config: config,
conn: None,
handler: None,
topic: topic.to_owned(),
}
}
pub fn connect_to_nsqd<A: ToSocketAddrs>(&mut self, addr: A) -> NsqResult<()> {
let conn = NsqConn::new(addr)?;
self.conn = Some(conn);
Ok(())
}
pub fn add_handler<H>(&mut self, handler: H)
where
H: Handler + 'static,
{
self.handler = Some(Box::new(handler));
}
pub fn begin_consuming(self) -> NsqResult<()> {
match self.conn {
Some(_) => {
self.read_loop();
Ok(())
}
None => Err(NsqError::InvalidConn),
}
}
}
impl Consumer {
fn read_loop(self) {
let mut conn = self.conn.unwrap();
let sock_clone = conn.socket.try_clone().expect("cloning TCP socket");
let stream_sock = TokioTcp::from_stream(conn.socket, &conn.event_loop.handle()).unwrap();
let (stream_read, stream_write) = stream_sock.split();
let framed_sock = TokioTcp::from_stream(sock_clone, &conn.event_loop.handle()).unwrap();
let subscribe = format!("SUB {} {}\n", self.topic, self.channel);
let ready_count = format!("RDY {}\n", self.config.max_in_flight());
let prelude = write_all(stream_write, b" V2")
.and_then(|(stream, _)| write_all(stream, subscribe.as_bytes()))
.and_then(|(stream, _)| write_all(stream, ready_count.as_bytes()));
let framed_writer = FramedWrite::new(framed_sock, NsqResponder::default());
let framed_read = length_delimited::Builder::new()
.length_field_length(4)
.new_read(stream_read);
let handler = self.handler;
let reader = framed_read
.map(|mut buf| {
let frame_type = BigEndian::read_i32(buf.as_ref());
buf.split_to(4);
let mut response = MessageReply::Nop;
if frame_type == 2 {
let time_bytes = buf.split_to(8);
let time = BigEndian::read_i64(time_bytes.as_ref());
let attempt_bytes = buf.split_to(2);
let attempts = BigEndian::read_u16(attempt_bytes.as_ref());
let id = buf.split_to(16);
let message = MessageBuilder::default()
.timestamp(time)
.attempts(attempts)
.id(id)
.body(buf)
.build()
.unwrap();
match handler {
Some(ref h) => response = h.handle_message(&message),
None => {}
}
}
response
})
.forward(framed_writer);
conn.event_loop.run(prelude.join(reader)).unwrap();
}
}