nsq_client/msgs.rs
1// MIT License
2//
3// Copyright (c) 2019-2021 Alessandro Cresto Miseroglio <alex179ohm@gmail.com>
4// Copyright (c) 2019-2021 Tangram Technologies S.R.L. <https://tngrm.io>
5//
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12//
13// The above copyright notice and this permission notice shall be included in all
14// copies or substantial portions of the Software.
15//
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22// SOFTWARE.
23
24use actix::prelude::*;
25
26use crate::codec::Cmd;
27use crate::error::Error;
28use crate::auth::AuthResp;
29use crate::config::NsqdConfig;
30
31pub trait NsqMsg: Message<Result = ()> + Send + 'static {}
32
33impl<M> NsqMsg for M
34where
35 M: Message<Result = ()> + Send + 'static
36{}
37
38#[derive(Message)]
39pub struct AddHandler<M: NsqMsg>(pub Recipient<M>);
40
41/// Message sent by nsqd
42///
43/// # Examples
44/// ```no-run
45/// struct Consumer(Addr<Connection>);
46///
47/// impl Actor for Consumer {
48/// type Context = Context<Self>;
49/// fn handle(&mut self, ctx: &mut Self::Context) {
50/// self.subscribe::<Msg>(ctx, self.0.clone());
51/// }
52/// }
53///
54/// fn Handler<Msg> for Consumer {
55/// type Result = ();
56/// fn handle(&mut self, msg: Msg, _: &mut Self::Context) {
57/// println!("timestamp: {}", msg.timestamp);
58/// println!("attemps: {}", msg.attemps);
59/// println!("id: {}", msg.id);
60/// println!("data: {}", msg.body);
61/// println!("msg debug: {:?}", msg);
62/// }
63/// }
64///
65/// ```
66#[derive(Clone, Debug, Message)]
67pub struct Msg
68{
69 /// Timestamp of the message
70 pub timestamp: i64,
71 /// Number of attemps reader tried to process the message
72 pub attemps: u16,
73 /// Id of the message
74 pub id: String,
75 /// Data sent by nsqd
76 pub body: Vec<u8>,
77}
78
79impl Default for Msg {
80 fn default() -> Self {
81 Self {
82 timestamp: 0,
83 attemps: 0,
84 id: "".to_owned(),
85 body: Vec::new(),
86 }
87 }
88}
89
90/// Sent by [Connection](struct.Connection.html) every time in_fligth is increased or decreased
91#[derive(Message)]
92pub struct InFlight(pub u32);
93
94#[derive(Message)]
95pub struct Auth;
96
97#[derive(Message)]
98pub struct Sub;
99
100/// Allows Consumer to change Connection/s RDY
101///
102/// # Examples
103/// ```no-run
104/// struct Consumer(Addr<Connection>);
105///
106/// impl Actor for Consumer {
107/// type Context = Context<Self>;
108/// fn started(&mut self, _: &mut Self::Context) {
109/// self.conn.do_send(Ready(3));
110/// }
111/// }
112/// ```
113#[derive(Message)]
114pub struct Ready(pub u32);
115
116/// Allows Consumer to set Connection on backoff state
117///
118/// # Examples
119/// ```no-run
120/// use actix::prelude::*;
121/// use nsq_client::{Subscribe, Connection, Backoff, OnBackoff, OnResume, InFlight, Ready};
122/// struct Consumer{
123/// conn: Addr<Connection>,
124/// backoff: bool,
125/// };
126///
127/// impl Actor for Consumer {
128/// type Context = Context<Self>;
129/// fn started(&mut self, _: &mut Self::Context) {
130/// self.subscribe::<OnBackoff>(ctx, self.0.clone());
131/// self.subscribe::<OnResume>(ctx, self.0.clone());
132/// self.subscribe::<InFlight>(ctx, self.0.clone());
133/// self.conn.do_send(Ready(10));
134/// self.conn.do_send(Backoff);
135/// }
136/// }
137///
138/// impl Handler<OnBackoff> for Consumer {
139/// type Result = ();
140/// fn handle(&mut self, msg: OnBackoff, _: &mut Self::Context) {
141/// println!("Connection in Backoff");
142/// self.backoff = true;
143/// }
144/// }
145///
146/// impl Handler<OnResume> for Consumer {
147/// type Result = ();
148/// fn handle(&mut self, msg: OnResume, _: &mut Self::Context) {
149/// println!("Connection resuming from backoff");
150/// self.backoff = false;
151/// }
152/// }
153///
154/// impl Handler<OnInflight> for Consumer {
155/// type Result = ();
156/// fn handle(&mut self, msg: OnInFlight, _: &mut Self::Context) {
157/// match msg.0 {
158/// 0 => { println!("backoff state: {}", self.backoff) },
159/// 1 => { println!("resuming from backoff") },
160/// _ => { println!("throttle") },
161/// }
162/// }
163/// }
164/// ```
165#[derive(Message)]
166pub struct Backoff;
167
168#[derive(Message)]
169pub struct Resume;
170
171/// Send FIN command to nsqd
172///
173/// Args:
174/// * id - id of the message
175///
176/// # Examples
177/// ```no-run
178/// use actix::prelude::*;
179/// use nsq_client::{Connection, Subscribe, Msg, Fin};
180///
181/// struct Consumer(pub Addr<Connection>);
182///
183/// impl Actor for Consumer {
184/// type Context = Context<Self>;
185/// fn started(&mut self, ctx: &mut Self::Context) {
186/// self.subscribe::<Msg>(ctx, self.0.clone());
187/// }
188/// }
189///
190/// impl Handler<Msg> for Consumer {
191/// type Result = ();
192/// fn handle(&mut self, msg: Msg, ctx: &mut Self::Conetxt) {
193/// self.0.do_send(Fin(msg.id));
194/// }
195/// }
196/// ```
197#[derive(Message, Clone)]
198pub struct Fin(pub String);
199
200/// Send REQ command to nsqd
201///
202/// Args:
203/// * id - id of the message
204/// * timeout - time spent before message is re-sent by nsqd, 0 will not defer requeuing
205///
206/// # Examples
207/// ```no-run
208/// use actix::prelude::*;
209/// use nsq_client::{Connection, Subscribe, Requeue, Fin};
210///
211/// struct Consumer(pub Addr<Connection>);
212///
213/// impl Actor for Consumer {
214/// type Context = Context<Self>;
215/// fn started(&mut self, ctx: &mut Self::Context) {
216/// self.subscribe::<Msg>(ctx, self.0.clone());
217/// }
218/// }
219///
220/// impl Handler<Msg> for Consumer {
221/// type Result = ();
222/// fn handle(&mut self, msg: Msg, ctx: &mut Self::Conetxt) {
223/// self.0.do_send(Requeue(msg.id, 2));
224/// }
225/// }
226/// ```
227#[derive(Message, Clone)]
228pub struct Reqeue(pub String, u32);
229
230/// Send TOUCH command to nsqd (reset timeout for and in-flight message)
231///
232/// Args:
233/// * id - id of the message
234///
235/// # Examples
236/// ```no-run
237/// use actix::prelude::*;
238/// use nsq_client::{Connection, Subscribe, Touch, Fin};
239///
240/// struct Consumer(pub Addr<Connection>);
241///
242/// impl Actor for Consumer {
243/// type Context = Context<Self>;
244/// fn started(&mut self, ctx: &mut Self::Context) {
245/// self.subscribe::<Msg>(ctx, self.0.clone());
246/// }
247/// }
248///
249/// impl Handler<Msg> for Consumer {
250/// type Result = ();
251/// fn handle(&mut self, msg: Msg, ctx: &mut Self::Conetxt) {
252/// self.0.do_send(Touch(msg.id));
253/// }
254/// }
255/// ```
256#[derive(Message, Clone)]
257pub struct Touch(pub String);
258
259/// Sent by [Connection](struct.Connection.html) if auth be successful
260///
261/// # Examples
262/// ```no-run
263/// use actix::prelude::*;
264/// use nsq_client::{Connection, Subscribe, OnAuth};
265///
266/// struct Consumer(pub Addr<Connection>);
267///
268/// impl Actor for Consumer {
269/// type Context = Context<Self>;
270/// fn started(&mut self, ctx: &mut Self::Context) {
271/// self.subscribe::<OnAuth>(ctx, self.0.clone());
272/// }
273/// }
274///
275/// impl Handler<OnAuth> for Consumer {
276/// type Result = ();
277/// fn handle(&mut self, msg: OnAuth, ctx: &mut Self::Conetxt) {
278/// println!("authenticated: {:?}", msg.0);
279/// }
280/// }
281/// ```
282#[derive(Message, Clone)]
283pub struct OnAuth(pub AuthResp);
284
285/// Sent by [Connection](struct.Connection.html) after identify succeeds
286///
287/// # Examples
288/// ```no-run
289/// use actix::prelude::*;
290/// use nsq_client::{Connection, Subscribe, OnIdentify};
291///
292/// struct Consumer(pub Addr<Connection>);
293///
294/// impl Actor for Consumer {
295/// type Context = Context<Self>;
296/// fn started(&mut self, ctx: &mut Self::Context) {
297/// self.subscribe::<OnIdentify>(ctx, self.0.clone());
298/// }
299/// }
300///
301/// impl Handler<OnIdentify> for Consumer {
302/// type Result = ();
303/// fn handle(&mut self, msg: OnIdentify, ctx: &mut Self::Conetxt) {
304/// println!("identified: {:?}", msg.0);
305/// }
306/// }
307/// ```
308#[derive(Message, Clone)]
309pub struct OnIdentify(pub NsqdConfig);
310
311/// Sent by [Connection](struct.Connection.html) after CLS is sent to nsqd
312///
313/// # Examples
314/// ```no-run
315/// use actix::prelude::*;
316/// use nsq_client::{Connection, Subscribe, OnClose};
317///
318/// struct Consumer(pub Addr<Connection>);
319///
320/// impl Actor for Consumer {
321/// type Context = Context<Self>;
322/// fn started(&mut self, ctx: &mut Self::Context) {
323/// self.subscribe::<OnClose>(ctx, self.0.clone());
324/// }
325/// }
326///
327/// impl Handler<OnClose> for Consumer {
328/// type Result = ();
329/// fn handle(&mut self, msg: OnClose, ctx: &mut Self::Conetxt) {
330/// if msg.0 == true {
331/// println!("connection closed");
332/// } else {
333/// println!("connection closing failed");
334/// }
335/// }
336/// }
337/// ```
338#[derive(Message, Clone)]
339pub struct OnClose(pub bool);
340
341/// Sent by [Connection](struct.Connection.html) after Backoff state is activated
342///
343/// # Examples
344/// ```no-run
345/// use actix::prelude::*;
346/// use nsq_client::{Connection, Subscribe, OnBackoff};
347///
348/// struct Consumer(pub Addr<Connection>);
349///
350/// impl Actor for Consumer {
351/// type Context = Context<Self>;
352/// fn started(&mut self, ctx: &mut Self::Context) {
353/// self.subscribe::<OnBackoff>(ctx, self.0.clone());
354/// }
355/// }
356///
357/// impl Handler<OnBackoff> for Consumer {
358/// type Result = ();
359/// fn handle(&mut self, msg: OnBackoff, ctx: &mut Self::Conetxt) {
360/// println!("connection backoff activated");
361/// }
362/// }
363/// ```
364#[derive(Message, Clone)]
365pub struct OnBackoff;
366
367/// Sent by [Connection](struct.Connection.html) after Backoff state is terminated
368///
369/// # Examples
370/// ```no-run
371/// use actix::prelude::*;
372/// use nsq_client::{Connection, Subscribe, OnResume};
373///
374/// struct Consumer(pub Addr<Connection>);
375///
376/// impl Actor for Consumer {
377/// type Context = Context<Self>;
378/// fn started(&mut self, ctx: &mut Self::Context) {
379/// self.subscribe::<OnResume>(ctx, self.0.clone());
380/// }
381/// }
382///
383/// impl Handler<OnResume> for Consumer {
384/// type Result = ();
385/// fn handle(&mut self, msg: OnResume, ctx: &mut Self::Conetxt) {
386/// println!("resuming connection from backoff state");
387/// }
388/// }
389/// ```
390#[derive(Message, Clone)]
391pub struct OnResume;
392
393/// Allow Consumer to safefly close nsq [Connection](struct.Connection.html)
394///
395/// Send nsq CLS connand to nsqd
396/// # Examples
397/// ```no-run
398/// use actix::prelude::*;
399/// use nsq_client::{Cls, OnClose, Subscribe, Connection};
400///
401/// struct Consumer(Addr<Connection>);
402///
403/// impl Actor for Consumer {
404/// type Context = Context<Self>;
405/// fn started(&mut self, _: &mut Self::Context) {
406/// self.subscribe::<OnClose>(ctx, self.0.clone());
407/// self.0.do_send(Cls);
408/// }
409/// }
410///
411/// impl Handler<OnClose> for Consumer {
412/// type Result = ();
413/// fn handle(&mut self, msg: OnClose, _: &mut Self::Context) {
414/// if msg.0 {
415/// println!("Connection closed");
416/// } else {
417/// println!("Cannot close Connection");
418/// }
419/// }
420/// }
421/// ```
422#[derive(Message)]
423pub struct Cls;
424
425#[derive(Clone, Debug)]
426pub struct Pub(pub String);
427
428impl Message for Pub
429{
430 type Result = Result<Cmd, Error>;
431}
432