lapin_async/
connection.rs

1use amq_protocol::{
2  frame::AMQPFrame,
3  tcp::TcpStream,
4  uri::AMQPUri,
5};
6use mio::{Evented, Poll, PollOpt, Ready, Token};
7use log::{debug, error, trace};
8
9use std::{
10  io,
11  thread::JoinHandle,
12};
13
14use crate::{
15  channel::{Channel, Reply},
16  channels::Channels,
17  confirmation::Confirmation,
18  configuration::Configuration,
19  connection_properties::ConnectionProperties,
20  connection_status::{ConnectionStatus, ConnectionState},
21  error::{Error, ErrorKind},
22  error_handler::ErrorHandler,
23  frames::{Frames, Priority, SendId},
24  io_loop::{IoLoop, IoLoopHandle},
25  registration::Registration,
26  tcp::AMQPUriTcpExt,
27  types::ShortUInt,
28  wait::Wait,
29};
30
31#[derive(Clone, Debug)]
32#[deprecated(note = "use lapin instead")]
33pub struct Connection {
34  configuration: Configuration,
35  status:        ConnectionStatus,
36  channels:      Channels,
37  registration:  Registration,
38  frames:        Frames,
39  io_loop:       IoLoopHandle,
40  error_handler: ErrorHandler,
41}
42
43impl Default for Connection {
44  fn default() -> Self {
45    let connection = Self {
46      configuration: Configuration::default(),
47      status:        ConnectionStatus::default(),
48      channels:      Channels::default(),
49      registration:  Registration::default(),
50      frames:        Frames::default(),
51      io_loop:       IoLoopHandle::default(),
52      error_handler: ErrorHandler::default(),
53    };
54
55    connection.channels.create_zero(connection.clone());
56    connection
57  }
58}
59
60impl Evented for Connection {
61  fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
62    self.registration.register(poll, token, interest, opts)
63  }
64
65  fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
66    self.registration.reregister(poll, token, interest, opts)
67  }
68
69  fn deregister(&self, poll: &Poll) -> io::Result<()> {
70    self.registration.deregister(poll)
71  }
72}
73
74impl Connection {
75  /// Connect to an AMQP Server
76  #[deprecated(note = "use lapin instead")]
77  pub fn connect(uri: &str, options: ConnectionProperties) -> Confirmation<Connection> {
78    Connect::connect(uri, options)
79  }
80
81  /// Connect to an AMQP Server
82  #[deprecated(note = "use lapin instead")]
83  pub fn connect_uri(uri: AMQPUri, options: ConnectionProperties) -> Confirmation<Connection> {
84    Connect::connect(uri, options)
85  }
86
87  #[deprecated(note = "use lapin instead")]
88  pub fn create_channel(&self) -> Confirmation<Channel> {
89    if !self.status.connected() {
90      return Confirmation::new_error(ErrorKind::InvalidConnectionState(self.status.state()).into());
91    }
92    match self.channels.create(self.clone()) {
93      Ok(channel) => channel.channel_open(),
94      Err(error)  => Confirmation::new_error(error),
95    }
96  }
97
98  /// Block current thread while the connection is still active.
99  /// This is useful when you only have a consumer and nothing else keeping your application
100  /// "alive".
101  #[deprecated(note = "use lapin instead")]
102  pub fn run(&self) -> Result<(), Error> {
103    self.io_loop.wait()
104  }
105
106  #[deprecated(note = "use lapin instead")]
107  pub fn on_error<E: Fn() + Send + 'static>(&self, handler: Box<E>) {
108    self.error_handler.set_handler(handler);
109  }
110
111  #[deprecated(note = "use lapin instead")]
112  pub fn configuration(&self) -> &Configuration {
113    &self.configuration
114  }
115
116  #[deprecated(note = "use lapin instead")]
117  pub fn status(&self) -> &ConnectionStatus {
118    &self.status
119  }
120
121  pub(crate) fn flow(&self) -> bool {
122    self.channels.flow()
123  }
124
125  #[deprecated(note = "use lapin instead")]
126  pub fn close(&self, reply_code: ShortUInt, reply_text: &str) -> Confirmation<()> {
127    self.channels.get(0).expect("channel 0").connection_close(reply_code, reply_text, 0, 0)
128  }
129
130  pub(crate) fn set_io_loop(&self, io_loop: JoinHandle<Result<(), Error>>) {
131    self.io_loop.register(io_loop);
132  }
133
134  pub(crate) fn drop_pending_frames(&self) {
135    self.frames.drop_pending();
136  }
137
138  fn connector(options: ConnectionProperties) -> impl FnOnce(TcpStream, AMQPUri) -> Result<(Wait<Connection>, IoLoop<TcpStream>), Error> + 'static {
139    move |stream, uri| {
140      let conn = Connection::default();
141      conn.status.set_vhost(&uri.vhost);
142      if let Some(frame_max) = uri.query.frame_max {
143        conn.configuration.set_frame_max(frame_max);
144      }
145      if let Some(channel_max) = uri.query.channel_max {
146        conn.configuration.set_channel_max(channel_max);
147      }
148      if let Some(heartbeat) = uri.query.heartbeat {
149        conn.configuration.set_heartbeat(heartbeat);
150      }
151      conn.send_frame(0, Priority::CRITICAL, AMQPFrame::ProtocolHeader, None)?;
152      let (wait, wait_handle) = Wait::new();
153      conn.set_state(ConnectionState::SentProtocolHeader(wait_handle, uri.authority.userinfo.into(), options));
154      let io_loop = IoLoop::new(conn.clone(), stream)?;
155      Ok((wait, io_loop))
156    }
157  }
158
159  pub(crate) fn set_state(&self, state: ConnectionState) {
160    self.status.set_state(state);
161  }
162
163  pub(crate) fn block(&self) {
164    self.status.block();
165  }
166
167  pub(crate) fn unblock(&self) {
168    self.status.unblock();
169  }
170
171  fn set_readable(&self) -> Result<(), Error> {
172    trace!("connection set readable");
173    self.registration.set_readiness(Ready::readable()).map_err(ErrorKind::IOError)?;
174    Ok(())
175  }
176
177  pub(crate) fn send_frame(&self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Result<Wait<()>, Error> {
178    trace!("connection send_frame; channel_id={}", channel_id);
179    let wait = self.frames.push(channel_id, priority, frame, expected_reply);
180    self.set_readable()?;
181    Ok(wait)
182  }
183
184  pub(crate) fn next_expected_reply(&self, channel_id: u16) -> Option<Reply> {
185    self.frames.next_expected_reply(channel_id)
186  }
187
188  /// next message to send to the network
189  ///
190  /// returns None if there's no message to send
191  pub(crate) fn next_frame(&self) -> Option<(SendId, AMQPFrame)> {
192    self.frames.pop(self.flow())
193  }
194
195  /// updates the current state with a new received frame
196  pub(crate) fn handle_frame(&self, f: AMQPFrame) -> Result<(), Error> {
197    trace!("will handle frame: {:?}", f);
198    match f {
199      AMQPFrame::ProtocolHeader => {
200        error!("error: the client should not receive a protocol header");
201        self.set_error()?;
202      },
203      AMQPFrame::Method(channel_id, method) => {
204        self.channels.receive_method(channel_id, method)?;
205      },
206      AMQPFrame::Heartbeat(_) => {
207        debug!("received heartbeat from server");
208      },
209      AMQPFrame::Header(channel_id, _, header) => {
210        self.channels.handle_content_header_frame(channel_id, header.body_size, header.properties)?;
211      },
212      AMQPFrame::Body(channel_id, payload) => {
213        self.channels.handle_body_frame(channel_id, payload)?;
214      }
215    };
216    Ok(())
217  }
218
219  pub(crate) fn send_heartbeat(&self) -> Result<(), Error> {
220    self.set_readable()?;
221    self.send_frame(0, Priority::CRITICAL, AMQPFrame::Heartbeat(0), None)?;
222    Ok(())
223  }
224
225  pub(crate) fn requeue_frame(&self, send_id: SendId, frame: AMQPFrame) -> Result<(), Error> {
226    self.set_readable()?;
227    self.frames.retry(send_id, frame);
228    Ok(())
229  }
230
231  pub(crate) fn mark_sent(&self, send_id: SendId) {
232    self.frames.mark_sent(send_id);
233  }
234
235  pub(crate) fn remove_channel(&self, channel_id: u16) -> Result<(), Error> {
236    self.frames.clear_expected_replies(channel_id);
237    self.channels.remove(channel_id)
238  }
239
240  pub(crate) fn set_closing(&self) {
241    self.set_state(ConnectionState::Closing);
242    self.channels.set_closing();
243  }
244
245  pub(crate) fn set_closed(&self) -> Result<(), Error> {
246    self.set_state(ConnectionState::Closed);
247    self.channels.set_closed()
248  }
249
250  pub(crate) fn set_error(&self) -> Result<(), Error> {
251    error!("Connection error");
252    self.set_state(ConnectionState::Error);
253    self.channels.set_error()?;
254    self.error_handler.on_error();
255    Ok(())
256  }
257}
258
259/// Trait providing a method to connect to an AMQP server
260#[deprecated(note = "use lapin instead")]
261pub trait Connect {
262  /// connect to an AMQP server
263  fn connect(self, options: ConnectionProperties) -> Confirmation<Connection> where Self: Sized {
264    self.connect_raw(options).into()
265  }
266
267  /// connect to an AMQP server, for internal use
268  fn connect_raw(self, options: ConnectionProperties) -> Result<Wait<Connection>, Error>;
269}
270
271impl Connect for AMQPUri {
272  fn connect_raw(self, options: ConnectionProperties) -> Result<Wait<Connection>, Error> {
273    let (conn, io_loop) = AMQPUriTcpExt::connect(self, Connection::connector(options)).map_err(ErrorKind::IOError)??;
274    io_loop.run()?;
275    Ok(conn)
276  }
277}
278
279impl Connect for &str {
280  fn connect_raw(self, options: ConnectionProperties) -> Result<Wait<Connection>, Error> {
281    let (conn, io_loop) = AMQPUriTcpExt::connect(self, Connection::connector(options)).map_err(ErrorKind::IOError)??;
282    io_loop.run()?;
283    Ok(conn)
284  }
285}
286
#[cfg(test)]
mod tests {
  use env_logger;

