lapin_async/
channel.rs

1use amq_protocol::frame::{AMQPContentHeader, AMQPFrame};
2use log::{debug, error, info, trace};
3
4use std::borrow::Borrow;
5
6use crate::{
7  BasicProperties,
8  acknowledgement::{Acknowledgements, DeliveryTag},
9  auth::Credentials,
10  channel_status::{ChannelStatus, ChannelState},
11  confirmation::Confirmation,
12  connection::Connection,
13  connection_status::ConnectionState,
14  consumer::{Consumer, ConsumerSubscriber},
15  error::{Error, ErrorKind},
16  frames::Priority,
17  id_sequence::IdSequence,
18  message::{BasicGetMessage, BasicReturnMessage, Delivery},
19  protocol::{self, AMQPClass, AMQPError, AMQPSoftError},
20  queue::Queue,
21  queues::Queues,
22  returned_messages::ReturnedMessages,
23  types::*,
24  wait::{Wait, WaitHandle},
25};
26
27#[cfg(test)]
28use crate::queue::QueueState;
29
30#[derive(Clone, Debug)]
31#[deprecated(note = "use lapin instead")]
32pub struct Channel {
33  id:                u16,
34  connection:        Connection,
35  status:            ChannelStatus,
36  acknowledgements:  Acknowledgements,
37  delivery_tag:      IdSequence<DeliveryTag>,
38  queues:            Queues,
39  returned_messages: ReturnedMessages,
40}
41
42impl Channel {
43  pub(crate) fn new(channel_id: u16, connection: Connection) -> Channel {
44    let returned_messages = ReturnedMessages::default();
45    Channel {
46      id:               channel_id,
47      connection,
48      status:           ChannelStatus::default(),
49      acknowledgements: Acknowledgements::new(returned_messages.clone()),
50      delivery_tag:     IdSequence::new(false),
51      queues:           Queues::default(),
52      returned_messages,
53    }
54  }
55
56  #[deprecated(note = "use lapin instead")]
57  pub fn status(&self) -> &ChannelStatus {
58    &self.status
59  }
60
61  pub(crate) fn set_closing(&self) {
62    self.set_state(ChannelState::Closing);
63  }
64
65  pub(crate) fn set_closed(&self) -> Result<(), Error> {
66    self.set_state(ChannelState::Closed);
67    self.connection.remove_channel(self.id)
68  }
69
70  pub(crate) fn set_error(&self) -> Result<(), Error> {
71    self.set_state(ChannelState::Error);
72    self.connection.remove_channel(self.id)
73  }
74
75  pub(crate) fn set_state(&self, state: ChannelState) {
76    self.status.set_state(state);
77  }
78
79  #[deprecated(note = "use lapin instead")]
80  pub fn id(&self) -> u16 {
81    self.id
82  }
83
84  #[deprecated(note = "use lapin instead")]
85  pub fn close(&self, reply_code: ShortUInt, reply_text: &str) -> Confirmation<()> {
86    self.do_channel_close(reply_code, reply_text, 0, 0)
87  }
88
89  #[deprecated(note = "use lapin instead")]
90  pub fn basic_consume(&self, queue: &Queue, consumer_tag: &str, options: BasicConsumeOptions, arguments: FieldTable, subscriber: Box<dyn ConsumerSubscriber>) -> Confirmation<ShortString> {
91    self.do_basic_consume(queue.borrow(), consumer_tag, options, arguments, subscriber)
92  }
93
94  #[deprecated(note = "use lapin instead")]
95  pub fn wait_for_confirms(&self) -> Confirmation<Vec<BasicReturnMessage>> {
96    if let Some(wait) = self.acknowledgements.get_last_pending() {
97      let returned_messages = self.returned_messages.clone();
98      Confirmation::new(wait).map(Box::new(move |_| returned_messages.drain()))
99    } else {
100      let (wait, wait_handle) = Wait::new();
101      wait_handle.finish(Vec::default());
102      Confirmation::new(wait)
103    }
104  }
105
106  #[cfg(test)]
107  pub(crate) fn register_queue(&self, queue: QueueState) {
108    self.queues.register(queue);
109  }
110
111  pub(crate) fn send_method_frame(&self, priority: Priority, method: AMQPClass, expected_reply: Option<Reply>) -> Result<Wait<()>, Error> {
112    self.send_frame(priority, AMQPFrame::Method(self.id, method), expected_reply)
113  }
114
115  pub(crate) fn send_frame(&self, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Result<Wait<()>, Error> {
116    self.connection.send_frame(self.id, priority, frame, expected_reply)
117  }
118
119  fn send_content_frames(&self, class_id: u16, slice: &[u8], properties: BasicProperties) -> Result<Wait<()>, Error> {
120    let header = AMQPContentHeader {
121      class_id,
122      weight:    0,
123      body_size: slice.len() as u64,
124      properties,
125    };
126    let mut wait = self.send_frame(Priority::LOW, AMQPFrame::Header(self.id, class_id, Box::new(header)), None)?;
127
128    let frame_max = self.connection.configuration().frame_max();
129    //a content body frame 8 bytes of overhead
130    for chunk in slice.chunks(frame_max as usize - 8) {
131      wait = self.send_frame(Priority::LOW, AMQPFrame::Body(self.id, Vec::from(chunk)), None)?;
132    }
133    Ok(wait)
134  }
135
136  pub(crate) fn handle_content_header_frame(&self, size: u64, properties: BasicProperties) -> Result<(), Error> {
137    if let ChannelState::WillReceiveContent(queue_name, request_id_or_consumer_tag) = self.status.state() {
138      if size > 0 {
139        self.status.set_state(ChannelState::ReceivingContent(queue_name.clone(), request_id_or_consumer_tag.clone(), size as usize));
140      } else {
141        self.status.set_state(ChannelState::Connected);
142      }
143      if let Some(queue_name) = queue_name {
144        self.queues.handle_content_header_frame(queue_name.as_str(), request_id_or_consumer_tag, size, properties);
145      } else {
146        self.returned_messages.set_delivery_properties(properties);
147        if size == 0 {
148          self.returned_messages.new_delivery_complete();
149        }
150      }
151      Ok(())
152    } else {
153      self.set_error()
154    }
155  }
156
157  pub(crate) fn handle_body_frame(&self, payload: Vec<u8>) -> Result<(), Error> {
158    let payload_size = payload.len();
159
160    if let ChannelState::ReceivingContent(queue_name, request_id_or_consumer_tag, remaining_size) = self.status.state() {
161      if remaining_size >= payload_size {
162        if let Some(queue_name) = queue_name.as_ref() {
163          self.queues.handle_body_frame(queue_name.as_str(), request_id_or_consumer_tag.clone(), remaining_size, payload_size, payload);
164        } else {
165          self.returned_messages.receive_delivery_content(payload);
166          if remaining_size == payload_size {
167            self.returned_messages.new_delivery_complete();
168          }
169        }
170        if remaining_size == payload_size {
171          self.status.set_state(ChannelState::Connected);
172        } else {
173          self.status.set_state(ChannelState::ReceivingContent(queue_name, request_id_or_consumer_tag, remaining_size - payload_size));
174        }
175        Ok(())
176      } else {
177        error!("body frame too large");
178        self.set_error()
179      }
180    } else {
181        self.set_error()
182    }
183  }
184
185  fn acknowledgement_error(&self, error: Error, class_id: u16, method_id: u16) -> Result<(), Error> {
186    self.do_channel_close(AMQPSoftError::PRECONDITIONFAILED.get_id(), "precondition failed", class_id, method_id).as_error()?;
187    Err(error)
188  }
189
190  fn on_connection_start_ok_sent(&self, wait_handle: WaitHandle<Connection>, credentials: Credentials) -> Result<(), Error> {
191    self.connection.set_state(ConnectionState::SentStartOk(wait_handle, credentials));
192    Ok(())
193  }
194
195  fn on_connection_open_sent(&self, wait_handle: WaitHandle<Connection>) -> Result<(), Error> {
196    self.connection.set_state(ConnectionState::SentOpen(wait_handle));
197    Ok(())
198  }
199
200  fn on_connection_close_sent(&self) -> Result<(), Error> {
201    self.connection.set_closing();
202    Ok(())
203  }
204
205  fn on_connection_close_ok_sent(&self) -> Result<(), Error> {
206    self.connection.set_closed()
207  }
208
209  fn on_channel_close_sent(&self) -> Result<(), Error> {
210    self.set_closing();
211    Ok(())
212  }
213
214  fn on_channel_close_ok_sent(&self) -> Result<(), Error> {
215    self.set_closed()
216  }
217
218  fn on_basic_publish_sent(&self, class_id: u16, payload: Vec<u8>, properties: BasicProperties) -> Result<Wait<()>, Error> {
219    if self.status.confirm() {
220      let delivery_tag = self.delivery_tag.next();
221      self.acknowledgements.register_pending(delivery_tag);
222    };
223
224    self.send_content_frames(class_id, payload.as_slice(), properties)
225  }
226
227  fn on_basic_recover_async_sent(&self) -> Result<(), Error> {
228    self.queues.drop_prefetched_messages();
229    Ok(())
230  }
231
232  fn on_basic_ack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) -> Result<(), Error> {
233    if multiple && delivery_tag == 0 {
234      self.queues.drop_prefetched_messages();
235    }
236    Ok(())
237  }
238
239  fn on_basic_nack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) -> Result<(), Error> {
240    if multiple && delivery_tag == 0 {
241      self.queues.drop_prefetched_messages();
242    }
243    Ok(())
244  }
245
246  fn tune_connection_configuration(&self, channel_max: u16, frame_max: u32, heartbeat: u16) {
247    // If we disable the heartbeat (0) but the server don't, follow him and enable it too
248    // If both us and the server want heartbeat enabled, pick the lowest value.
249    if self.connection.configuration().heartbeat() == 0 || heartbeat != 0 && heartbeat < self.connection.configuration().heartbeat() {
250      self.connection.configuration().set_heartbeat(heartbeat);
251    }
252
253    if channel_max != 0 {
254      // 0 means we want to take the server's value
255      // If both us and the server specified a channel_max, pick the lowest value.
256      if self.connection.configuration().channel_max() == 0 || channel_max < self.connection.configuration().channel_max() {
257        self.connection.configuration().set_channel_max(channel_max);
258      }
259    }
260    if self.connection.configuration().channel_max() == 0 {
261      self.connection.configuration().set_channel_max(u16::max_value());
262    }
263
264    if frame_max != 0 {
265      // 0 means we want to take the server's value
266      // If both us and the server specified a frame_max, pick the lowest value.
267      if self.connection.configuration().frame_max() == 0 || frame_max < self.connection.configuration().frame_max() {
268        self.connection.configuration().set_frame_max(frame_max);
269      }
270    }
271    if self.connection.configuration().frame_max() == 0 {
272      self.connection.configuration().set_frame_max(u32::max_value());
273    }
274  }
275
276  fn on_connection_start_received(&self, method: protocol::connection::Start) -> Result<(), Error> {
277    trace!("Server sent connection::Start: {:?}", method);
278    let state = self.connection.status().state();
279    if let ConnectionState::SentProtocolHeader(wait_handle, credentials, mut options) = state {
280      let mechanism = options.mechanism.to_string();
281      let locale    = options.locale.clone();
282
283      if !method.mechanisms.split_whitespace().any(|m| m == mechanism) {
284        error!("unsupported mechanism: {}", mechanism);
285      }
286      if !method.locales.split_whitespace().any(|l| l == locale) {
287        error!("unsupported locale: {}", mechanism);
288      }
289
290      if !options.client_properties.contains_key("product") || !options.client_properties.contains_key("version") {
291        options.client_properties.insert("product".into(), AMQPValue::LongString(env!("CARGO_PKG_NAME").into()));
292        options.client_properties.insert("version".into(), AMQPValue::LongString(env!("CARGO_PKG_VERSION").into()));
293      }
294
295      options.client_properties.insert("platform".into(), AMQPValue::LongString("rust".into()));
296
297      let mut capabilities = FieldTable::default();
298      capabilities.insert("publisher_confirms".into(), AMQPValue::Boolean(true));
299      capabilities.insert("exchange_exchange_bindings".into(), AMQPValue::Boolean(true));
300      capabilities.insert("basic.nack".into(), AMQPValue::Boolean(true));
301      capabilities.insert("consumer_cancel_notify".into(), AMQPValue::Boolean(true));
302      capabilities.insert("connection.blocked".into(), AMQPValue::Boolean(true));
303      capabilities.insert("authentication_failure_close".into(), AMQPValue::Boolean(true));
304
305      options.client_properties.insert("capabilities".into(), AMQPValue::FieldTable(capabilities));
306
307      self.connection_start_ok(options.client_properties, &mechanism, &credentials.sasl_auth_string(options.mechanism), &locale, wait_handle, credentials).as_error()
308    } else {
309      error!("Invalid state: {:?}", state);
310      self.connection.set_error()?;
311      Err(ErrorKind::InvalidConnectionState(state).into())
312    }
313  }
314
315  fn on_connection_secure_received(&self, method: protocol::connection::Secure) -> Result<(), Error> {
316    trace!("Server sent connection::Secure: {:?}", method);
317
318    let state = self.connection.status().state();
319    if let ConnectionState::SentStartOk(_, credentials) = state {
320      self.connection_secure_ok(&credentials.rabbit_cr_demo_answer()).as_error()
321    } else {
322      error!("Invalid state: {:?}", state);
323      self.connection.set_error()?;
324      Err(ErrorKind::InvalidConnectionState(state).into())
325    }
326  }
327
328  fn on_connection_tune_received(&self, method: protocol::connection::Tune) -> Result<(), Error> {
329    debug!("Server sent Connection::Tune: {:?}", method);
330
331    let state = self.connection.status().state();
332    if let ConnectionState::SentStartOk(wait_handle, _) = state {
333      self.tune_connection_configuration(method.channel_max, method.frame_max, method.heartbeat);
334
335      self.connection_tune_ok(self.connection.configuration().channel_max(), self.connection.configuration().frame_max(), self.connection.configuration().heartbeat()).as_error()?;
336      self.connection_open(&self.connection.status().vhost(), wait_handle).as_error()
337    } else {
338      error!("Invalid state: {:?}", state);
339      self.connection.set_error()?;
340      Err(ErrorKind::InvalidConnectionState(state).into())
341    }
342  }
343
344  fn on_connection_open_ok_received(&self, _: protocol::connection::OpenOk) -> Result<(), Error> {
345    let state = self.connection.status().state();
346    if let ConnectionState::SentOpen(wait_handle) = state {
347      self.connection.set_state(ConnectionState::Connected);
348      wait_handle.finish(self.connection.clone());
349      Ok(())
350    } else {
351      error!("Invalid state: {:?}", state);
352      self.connection.set_error()?;
353      Err(ErrorKind::InvalidConnectionState(state).into())
354    }
355  }
356
357  fn on_connection_close_received(&self, method: protocol::connection::Close) -> Result<(), Error> {
358    if let Some(error) = AMQPError::from_id(method.reply_code) {
359      error!("Connection closed on channel {} by {}:{} => {:?} => {}", self.id, method.class_id, method.method_id, error, method.reply_text);
360    } else {
361      info!("Connection closed on channel {}: {:?}", self.id, method);
362    }
363    let state = self.connection.status().state();
364    self.connection.set_closing();
365    self.connection.drop_pending_frames();
366    self.connection_close_ok().as_error()?;
367    match state {
368      ConnectionState::SentProtocolHeader(wait_handle, ..) => wait_handle.error(ErrorKind::ConnectionRefused.into()),
369      ConnectionState::SentStartOk(wait_handle, _)         => wait_handle.error(ErrorKind::ConnectionRefused.into()),
370      ConnectionState::SentOpen(wait_handle)               => wait_handle.error(ErrorKind::ConnectionRefused.into()),
371      _                                                    => {},
372    }
373    Ok(())
374  }
375
376  fn on_connection_blocked_received(&self, _method: protocol::connection::Blocked) -> Result<(), Error> {
377    self.connection.block();
378    Ok(())
379  }
380
381  fn on_connection_unblocked_received(&self, _method: protocol::connection::Unblocked) -> Result<(), Error> {
382    self.connection.unblock();
383    Ok(())
384  }
385
386  fn on_connection_close_ok_received(&self) -> Result<(), Error> {
387    self.connection.set_closed()
388  }
389
390  fn on_channel_open_ok_received(&self, _method: protocol::channel::OpenOk, wait_handle: WaitHandle<Channel>) -> Result<(), Error> {
391    self.status.set_state(ChannelState::Connected);
392    wait_handle.finish(self.clone());
393    Ok(())
394  }
395
396  fn on_channel_flow_received(&self, method: protocol::channel::Flow) -> Result<(), Error> {
397    self.status.set_send_flow(method.active);
398    self.channel_flow_ok(ChannelFlowOkOptions {active: method.active}).as_error()
399  }
400
401  fn on_channel_flow_ok_received(&self, method: protocol::channel::FlowOk, wait_handle: WaitHandle<Boolean>) -> Result<(), Error> {
402    // Nothing to do here, the server just confirmed that we paused/resumed the receiving flow
403    wait_handle.finish(method.active);
404    Ok(())
405  }
406
407  fn on_channel_close_received(&self, method: protocol::channel::Close) -> Result<(), Error> {
408    if let Some(error) = AMQPError::from_id(method.reply_code) {
409      error!("Channel {} closed by {}:{} => {:?} => {}", self.id, method.class_id, method.method_id, error, method.reply_text);
410    } else {
411      info!("Channel {} closed: {:?}", self.id, method);
412    }
413    self.channel_close_ok().as_error()
414  }
415
416  fn on_channel_close_ok_received(&self) -> Result<(), Error> {
417    self.set_closed()
418  }
419
420  fn on_queue_delete_ok_received(&self, method: protocol::queue::DeleteOk, wait_handle: WaitHandle<LongUInt>, queue: ShortString) -> Result<(), Error> {
421    self.queues.deregister(queue.as_str());
422    wait_handle.finish(method.message_count);
423    Ok(())
424  }
425
426  fn on_queue_purge_ok_received(&self, method: protocol::queue::PurgeOk, wait_handle: WaitHandle<LongUInt>) -> Result<(), Error> {
427    wait_handle.finish(method.message_count);
428    Ok(())
429  }
430
431  fn on_queue_declare_ok_received(&self, method: protocol::queue::DeclareOk, wait_handle: WaitHandle<Queue>) -> Result<(), Error> {
432    let queue = Queue::new(method.queue, method.message_count, method.consumer_count);
433    wait_handle.finish(queue.clone());
434    self.queues.register(queue.into());
435    Ok(())
436  }
437
438  fn on_basic_get_ok_received(&self, method: protocol::basic::GetOk, wait_handle: WaitHandle<Option<BasicGetMessage>>, queue: ShortString) -> Result<(), Error> {
439    self.queues.start_basic_get_delivery(queue.as_str(), BasicGetMessage::new(method.delivery_tag, method.exchange, method.routing_key, method.redelivered, method.message_count), wait_handle);
440    self.status.set_state(ChannelState::WillReceiveContent(Some(queue), None));
441    Ok(())
442  }
443
444  fn on_basic_get_empty_received(&self, _: protocol::basic::GetEmpty) -> Result<(), Error> {
445    match self.connection.next_expected_reply(self.id) {
446      Some(Reply::AwaitingBasicGetOk(wait_handle, _)) => {
447        wait_handle.finish(None);
448        Ok(())
449      },
450      _ => {
451        self.set_error()?;
452        Err(ErrorKind::UnexpectedReply.into())
453      }
454    }
455  }
456
457  #[allow(clippy::too_many_arguments)]
458  fn on_basic_consume_ok_received(&self, method: protocol::basic::ConsumeOk, wait_handle: WaitHandle<ShortString>, queue: ShortString, no_local: bool, no_ack: bool, exclusive: bool, subscriber: Box<dyn ConsumerSubscriber>) -> Result<(), Error> {
459    wait_handle.finish(method.consumer_tag.clone());
460    self.queues.register_consumer(queue.as_str(), method.consumer_tag.clone(), Consumer::new(method.consumer_tag, no_local, no_ack, exclusive, subscriber));
461    Ok(())
462  }
463
464  fn on_basic_deliver_received(&self, method: protocol::basic::Deliver) -> Result<(), Error> {
465    if let Some(queue_name) = self.queues.start_consumer_delivery(method.consumer_tag.as_str(), Delivery::new(method.delivery_tag, method.exchange.into(), method.routing_key.into(), method.redelivered)) {
466      self.status.set_state(ChannelState::WillReceiveContent(Some(queue_name), Some(method.consumer_tag)));
467    }
468    Ok(())
469  }
470
471  fn on_basic_cancel_received(&self, method: protocol::basic::Cancel) -> Result<(), Error> {
472    self.queues.deregister_consumer(method.consumer_tag.as_str());
473    if !method.nowait {
474      self.basic_cancel_ok(method.consumer_tag.as_str()).as_error()
475    } else {
476      Ok(())
477    }
478  }
479
480  fn on_basic_cancel_ok_received(&self, method: protocol::basic::CancelOk) -> Result<(), Error> {
481    self.queues.deregister_consumer(method.consumer_tag.as_str());
482    Ok(())
483  }
484
485  fn on_basic_ack_received(&self, method: protocol::basic::Ack) -> Result<(), Error> {
486    if self.status.confirm() {
487      if method.multiple {
488        if method.delivery_tag > 0 {
489          self.acknowledgements.ack_all_before(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
490        } else {
491          self.acknowledgements.ack_all_pending();
492        }
493      } else {
494        self.acknowledgements.ack(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
495      }
496    }
497    Ok(())
498  }
499
500  fn on_basic_nack_received(&self, method: protocol::basic::Nack) -> Result<(), Error> {
501    if self.status.confirm() {
502      if method.multiple {
503        if method.delivery_tag > 0 {
504          self.acknowledgements.nack_all_before(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
505        } else {
506          self.acknowledgements.nack_all_pending();
507        }
508      } else {
509        self.acknowledgements.nack(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
510      }
511    }
512    Ok(())
513  }
514
515  fn on_basic_return_received(&self, method: protocol::basic::Return) -> Result<(), Error> {
516    self.returned_messages.start_new_delivery(BasicReturnMessage::new(method.exchange, method.routing_key, method.reply_code, method.reply_text));
517    self.status.set_state(ChannelState::WillReceiveContent(None, None));
518    Ok(())
519  }
520
521  fn on_basic_recover_ok_received(&self) -> Result<(), Error> {
522    self.queues.drop_prefetched_messages();
523    Ok(())
524  }
525
526  fn on_confirm_select_ok_received(&self) -> Result<(), Error> {
527    self.status.set_confirm();
528    Ok(())
529  }
530
531  fn on_access_request_ok_received(&self, _: protocol::access::RequestOk) -> Result<(), Error> {
532    Ok(())
533  }
534}
535
536include!(concat!(env!("OUT_DIR"), "/channel.rs"));