1use std::io;
25use std::collections::VecDeque;
26
27use actix::actors::resolver::{Connect, Resolver};
28use actix::prelude::*;
29use futures::unsync::oneshot;
30use futures::Future;
31use backoff::backoff::Backoff;
32use backoff::ExponentialBackoff;
33use log::{error, info, debug};
34use serde_json;
35use tokio_codec::FramedRead;
36use tokio_io::io::WriteHalf;
37use tokio_io::AsyncRead;
38use tokio_tcp::TcpStream;
39use crate::codec::{NsqCodec, Cmd};
42use crate::commands::{identify, nop, auth, sub, rdy, publish, VERSION};
43use crate::config::{Config, NsqdConfig};
44use crate::error::Error;
45use crate::msgs::{Auth, Pub, Sub, Ready};
46use crate::conn::ConnState;
47
48pub struct Producer
49{
50 topic: String,
51 channel: String,
52 addr: String,
53 config: Config,
54 backoff: ExponentialBackoff,
55 cell: Option<actix::io::FramedWrite<WriteHalf<TcpStream>, NsqCodec>>,
56 state: ConnState,
57 queue: VecDeque<oneshot::Sender<Result<Cmd, Error>>>,
58 auth: String,
59}
61
62impl Default for Producer
63{
64 fn default() -> Producer {
65 Producer {
66 topic: String::new(),
67 channel: String::new(),
68 addr: String::new(),
69 config: Config::default(),
70 backoff: ExponentialBackoff::default(),
71 cell: None,
72 state: ConnState::Neg,
73 queue: VecDeque::new(),
74 auth: String::new(),
75 }
77 }
78}
79
80impl Producer
81{
82 pub fn new<S: Into<String>>(
83 topic: S,
84 channel: S,
85 addr: S,
86 config: Option<Config>,
87 auth: S,
88 ) -> Producer {
90 let mut backoff = ExponentialBackoff::default();
91 let mut _rdy = 0;
92 let cfg = match config {
94 Some(cfg) => cfg,
95 None => Config::default(),
96 };
97 backoff.max_elapsed_time = None;
98 Producer {
99 addr: addr.into(),
100 config: cfg,
101 backoff,
102 cell: None,
103 topic: topic.into(),
104 channel: channel.into(),
105 state: ConnState::Neg,
106 queue: VecDeque::new(),
107 auth: auth.into(),
108 }
110 }
111}
112
113impl Actor for Producer
114{
115 type Context = Context<Self>;
116
117 fn started(&mut self, ctx: &mut Context<Self>) {
118 Resolver::from_registry()
119 .send(Connect::host(self.addr.as_str()))
120 .into_actor(self)
121 .map(|res, act, ctx| match res {
122 Ok(stream) => {
123 info!("Connected to nsqd: {}", act.addr);
124
125 let (r, w) = stream.split();
126
127 let mut framed =
129 actix::io::FramedWrite::new(w, NsqCodec{}, ctx);
130 let mut rx = FramedRead::new(r, NsqCodec{});
131
132 framed.write(Cmd::Magic(VERSION));
134 let json = match serde_json::to_string(&act.config) {
136 Ok(s) => s,
137 Err(e) => {
138 error!("Config cannot be formatted as json string: {}", e);
139 return ctx.stop();
140 }
141 };
142 ctx.add_stream(rx);
144 framed.write(identify(json));
145 act.cell = Some(framed);
146
147 act.backoff.reset();
149 }
150 Err(err) => {
151 error!("Can not connect to nsqd: {}", err);
152 if let Some(timeout) = act.backoff.next_backoff() {
153 ctx.run_later(timeout, |_, ctx| ctx.stop());
154 }
155 }
156 })
157 .map_err(|err, act, ctx| {
158 error!("Can not connect to nsqd: {}", err);
159 if let Some(timeout) = act.backoff.next_backoff() {
160 ctx.run_later(timeout, |_, ctx| ctx.stop());
161 }
162 })
163 .wait(ctx);
164 }
165}
166
167impl actix::io::WriteHandler<io::Error> for Producer
168{
169 fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
170 error!("nsqd connection dropped: {} error: {}", self.addr, err);
171 Running::Stop
172 }
173}
174
175impl StreamHandler<Cmd, Error> for Producer
177{
178 fn error(&mut self, err: Error, _ctx: &mut Self::Context) -> Running {
179 match err {
180 Error::Remote(err) => {
181 if let Some(tx) = self.queue.pop_front() {
182 let _ = tx.send(Err(Error::Remote(err)));
183 }
184 return Running::Continue
185 },
186 _ => {
187 error!("Something goes wrong decoding message");
188 },
189 };
190 Running::Stop
191 }
192
193 fn handle(&mut self, msg: Cmd, ctx: &mut Self::Context)
194 {
195 match msg {
196 Cmd::Heartbeat => {
197 debug!("received heartbeat");
198 if let Some(ref mut cell) = self.cell {
199 cell.write(nop());
200 } else {
201 error!("Nsqd connection dropped. trying reconnecting");
202 ctx.stop();
203 }
204 }
205 Cmd::Response(s) => {
206 match self.state {
207 ConnState::Neg => {
208 let config: NsqdConfig = match serde_json::from_str(s.as_str()) {
209 Ok(s) => s,
210 Err(err) => {
211 error!("Negotiating json response invalid: {:?}", err);
212 return ctx.stop();
213 }
214 };
215 debug!("json response: {:?}", config);
216 if config.auth_required {
217 ctx.notify(Auth);
218 } else {
219 self.state = ConnState::Started;
221 }
222 },
223 ConnState::Sub => {
224 ctx.notify(Sub);
225 },
226 ConnState::Ready => {
227 debug!("sub response: {}", s);
228 ctx.notify(Ready(0));
229 }
230 _ => {
231 debug!("response: {}", s);
232 if let Some(tx) = self.queue.pop_front() {
233 let _ = tx.send(Ok(Cmd::Response(s)));
234 }
235 },
236 }
237 },
238 _ => {},
239 }
240 }
241}
242
243impl Handler<Auth> for Producer
244{
245 type Result = ();
246 fn handle(&mut self, _msg: Auth, ctx: &mut Self::Context) {
247 if let Some(ref mut cell) = self.cell {
248 cell.write(auth(self.auth.clone()));
249 } else {
250 error!("Unable to identify nsqd connection dropped");
251 ctx.stop();
252 }
253 self.state = ConnState::Sub;
254 }
255
256}
257
258impl Handler<Sub> for Producer
259{
260 type Result = ();
261
262 fn handle(&mut self, _msg: Sub, _ctx: &mut Self::Context) {
263 if let Some(ref mut cell) = self.cell {
264 let topic = self.topic.clone();
265 let channel = self.channel.clone();
266 cell.write(sub(topic.as_str(), channel.as_str()));
267 }
268 self.state = ConnState::Ready
269 }
270}
271
272impl Handler<Ready> for Producer
273{
274 type Result = ();
275
276 fn handle(&mut self, msg: Ready, _ctx: &mut Self::Context) {
277 if let Some(ref mut cell) = self.cell {
278 cell.write(rdy(msg.0));
279 }
280 if self.state != ConnState::Started { self.state = ConnState::Started }
281 }
282}
283
284
285impl Handler<Pub> for Producer
286{
287 type Result = ResponseFuture<Cmd, Error>;
288
289 fn handle(&mut self, msg: Pub, _ctx: &mut Self::Context) -> Self::Result {
290 let (tx, rx) = oneshot::channel();
291 if let Some(ref mut cell) = self.cell {
292 self.queue.push_back(tx);
293 let topic = self.topic.clone();
294 let m = &msg.0.clone();
295 println!("publish: {}", m);
296 cell.write(publish(topic.as_str(), &msg.0));
297 } else {
298 let _ = tx.send(Err(Error::NotConnected));
299 }
300 Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res))
301 }
302}
303
304
305impl Supervised for Producer
306{
307 fn restarting(&mut self, _: &mut Self::Context) {}
308}