  use super::*;
  use crate::BasicProperties;
  use crate::channel_status::ChannelState;
  use crate::consumer::{Consumer, ConsumerSubscriber};
  use crate::message::Delivery;
  use crate::queue::{Queue, QueueState};
  use crate::types::ShortString;
  use amq_protocol::protocol::{basic, AMQPClass};
  use amq_protocol::frame::AMQPContentHeader;

  /// Subscriber that ignores everything it is told.
  #[derive(Clone,Debug,PartialEq)]
  struct DummySubscriber;

  impl ConsumerSubscriber for DummySubscriber {
    fn new_delivery(&self, _delivery: Delivery) {}
    fn drop_prefetched_messages(&self) {}
    fn cancel(&self) {}
  }

  /// Bootstraps a connection into a consuming state: connected, with one
  /// connected channel holding a queue named "consumed" that has a consumer
  /// tagged "consumer-tag".
  ///
  /// Returns the connection, the channel, the queue name and the consumer tag.
  fn consuming_connection() -> (Connection, Channel, ShortString, ShortString) {
    let conn = Connection::default();
    conn.set_state(ConnectionState::Connected);
    conn.configuration.set_channel_max(2047);
    let channel = conn.channels.create(conn.clone()).unwrap();
    channel.set_state(ChannelState::Connected);
    let queue_name = ShortString::from("consumed");
    let mut queue: QueueState = Queue::new(queue_name.clone(), 0, 0).into();
    let consumer_tag = ShortString::from("consumer-tag");
    let consumer = Consumer::new(consumer_tag.clone(), false, false, false, Box::new(DummySubscriber));
    queue.register_consumer(consumer_tag.clone(), consumer);
    // Fail loudly instead of silently skipping the registration when the
    // freshly created channel cannot be found.
    conn.channels
      .get(channel.id())
      .expect("freshly created channel must be registered")
      .register_queue(queue);
    (conn, channel, queue_name, consumer_tag)
  }

