1use amq_protocol::frame::{AMQPContentHeader, AMQPFrame};
2use log::{debug, error, info, trace};
3
4use std::borrow::Borrow;
5
6use crate::{
7 BasicProperties,
8 acknowledgement::{Acknowledgements, DeliveryTag},
9 auth::Credentials,
10 channel_status::{ChannelStatus, ChannelState},
11 confirmation::Confirmation,
12 connection::Connection,
13 connection_status::ConnectionState,
14 consumer::{Consumer, ConsumerSubscriber},
15 error::{Error, ErrorKind},
16 frames::Priority,
17 id_sequence::IdSequence,
18 message::{BasicGetMessage, BasicReturnMessage, Delivery},
19 protocol::{self, AMQPClass, AMQPError, AMQPSoftError},
20 queue::Queue,
21 queues::Queues,
22 returned_messages::ReturnedMessages,
23 types::*,
24 wait::{Wait, WaitHandle},
25};
26
27#[cfg(test)]
28use crate::queue::QueueState;
29
30#[derive(Clone, Debug)]
31#[deprecated(note = "use lapin instead")]
32pub struct Channel {
33 id: u16,
34 connection: Connection,
35 status: ChannelStatus,
36 acknowledgements: Acknowledgements,
37 delivery_tag: IdSequence<DeliveryTag>,
38 queues: Queues,
39 returned_messages: ReturnedMessages,
40}
41
42impl Channel {
43 pub(crate) fn new(channel_id: u16, connection: Connection) -> Channel {
44 let returned_messages = ReturnedMessages::default();
45 Channel {
46 id: channel_id,
47 connection,
48 status: ChannelStatus::default(),
49 acknowledgements: Acknowledgements::new(returned_messages.clone()),
50 delivery_tag: IdSequence::new(false),
51 queues: Queues::default(),
52 returned_messages,
53 }
54 }
55
56 #[deprecated(note = "use lapin instead")]
57 pub fn status(&self) -> &ChannelStatus {
58 &self.status
59 }
60
61 pub(crate) fn set_closing(&self) {
62 self.set_state(ChannelState::Closing);
63 }
64
65 pub(crate) fn set_closed(&self) -> Result<(), Error> {
66 self.set_state(ChannelState::Closed);
67 self.connection.remove_channel(self.id)
68 }
69
70 pub(crate) fn set_error(&self) -> Result<(), Error> {
71 self.set_state(ChannelState::Error);
72 self.connection.remove_channel(self.id)
73 }
74
75 pub(crate) fn set_state(&self, state: ChannelState) {
76 self.status.set_state(state);
77 }
78
79 #[deprecated(note = "use lapin instead")]
80 pub fn id(&self) -> u16 {
81 self.id
82 }
83
84 #[deprecated(note = "use lapin instead")]
85 pub fn close(&self, reply_code: ShortUInt, reply_text: &str) -> Confirmation<()> {
86 self.do_channel_close(reply_code, reply_text, 0, 0)
87 }
88
89 #[deprecated(note = "use lapin instead")]
90 pub fn basic_consume(&self, queue: &Queue, consumer_tag: &str, options: BasicConsumeOptions, arguments: FieldTable, subscriber: Box<dyn ConsumerSubscriber>) -> Confirmation<ShortString> {
91 self.do_basic_consume(queue.borrow(), consumer_tag, options, arguments, subscriber)
92 }
93
94 #[deprecated(note = "use lapin instead")]
95 pub fn wait_for_confirms(&self) -> Confirmation<Vec<BasicReturnMessage>> {
96 if let Some(wait) = self.acknowledgements.get_last_pending() {
97 let returned_messages = self.returned_messages.clone();
98 Confirmation::new(wait).map(Box::new(move |_| returned_messages.drain()))
99 } else {
100 let (wait, wait_handle) = Wait::new();
101 wait_handle.finish(Vec::default());
102 Confirmation::new(wait)
103 }
104 }
105
106 #[cfg(test)]
107 pub(crate) fn register_queue(&self, queue: QueueState) {
108 self.queues.register(queue);
109 }
110
111 pub(crate) fn send_method_frame(&self, priority: Priority, method: AMQPClass, expected_reply: Option<Reply>) -> Result<Wait<()>, Error> {
112 self.send_frame(priority, AMQPFrame::Method(self.id, method), expected_reply)
113 }
114
115 pub(crate) fn send_frame(&self, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Result<Wait<()>, Error> {
116 self.connection.send_frame(self.id, priority, frame, expected_reply)
117 }
118
119 fn send_content_frames(&self, class_id: u16, slice: &[u8], properties: BasicProperties) -> Result<Wait<()>, Error> {
120 let header = AMQPContentHeader {
121 class_id,
122 weight: 0,
123 body_size: slice.len() as u64,
124 properties,
125 };
126 let mut wait = self.send_frame(Priority::LOW, AMQPFrame::Header(self.id, class_id, Box::new(header)), None)?;
127
128 let frame_max = self.connection.configuration().frame_max();
129 for chunk in slice.chunks(frame_max as usize - 8) {
131 wait = self.send_frame(Priority::LOW, AMQPFrame::Body(self.id, Vec::from(chunk)), None)?;
132 }
133 Ok(wait)
134 }
135
136 pub(crate) fn handle_content_header_frame(&self, size: u64, properties: BasicProperties) -> Result<(), Error> {
137 if let ChannelState::WillReceiveContent(queue_name, request_id_or_consumer_tag) = self.status.state() {
138 if size > 0 {
139 self.status.set_state(ChannelState::ReceivingContent(queue_name.clone(), request_id_or_consumer_tag.clone(), size as usize));
140 } else {
141 self.status.set_state(ChannelState::Connected);
142 }
143 if let Some(queue_name) = queue_name {
144 self.queues.handle_content_header_frame(queue_name.as_str(), request_id_or_consumer_tag, size, properties);
145 } else {
146 self.returned_messages.set_delivery_properties(properties);
147 if size == 0 {
148 self.returned_messages.new_delivery_complete();
149 }
150 }
151 Ok(())
152 } else {
153 self.set_error()
154 }
155 }
156
157 pub(crate) fn handle_body_frame(&self, payload: Vec<u8>) -> Result<(), Error> {
158 let payload_size = payload.len();
159
160 if let ChannelState::ReceivingContent(queue_name, request_id_or_consumer_tag, remaining_size) = self.status.state() {
161 if remaining_size >= payload_size {
162 if let Some(queue_name) = queue_name.as_ref() {
163 self.queues.handle_body_frame(queue_name.as_str(), request_id_or_consumer_tag.clone(), remaining_size, payload_size, payload);
164 } else {
165 self.returned_messages.receive_delivery_content(payload);
166 if remaining_size == payload_size {
167 self.returned_messages.new_delivery_complete();
168 }
169 }
170 if remaining_size == payload_size {
171 self.status.set_state(ChannelState::Connected);
172 } else {
173 self.status.set_state(ChannelState::ReceivingContent(queue_name, request_id_or_consumer_tag, remaining_size - payload_size));
174 }
175 Ok(())
176 } else {
177 error!("body frame too large");
178 self.set_error()
179 }
180 } else {
181 self.set_error()
182 }
183 }
184
185 fn acknowledgement_error(&self, error: Error, class_id: u16, method_id: u16) -> Result<(), Error> {
186 self.do_channel_close(AMQPSoftError::PRECONDITIONFAILED.get_id(), "precondition failed", class_id, method_id).as_error()?;
187 Err(error)
188 }
189
190 fn on_connection_start_ok_sent(&self, wait_handle: WaitHandle<Connection>, credentials: Credentials) -> Result<(), Error> {
191 self.connection.set_state(ConnectionState::SentStartOk(wait_handle, credentials));
192 Ok(())
193 }
194
195 fn on_connection_open_sent(&self, wait_handle: WaitHandle<Connection>) -> Result<(), Error> {
196 self.connection.set_state(ConnectionState::SentOpen(wait_handle));
197 Ok(())
198 }
199
200 fn on_connection_close_sent(&self) -> Result<(), Error> {
201 self.connection.set_closing();
202 Ok(())
203 }
204
205 fn on_connection_close_ok_sent(&self) -> Result<(), Error> {
206 self.connection.set_closed()
207 }
208
209 fn on_channel_close_sent(&self) -> Result<(), Error> {
210 self.set_closing();
211 Ok(())
212 }
213
214 fn on_channel_close_ok_sent(&self) -> Result<(), Error> {
215 self.set_closed()
216 }
217
218 fn on_basic_publish_sent(&self, class_id: u16, payload: Vec<u8>, properties: BasicProperties) -> Result<Wait<()>, Error> {
219 if self.status.confirm() {
220 let delivery_tag = self.delivery_tag.next();
221 self.acknowledgements.register_pending(delivery_tag);
222 };
223
224 self.send_content_frames(class_id, payload.as_slice(), properties)
225 }
226
227 fn on_basic_recover_async_sent(&self) -> Result<(), Error> {
228 self.queues.drop_prefetched_messages();
229 Ok(())
230 }
231
232 fn on_basic_ack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) -> Result<(), Error> {
233 if multiple && delivery_tag == 0 {
234 self.queues.drop_prefetched_messages();
235 }
236 Ok(())
237 }
238
239 fn on_basic_nack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) -> Result<(), Error> {
240 if multiple && delivery_tag == 0 {
241 self.queues.drop_prefetched_messages();
242 }
243 Ok(())
244 }
245
246 fn tune_connection_configuration(&self, channel_max: u16, frame_max: u32, heartbeat: u16) {
247 if self.connection.configuration().heartbeat() == 0 || heartbeat != 0 && heartbeat < self.connection.configuration().heartbeat() {
250 self.connection.configuration().set_heartbeat(heartbeat);
251 }
252
253 if channel_max != 0 {
254 if self.connection.configuration().channel_max() == 0 || channel_max < self.connection.configuration().channel_max() {
257 self.connection.configuration().set_channel_max(channel_max);
258 }
259 }
260 if self.connection.configuration().channel_max() == 0 {
261 self.connection.configuration().set_channel_max(u16::max_value());
262 }
263
264 if frame_max != 0 {
265 if self.connection.configuration().frame_max() == 0 || frame_max < self.connection.configuration().frame_max() {
268 self.connection.configuration().set_frame_max(frame_max);
269 }
270 }
271 if self.connection.configuration().frame_max() == 0 {
272 self.connection.configuration().set_frame_max(u32::max_value());
273 }
274 }
275
276 fn on_connection_start_received(&self, method: protocol::connection::Start) -> Result<(), Error> {
277 trace!("Server sent connection::Start: {:?}", method);
278 let state = self.connection.status().state();
279 if let ConnectionState::SentProtocolHeader(wait_handle, credentials, mut options) = state {
280 let mechanism = options.mechanism.to_string();
281 let locale = options.locale.clone();
282
283 if !method.mechanisms.split_whitespace().any(|m| m == mechanism) {
284 error!("unsupported mechanism: {}", mechanism);
285 }
286 if !method.locales.split_whitespace().any(|l| l == locale) {
287 error!("unsupported locale: {}", mechanism);
288 }
289
290 if !options.client_properties.contains_key("product") || !options.client_properties.contains_key("version") {
291 options.client_properties.insert("product".into(), AMQPValue::LongString(env!("CARGO_PKG_NAME").into()));
292 options.client_properties.insert("version".into(), AMQPValue::LongString(env!("CARGO_PKG_VERSION").into()));
293 }
294
295 options.client_properties.insert("platform".into(), AMQPValue::LongString("rust".into()));
296
297 let mut capabilities = FieldTable::default();
298 capabilities.insert("publisher_confirms".into(), AMQPValue::Boolean(true));
299 capabilities.insert("exchange_exchange_bindings".into(), AMQPValue::Boolean(true));
300 capabilities.insert("basic.nack".into(), AMQPValue::Boolean(true));
301 capabilities.insert("consumer_cancel_notify".into(), AMQPValue::Boolean(true));
302 capabilities.insert("connection.blocked".into(), AMQPValue::Boolean(true));
303 capabilities.insert("authentication_failure_close".into(), AMQPValue::Boolean(true));
304
305 options.client_properties.insert("capabilities".into(), AMQPValue::FieldTable(capabilities));
306
307 self.connection_start_ok(options.client_properties, &mechanism, &credentials.sasl_auth_string(options.mechanism), &locale, wait_handle, credentials).as_error()
308 } else {
309 error!("Invalid state: {:?}", state);
310 self.connection.set_error()?;
311 Err(ErrorKind::InvalidConnectionState(state).into())
312 }
313 }
314
315 fn on_connection_secure_received(&self, method: protocol::connection::Secure) -> Result<(), Error> {
316 trace!("Server sent connection::Secure: {:?}", method);
317
318 let state = self.connection.status().state();
319 if let ConnectionState::SentStartOk(_, credentials) = state {
320 self.connection_secure_ok(&credentials.rabbit_cr_demo_answer()).as_error()
321 } else {
322 error!("Invalid state: {:?}", state);
323 self.connection.set_error()?;
324 Err(ErrorKind::InvalidConnectionState(state).into())
325 }
326 }
327
328 fn on_connection_tune_received(&self, method: protocol::connection::Tune) -> Result<(), Error> {
329 debug!("Server sent Connection::Tune: {:?}", method);
330
331 let state = self.connection.status().state();
332 if let ConnectionState::SentStartOk(wait_handle, _) = state {
333 self.tune_connection_configuration(method.channel_max, method.frame_max, method.heartbeat);
334
335 self.connection_tune_ok(self.connection.configuration().channel_max(), self.connection.configuration().frame_max(), self.connection.configuration().heartbeat()).as_error()?;
336 self.connection_open(&self.connection.status().vhost(), wait_handle).as_error()
337 } else {
338 error!("Invalid state: {:?}", state);
339 self.connection.set_error()?;
340 Err(ErrorKind::InvalidConnectionState(state).into())
341 }
342 }
343
344 fn on_connection_open_ok_received(&self, _: protocol::connection::OpenOk) -> Result<(), Error> {
345 let state = self.connection.status().state();
346 if let ConnectionState::SentOpen(wait_handle) = state {
347 self.connection.set_state(ConnectionState::Connected);
348 wait_handle.finish(self.connection.clone());
349 Ok(())
350 } else {
351 error!("Invalid state: {:?}", state);
352 self.connection.set_error()?;
353 Err(ErrorKind::InvalidConnectionState(state).into())
354 }
355 }
356
357 fn on_connection_close_received(&self, method: protocol::connection::Close) -> Result<(), Error> {
358 if let Some(error) = AMQPError::from_id(method.reply_code) {
359 error!("Connection closed on channel {} by {}:{} => {:?} => {}", self.id, method.class_id, method.method_id, error, method.reply_text);
360 } else {
361 info!("Connection closed on channel {}: {:?}", self.id, method);
362 }
363 let state = self.connection.status().state();
364 self.connection.set_closing();
365 self.connection.drop_pending_frames();
366 self.connection_close_ok().as_error()?;
367 match state {
368 ConnectionState::SentProtocolHeader(wait_handle, ..) => wait_handle.error(ErrorKind::ConnectionRefused.into()),
369 ConnectionState::SentStartOk(wait_handle, _) => wait_handle.error(ErrorKind::ConnectionRefused.into()),
370 ConnectionState::SentOpen(wait_handle) => wait_handle.error(ErrorKind::ConnectionRefused.into()),
371 _ => {},
372 }
373 Ok(())
374 }
375
376 fn on_connection_blocked_received(&self, _method: protocol::connection::Blocked) -> Result<(), Error> {
377 self.connection.block();
378 Ok(())
379 }
380
381 fn on_connection_unblocked_received(&self, _method: protocol::connection::Unblocked) -> Result<(), Error> {
382 self.connection.unblock();
383 Ok(())
384 }
385
386 fn on_connection_close_ok_received(&self) -> Result<(), Error> {
387 self.connection.set_closed()
388 }
389
390 fn on_channel_open_ok_received(&self, _method: protocol::channel::OpenOk, wait_handle: WaitHandle<Channel>) -> Result<(), Error> {
391 self.status.set_state(ChannelState::Connected);
392 wait_handle.finish(self.clone());
393 Ok(())
394 }
395
396 fn on_channel_flow_received(&self, method: protocol::channel::Flow) -> Result<(), Error> {
397 self.status.set_send_flow(method.active);
398 self.channel_flow_ok(ChannelFlowOkOptions {active: method.active}).as_error()
399 }
400
401 fn on_channel_flow_ok_received(&self, method: protocol::channel::FlowOk, wait_handle: WaitHandle<Boolean>) -> Result<(), Error> {
402 wait_handle.finish(method.active);
404 Ok(())
405 }
406
407 fn on_channel_close_received(&self, method: protocol::channel::Close) -> Result<(), Error> {
408 if let Some(error) = AMQPError::from_id(method.reply_code) {
409 error!("Channel {} closed by {}:{} => {:?} => {}", self.id, method.class_id, method.method_id, error, method.reply_text);
410 } else {
411 info!("Channel {} closed: {:?}", self.id, method);
412 }
413 self.channel_close_ok().as_error()
414 }
415
416 fn on_channel_close_ok_received(&self) -> Result<(), Error> {
417 self.set_closed()
418 }
419
420 fn on_queue_delete_ok_received(&self, method: protocol::queue::DeleteOk, wait_handle: WaitHandle<LongUInt>, queue: ShortString) -> Result<(), Error> {
421 self.queues.deregister(queue.as_str());
422 wait_handle.finish(method.message_count);
423 Ok(())
424 }
425
426 fn on_queue_purge_ok_received(&self, method: protocol::queue::PurgeOk, wait_handle: WaitHandle<LongUInt>) -> Result<(), Error> {
427 wait_handle.finish(method.message_count);
428 Ok(())
429 }
430
431 fn on_queue_declare_ok_received(&self, method: protocol::queue::DeclareOk, wait_handle: WaitHandle<Queue>) -> Result<(), Error> {
432 let queue = Queue::new(method.queue, method.message_count, method.consumer_count);
433 wait_handle.finish(queue.clone());
434 self.queues.register(queue.into());
435 Ok(())
436 }
437
438 fn on_basic_get_ok_received(&self, method: protocol::basic::GetOk, wait_handle: WaitHandle<Option<BasicGetMessage>>, queue: ShortString) -> Result<(), Error> {
439 self.queues.start_basic_get_delivery(queue.as_str(), BasicGetMessage::new(method.delivery_tag, method.exchange, method.routing_key, method.redelivered, method.message_count), wait_handle);
440 self.status.set_state(ChannelState::WillReceiveContent(Some(queue), None));
441 Ok(())
442 }
443
444 fn on_basic_get_empty_received(&self, _: protocol::basic::GetEmpty) -> Result<(), Error> {
445 match self.connection.next_expected_reply(self.id) {
446 Some(Reply::AwaitingBasicGetOk(wait_handle, _)) => {
447 wait_handle.finish(None);
448 Ok(())
449 },
450 _ => {
451 self.set_error()?;
452 Err(ErrorKind::UnexpectedReply.into())
453 }
454 }
455 }
456
457 #[allow(clippy::too_many_arguments)]
458 fn on_basic_consume_ok_received(&self, method: protocol::basic::ConsumeOk, wait_handle: WaitHandle<ShortString>, queue: ShortString, no_local: bool, no_ack: bool, exclusive: bool, subscriber: Box<dyn ConsumerSubscriber>) -> Result<(), Error> {
459 wait_handle.finish(method.consumer_tag.clone());
460 self.queues.register_consumer(queue.as_str(), method.consumer_tag.clone(), Consumer::new(method.consumer_tag, no_local, no_ack, exclusive, subscriber));
461 Ok(())
462 }
463
464 fn on_basic_deliver_received(&self, method: protocol::basic::Deliver) -> Result<(), Error> {
465 if let Some(queue_name) = self.queues.start_consumer_delivery(method.consumer_tag.as_str(), Delivery::new(method.delivery_tag, method.exchange.into(), method.routing_key.into(), method.redelivered)) {
466 self.status.set_state(ChannelState::WillReceiveContent(Some(queue_name), Some(method.consumer_tag)));
467 }
468 Ok(())
469 }
470
471 fn on_basic_cancel_received(&self, method: protocol::basic::Cancel) -> Result<(), Error> {
472 self.queues.deregister_consumer(method.consumer_tag.as_str());
473 if !method.nowait {
474 self.basic_cancel_ok(method.consumer_tag.as_str()).as_error()
475 } else {
476 Ok(())
477 }
478 }
479
480 fn on_basic_cancel_ok_received(&self, method: protocol::basic::CancelOk) -> Result<(), Error> {
481 self.queues.deregister_consumer(method.consumer_tag.as_str());
482 Ok(())
483 }
484
485 fn on_basic_ack_received(&self, method: protocol::basic::Ack) -> Result<(), Error> {
486 if self.status.confirm() {
487 if method.multiple {
488 if method.delivery_tag > 0 {
489 self.acknowledgements.ack_all_before(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
490 } else {
491 self.acknowledgements.ack_all_pending();
492 }
493 } else {
494 self.acknowledgements.ack(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
495 }
496 }
497 Ok(())
498 }
499
500 fn on_basic_nack_received(&self, method: protocol::basic::Nack) -> Result<(), Error> {
501 if self.status.confirm() {
502 if method.multiple {
503 if method.delivery_tag > 0 {
504 self.acknowledgements.nack_all_before(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
505 } else {
506 self.acknowledgements.nack_all_pending();
507 }
508 } else {
509 self.acknowledgements.nack(method.delivery_tag).or_else(|err| self.acknowledgement_error(err, method.get_amqp_class_id(), method.get_amqp_method_id()))?;
510 }
511 }
512 Ok(())
513 }
514
515 fn on_basic_return_received(&self, method: protocol::basic::Return) -> Result<(), Error> {
516 self.returned_messages.start_new_delivery(BasicReturnMessage::new(method.exchange, method.routing_key, method.reply_code, method.reply_text));
517 self.status.set_state(ChannelState::WillReceiveContent(None, None));
518 Ok(())
519 }
520
521 fn on_basic_recover_ok_received(&self) -> Result<(), Error> {
522 self.queues.drop_prefetched_messages();
523 Ok(())
524 }
525
526 fn on_confirm_select_ok_received(&self) -> Result<(), Error> {
527 self.status.set_confirm();
528 Ok(())
529 }
530
531 fn on_access_request_ok_received(&self, _: protocol::access::RequestOk) -> Result<(), Error> {
532 Ok(())
533 }
534}
535
536include!(concat!(env!("OUT_DIR"), "/channel.rs"));