  /// Feeds a `basic.deliver` method frame to the connection and checks that
  /// the channel then expects the content header.
  fn deliver(conn: &Connection, channel: &Channel, queue_name: &ShortString, consumer_tag: &ShortString) {
    let deliver_frame = AMQPFrame::Method(
      channel.id(),
      AMQPClass::Basic(
        basic::AMQPMethod::Deliver(
          basic::Deliver {
            consumer_tag: consumer_tag.clone(),
            delivery_tag: 1,
            redelivered: false,
            exchange: "".into(),
            routing_key: queue_name.clone(),
          }
        )
      )
    );
    conn.handle_frame(deliver_frame).unwrap();
    let expected_state = ChannelState::WillReceiveContent(
      Some(queue_name.clone()),
      Some(consumer_tag.clone())
    );
    assert_eq!(channel.status().state(), expected_state);
  }

  /// Builds a content header frame (class 60) announcing `body_size` bytes.
  fn content_header(channel_id: u16, body_size: u64) -> AMQPFrame {
    AMQPFrame::Header(
      channel_id,
      60,
      Box::new(AMQPContentHeader {
        class_id: 60,
        weight: 0,
        body_size,
        properties: BasicProperties::default(),
      })
    )
  }

  #[test]
  fn basic_consume_small_payload() {
    let _ = env_logger::try_init();

    let (conn, channel, queue_name, consumer_tag) = consuming_connection();

    // Now test the state machine behaviour
    deliver(&conn, &channel, &queue_name, &consumer_tag);
    {
      conn.handle_frame(content_header(channel.id(), 2)).unwrap();
      let expected_state = ChannelState::ReceivingContent(Some(queue_name.clone()), Some(consumer_tag.clone()), 2);
      assert_eq!(channel.status().state(), expected_state);
    }
    {
      let body_frame = AMQPFrame::Body(channel.id(), b"{}".to_vec());
      conn.handle_frame(body_frame).unwrap();
      assert_eq!(channel.status().state(), ChannelState::Connected);
    }
  }

  #[test]
  fn basic_consume_empty_payload() {
    let _ = env_logger::try_init();

    let (conn, channel, queue_name, consumer_tag) = consuming_connection();

    // Now test the state machine behaviour
    deliver(&conn, &channel, &queue_name, &consumer_tag);
    {
      // A header announcing an empty body completes the delivery on its own.
      conn.handle_frame(content_header(channel.id(), 0)).unwrap();
      assert_eq!(channel.status().state(), ChannelState::Connected);
    }
  }
}