1use crate::{
2 BasicProperties, ChannelState, ChannelStatus, Connection, ConnectionState, ConnectionStatus,
3 Error, ErrorKind, ExchangeKind, Promise, PromiseResolver, Result,
4 acknowledgement::Acknowledgements,
5 auth::AuthProvider,
6 basic_get_delivery::BasicGetDelivery,
7 channel_closer::ChannelCloser,
8 channel_receiver_state::DeliveryCause,
9 configuration::{NegotiatedConfig, RecoveryConfig},
10 connection_closer::ConnectionCloser,
11 connection_status::ConnectionStep,
12 consumer::Consumer,
13 consumers::Consumers,
14 events::EventsSender,
15 frames::{ExpectedReply, Frames},
16 internal_rpc::InternalRPCHandle,
17 message::{BasicGetMessage, BasicReturnMessage, Delivery},
18 promise::Cancelable,
19 protocol::{self, AMQPClass, AMQPError, AMQPHardError},
20 publisher_confirm::PublisherConfirm,
21 queue::Queue,
22 registry::Registry,
23 returned_messages::ReturnedMessages,
24 socket_state::SocketStateHandle,
25 topology::ChannelDefinition,
26 types::*,
27};
28use amq_protocol::frame::{AMQPContentHeader, AMQPFrame};
29use std::{convert::TryFrom, fmt, sync::Arc};
30use tracing::{error, info, trace};
31
32#[derive(Clone)]
42pub struct Channel {
43 id: ChannelId,
44 configuration: NegotiatedConfig,
45 status: ChannelStatus,
46 connection_status: ConnectionStatus,
47 local_registry: Registry,
48 acknowledgements: Acknowledgements,
49 consumers: Consumers,
50 basic_get_delivery: BasicGetDelivery,
51 returned_messages: ReturnedMessages,
52 waker: SocketStateHandle,
53 internal_rpc: InternalRPCHandle,
54 frames: Frames,
55 events_sender: EventsSender,
56 channel_closer: Option<Arc<ChannelCloser>>,
57 _connection_closer: Option<Arc<ConnectionCloser>>,
58 recovery_config: RecoveryConfig,
59}
60
61impl PartialEq for Channel {
62 fn eq(&self, other: &Self) -> bool {
63 self.id == other.id
64 }
65}
66
67impl fmt::Debug for Channel {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.debug_struct("Channel")
70 .field("id", &self.id)
71 .field("configuration", &self.configuration)
72 .field("status", &self.status)
73 .field("connection_status", &self.connection_status)
74 .field("acknowledgements", &self.acknowledgements)
75 .field("consumers", &self.consumers)
76 .field("basic_get_delivery", &self.basic_get_delivery)
77 .field("returned_messages", &self.returned_messages)
78 .field("frames", &self.frames)
79 .finish()
80 }
81}
82
83impl Channel {
84 #[allow(clippy::too_many_arguments)]
85 pub(crate) fn new(
86 channel_id: ChannelId,
87 configuration: NegotiatedConfig,
88 connection_status: ConnectionStatus,
89 waker: SocketStateHandle,
90 internal_rpc: InternalRPCHandle,
91 frames: Frames,
92 connection_closer: Option<Arc<ConnectionCloser>>,
93 recovery_config: RecoveryConfig,
94 events_sender: EventsSender,
95 ) -> Channel {
96 let returned_messages = ReturnedMessages::default();
97 let status = ChannelStatus::new(channel_id, internal_rpc.clone());
98 let channel_closer = if channel_id == 0 {
99 None
100 } else {
101 Some(Arc::new(ChannelCloser::new(
102 channel_id,
103 status.clone(),
104 internal_rpc.clone(),
105 )))
106 };
107 Self {
108 id: channel_id,
109 configuration,
110 status,
111 connection_status,
112 local_registry: Registry::default(),
113 acknowledgements: Acknowledgements::new(channel_id, returned_messages.clone()),
114 consumers: Consumers::default(),
115 basic_get_delivery: BasicGetDelivery::default(),
116 returned_messages,
117 waker,
118 internal_rpc,
119 frames,
120 events_sender,
121 channel_closer,
122 _connection_closer: connection_closer,
123 recovery_config,
124 }
125 }
126
127 pub fn status(&self) -> &ChannelStatus {
128 &self.status
129 }
130
131 pub async fn wait_for_recovery(&self, error: Error) -> Result<()> {
132 if self.recovery_config.can_recover(&error)
133 && let Some(notifier) = error.notifier()
134 {
135 notifier.await;
136 return Ok(());
137 }
138 Err(error)
139 }
140
141 pub(crate) fn set_closing(&self, error: Option<Error>) {
142 self.set_state(ChannelState::Closing);
143 if let Some(error) = error {
144 self.error_publisher_confirms(error.clone());
145 self.error_consumers(error); } else {
147 self.consumers.start_cancel();
148 }
149 }
150
151 pub(crate) fn set_closed(&self, error: Error) {
152 self.set_state(ChannelState::Closed);
153 self.error_publisher_confirms(error.clone());
154 self.cancel_consumers();
155 self.internal_rpc.remove_channel(self.id, error);
156 }
157
158 pub(crate) fn set_connection_error(&self, error: Error) {
160 self.set_state(ChannelState::Error);
161 self.error_publisher_confirms(error.clone());
162 self.error_consumers(error.clone());
163 self.internal_rpc.remove_channel(self.id, error.clone());
164 }
165
166 fn error_publisher_confirms(&self, error: Error) {
167 self.acknowledgements.on_channel_error(error);
168 }
169
170 fn cancel_consumers(&self) {
171 self.consumers.cancel();
172 }
173
174 fn error_consumers(&self, error: Error) {
175 let recover = self.recovery_config.can_recover(&error);
176 self.consumers.error(error, recover);
177 }
178
179 pub(crate) fn set_state(&self, state: ChannelState) {
180 self.status.set_state(state);
181 }
182
183 pub fn id(&self) -> ChannelId {
184 self.id
185 }
186
187 pub(crate) fn clone_internal(&self) -> Self {
188 let mut this = self.clone();
189 this.channel_closer = None;
190 this
191 }
192
193 fn wake(&self) {
194 trace!(channel=%self.id, "wake");
195 self.waker.wake()
196 }
197
198 fn assert_channel0(&self, class_id: Identifier, method_id: Identifier) -> Result<()> {
199 if self.id == 0 {
200 Ok(())
201 } else {
202 error!(
203 channel=%self.id,
204 "Got a connection frame on, closing connection"
205 );
206 let error = AMQPError::new(
207 AMQPHardError::COMMANDINVALID.into(),
208 format!("connection frame received on channel {}", self.id).into(),
209 );
210 self.internal_rpc.close_connection(
211 error.get_id(),
212 error.get_message().clone(),
213 class_id,
214 method_id,
215 );
216 Err(ErrorKind::ProtocolError(error).into())
217 }
218 }
219
220 pub async fn close(&self, reply_code: ReplyCode, reply_text: ShortString) -> Result<()> {
221 self.do_channel_close(reply_code, reply_text, 0, 0).await
222 }
223
224 pub async fn basic_consume(
225 &self,
226 queue: ShortString,
227 consumer_tag: ShortString,
228 options: BasicConsumeOptions,
229 arguments: FieldTable,
230 ) -> Result<Consumer> {
231 let consumer = self
232 .do_basic_consume(queue, consumer_tag, options, arguments, None)
233 .await?;
234 Ok(consumer.external(self.id))
235 }
236
237 pub async fn basic_get(
238 &self,
239 queue: ShortString,
240 options: BasicGetOptions,
241 ) -> Result<Option<BasicGetMessage>> {
242 self.do_basic_get(queue, options, None).await
243 }
244
245 pub async fn exchange_declare(
246 &self,
247 exchange: ShortString,
248 kind: ExchangeKind,
249 options: ExchangeDeclareOptions,
250 arguments: FieldTable,
251 ) -> Result<()> {
252 self.do_exchange_declare(
253 exchange,
254 kind.kind().into(),
255 options,
256 arguments,
257 kind.clone(),
258 )
259 .await
260 }
261
262 pub async fn wait_for_confirms(&self) -> Result<Vec<BasicReturnMessage>> {
263 if let Some(last_pending) = self.acknowledgements.get_last_pending() {
264 trace!("Waiting for pending confirms");
265 last_pending.await?;
266 } else {
267 trace!("No confirms to wait for");
268 }
269 Ok(self.returned_messages.drain())
270 }
271
272 #[cfg(test)]
273 pub(crate) fn register_queue(
274 &self,
275 name: ShortString,
276 options: QueueDeclareOptions,
277 arguments: FieldTable,
278 ) {
279 self.local_registry.register_queue(name, options, arguments);
280 }
281
282 #[cfg(test)]
283 pub(crate) fn register_consumer(&self, tag: ShortString, consumer: Consumer) {
284 self.consumers.register(tag, consumer);
285 }
286
287 pub(crate) fn finalize_connection(&self) {
288 self.status.finalize_connection();
289 }
290
291 pub(crate) fn init_recovery(&self, error: Error) -> Error {
292 let err = self.status.set_reconnecting(error, self.topology());
293 self.frames.drop_frames_for_channel(self.id, err.clone());
294 self.poison(err.clone());
295 err
296 }
297
298 fn poison(&self, error: Error) {
299 self.frames.poison_channel(self.id, error);
300 }
301
302 pub(crate) fn update_recovery(&self) -> Option<ChannelDefinition> {
303 self.status.update_recovery_context(|ctx| {
304 ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
306 self.acknowledgements.reset(ctx.cause());
308 self.frames.drop_channel_poison(self.id);
310 ctx.topology()
311 })
312 }
313
314 pub(crate) async fn start_recovery(&self) -> Result<()> {
315 let topology = self.update_recovery().expect("No topology during recovery");
316
317 self.channel_open(self.clone()).await?;
319
320 if self.status.confirm() {
322 self.confirm_select(ConfirmSelectOptions::default()).await?;
323 }
324
325 for ex in &topology.exchanges {
327 if ex.is_declared {
328 self.exchange_declare(
329 ex.name.clone(),
330 ex.kind.clone().unwrap_or_default(),
331 ex.options.unwrap_or_default(),
332 ex.arguments.clone().unwrap_or_default(),
333 )
334 .await?;
335 }
336 }
337
338 for ex in &topology.exchanges {
340 for binding in &ex.bindings {
341 self.exchange_bind(
342 ex.name.clone(),
343 binding.source.clone(),
344 binding.routing_key.clone(),
345 ExchangeBindOptions::default(),
346 binding.arguments.clone(),
347 )
348 .await?;
349 }
350 }
351
352 for queue in &topology.queues {
354 if queue.is_declared {
355 self.queue_declare(
356 queue.name.clone(),
357 queue.options.unwrap_or_default(),
358 queue.arguments.clone().unwrap_or_default(),
359 )
360 .await?;
361 }
362 }
363
364 for queue in &topology.queues {
366 for binding in &queue.bindings {
367 self.queue_bind(
368 queue.name.clone(),
369 binding.source.clone(),
370 binding.routing_key.clone(),
371 QueueBindOptions::default(),
372 binding.arguments.clone(),
373 )
374 .await?;
375 }
376 }
377
378 for consumer in topology.consumers.iter().cloned() {
380 consumer.reset();
381 self.do_basic_consume(
382 consumer.queue().clone(),
383 consumer.tag().clone(),
384 consumer.options(),
385 consumer.arguments(),
386 Some(consumer),
387 )
388 .await?;
389 }
390
391 Ok(())
392 }
393
394 pub(crate) fn send_method_frame(
395 &self,
396 method: AMQPClass,
397 canceler: Box<dyn Cancelable + Send + Sync>,
398 expected_reply: Option<ExpectedReply>,
399 resolver: Option<PromiseResolver<()>>,
400 ) {
401 self.send_frame(
402 AMQPFrame::Method(self.id, method),
403 canceler,
404 expected_reply,
405 resolver,
406 );
407 }
408
409 pub(crate) fn send_frame(
410 &self,
411 frame: AMQPFrame,
412 canceler: Box<dyn Cancelable + Send + Sync>,
413 expected_reply: Option<ExpectedReply>,
414 resolver: Option<PromiseResolver<()>>,
415 ) {
416 trace!(channel=%self.id, "send_frame");
417 self.frames
418 .push(self.id, frame, canceler, expected_reply, resolver);
419 self.wake();
420 }
421
422 async fn send_method_frame_with_body(
423 &self,
424 ctx: &'static str,
425 method: AMQPClass,
426 payload: &[u8],
427 properties: BasicProperties,
428 publisher_confirms_result: Option<PublisherConfirm>,
429 ) -> Result<PublisherConfirm> {
430 let class_id = method.get_amqp_class_id();
431 let header = AMQPContentHeader {
432 class_id,
433 body_size: payload.len() as PayloadSize,
434 properties,
435 };
436 let frame_max = self.configuration.frame_max();
437 let mut frames = vec![
438 AMQPFrame::Method(self.id, method),
439 AMQPFrame::Header(self.id, header),
440 ];
441
442 frames.extend(
443 payload
444 .chunks(frame_max as usize - 8 )
445 .map(|chunk| AMQPFrame::Body(self.id, chunk.into())),
446 );
447
448 trace!(channel=%self.id, "send_frames");
449 let (promise, resolver) = Promise::new(ctx);
450 self.frames.push_frames(self.id, frames, resolver);
451 self.wake();
452 promise.await?;
453 Ok(publisher_confirms_result
454 .unwrap_or_else(|| PublisherConfirm::not_requested(self.returned_messages.clone())))
455 }
456
457 pub(crate) fn report_protocol_violation(
458 &self,
459 error: AMQPError,
460 class_id: Identifier,
461 method_id: Identifier,
462 ) -> Result<()> {
463 error!(%error);
464 self.internal_rpc.close_connection(
465 error.get_id(),
466 error.get_message().clone(),
467 class_id,
468 method_id,
469 );
470 let error = Error::from(ErrorKind::ProtocolError(error));
471 self.internal_rpc.set_connection_error(error.clone());
472 Err(error)
473 }
474
475 fn handle_invalid_contents(
476 &self,
477 error: String,
478 class_id: Identifier,
479 method_id: Identifier,
480 ) -> Result<()> {
481 error!(%error);
482 let error = AMQPError::new(AMQPHardError::UNEXPECTEDFRAME.into(), error.into());
483 self.report_protocol_violation(error, class_id, method_id)
484 }
485
486 pub(crate) fn handle_content_header_frame(
487 &self,
488 class_id: Identifier,
489 size: PayloadSize,
490 properties: BasicProperties,
491 ) -> Result<()> {
492 self.status.set_content_length(
493 self.id,
494 class_id,
495 size,
496 |delivery_cause, confirm_mode| match delivery_cause {
497 DeliveryCause::Consume(consumer_tag) => {
498 self.consumers
499 .handle_content_header_frame(consumer_tag, size, properties);
500 }
501 DeliveryCause::Get => {
502 self.basic_get_delivery
503 .handle_content_header_frame(size, properties);
504 }
505 DeliveryCause::Return => {
506 self.returned_messages.handle_content_header_frame(
507 size,
508 properties,
509 confirm_mode,
510 );
511 }
512 },
513 |msg| {
514 let error = AMQPError::new(AMQPHardError::FRAMEERROR.into(), msg.into());
515 self.report_protocol_violation(error, class_id, 0)
516 },
517 |msg| self.handle_invalid_contents(msg, class_id, 0),
518 )
519 }
520
521 pub(crate) fn handle_body_frame(&self, payload: Vec<u8>) -> Result<()> {
522 self.status.receive(
523 self.id,
524 payload.len() as PayloadSize,
525 |delivery_cause, remaining_size, confirm_mode| match delivery_cause {
526 DeliveryCause::Consume(consumer_tag) => {
527 self.consumers
528 .handle_body_frame(consumer_tag, remaining_size, payload);
529 }
530 DeliveryCause::Get => {
531 self.basic_get_delivery
532 .handle_body_frame(remaining_size, payload);
533 }
534 DeliveryCause::Return => {
535 self.returned_messages
536 .handle_body_frame(remaining_size, payload, confirm_mode);
537 }
538 },
539 |msg| self.handle_invalid_contents(msg, 0, 0),
540 )
541 }
542
543 pub(crate) fn topology(&self) -> ChannelDefinition {
544 ChannelDefinition {
545 exchanges: self.local_registry.exchanges_topology(),
546 queues: self.local_registry.queues_topology(),
547 consumers: self.consumers.topology(),
548 }
549 }
550
551 fn before_basic_publish(&self) -> Option<PublisherConfirm> {
552 if self.status.confirm() {
553 Some(self.acknowledgements.register_pending())
554 } else {
555 None
556 }
557 }
558
559 fn before_basic_cancel(&self, consumer_tag: &str) {
560 self.consumers.start_cancel_one(consumer_tag);
561 }
562
563 fn acknowledgement_error(
564 &self,
565 error: AMQPError,
566 class_id: Identifier,
567 method_id: Identifier,
568 ) -> Result<()> {
569 error!("Got a bad acknowledgement from server, closing channel");
570 let channel = self.clone();
571 let err = error.clone();
572 self.internal_rpc.spawn(async move {
573 channel
574 .do_channel_close(
575 error.get_id(),
576 error.get_message().clone(),
577 class_id,
578 method_id,
579 )
580 .await
581 });
582 Err(ErrorKind::ProtocolError(err).into())
583 }
584
585 fn before_connection_start_ok(
586 &self,
587 resolver: PromiseResolver<Connection>,
588 connection: Connection,
589 auth_provider: Arc<dyn AuthProvider>,
590 ) {
591 self.connection_status
592 .set_connection_step(ConnectionStep::StartOk(resolver, connection, auth_provider));
593 }
594
595 fn before_connection_secure_ok(
596 &self,
597 resolver: PromiseResolver<Connection>,
598 connection: Connection,
599 auth_provider: Arc<dyn AuthProvider>,
600 ) {
601 self.connection_status
602 .set_connection_step(ConnectionStep::SecureOk(
603 resolver,
604 connection,
605 auth_provider,
606 ));
607 }
608
609 fn before_connection_open(&self, resolver: PromiseResolver<Connection>) {
610 self.connection_status
611 .set_connection_step(ConnectionStep::Open(resolver));
612 }
613
614 fn on_connection_close_ok_sent(&self, error: Error) {
615 self.internal_rpc.finish_connection_shutdown();
616 if !self.recovery_config.can_recover(&error) {
617 if let ErrorKind::ProtocolError(_) = error.kind() {
618 self.internal_rpc.set_connection_error(error);
619 } else {
620 self.internal_rpc.set_connection_closed(error);
621 }
622 }
623 }
624
625 fn next_expected_close_ok_reply(&self) -> Option<Reply> {
626 self.frames.next_expected_close_ok_reply(
627 self.id,
628 ErrorKind::InvalidChannelState(
629 ChannelState::Closed,
630 "unexpected channel.close-ok received",
631 )
632 .into(),
633 )
634 }
635
636 fn before_channel_close(&self) {
637 self.set_closing(None);
638 }
639
640 fn on_channel_close_ok_sent(&self, error: Option<Error>) {
641 let err = error.clone().unwrap_or_else(|| {
642 ErrorKind::InvalidChannelState(ChannelState::Closing, "channel.close-ok sent").into()
643 });
644 self.poison(err.clone());
645 self.set_closed(err);
646 if let Some(error) = error {
647 self.events_sender.error(error.clone());
648 }
649 }
650
651 fn on_basic_recover_async_sent(&self) {
652 self.consumers.drop_prefetched_messages();
653 }
654
655 fn on_basic_ack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
656 if multiple && delivery_tag == 0 {
657 self.consumers.drop_prefetched_messages();
658 }
659 }
660
661 fn on_basic_nack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
662 if multiple && delivery_tag == 0 {
663 self.consumers.drop_prefetched_messages();
664 }
665 }
666
667 fn tune_connection_configuration(
668 &self,
669 channel_max: ChannelId,
670 frame_max: FrameSize,
671 heartbeat: Heartbeat,
672 ) {
673 if self.configuration.heartbeat() == 0
676 || (heartbeat != 0 && heartbeat < self.configuration.heartbeat())
677 {
678 self.configuration.set_heartbeat(heartbeat);
679 }
680
681 if channel_max != 0 {
682 if self.configuration.channel_max() == 0
685 || channel_max < self.configuration.channel_max()
686 {
687 self.configuration.set_channel_max(channel_max);
688 }
689 }
690 if self.configuration.channel_max() == 0 {
691 self.configuration.set_channel_max(ChannelId::MAX);
692 }
693
694 if frame_max != 0 {
695 if self.configuration.frame_max() == 0 || frame_max < self.configuration.frame_max() {
698 self.configuration.set_frame_max(frame_max);
699 }
700 }
701 if self.configuration.frame_max() == 0 {
702 self.configuration.set_frame_max(FrameSize::MAX);
703 }
704 }
705
706 fn connection_process_error(
707 &self,
708 state: ConnectionState,
709 step: Option<ConnectionStep>,
710 ) -> Result<()> {
711 error!(
712 ?state,
713 step = step.as_ref().map(ConnectionStep::name),
714 "Invalid state"
715 );
716 let error: Error = ErrorKind::InvalidConnectionState(state).into();
717 self.internal_rpc.set_connection_error(error.clone());
718 if let Some((resolver, _)) = step.map(ConnectionStep::into_connection_resolver) {
719 resolver.reject(error.clone());
720 }
721 Err(error)
722 }
723
724 fn on_connection_start_received(&self, method: protocol::connection::Start) -> Result<()> {
725 trace!(?method, "Server sent connection::Start");
726
727 let state = self.connection_status.state();
728 let step = self.connection_status.connection_step();
729 if state != ConnectionState::Connecting {
730 return self.connection_process_error(state, step);
731 }
732 let Some(step) = step else {
733 return self.connection_process_error(state, step);
734 };
735
736 match step {
737 ConnectionStep::ProtocolHeader(resolver, mut connection) => {
738 let configuration = connection.configuration_mut();
739 let auth_provider = configuration.auth_provider.clone();
740 let mechanism = auth_provider.mechanism();
741 let locale = configuration.amqp_locale.clone();
742
743 if !method
744 .mechanisms
745 .to_string()
746 .split_whitespace()
747 .any(|m| m == mechanism.as_str())
748 {
749 error!(%mechanism, "unsupported mechanism");
750 }
751 if !method
752 .locales
753 .to_string()
754 .split_whitespace()
755 .any(|l| l == locale.as_str())
756 {
757 error!(%locale, "unsupported locale");
758 }
759
760 if !configuration.amqp_client_properties.contains_key("product")
761 || !configuration.amqp_client_properties.contains_key("version")
762 {
763 configuration.amqp_client_properties.insert(
764 "product".into(),
765 AMQPValue::LongString(env!("CARGO_PKG_NAME").into()),
766 );
767 configuration.amqp_client_properties.insert(
768 "version".into(),
769 AMQPValue::LongString(env!("CARGO_PKG_VERSION").into()),
770 );
771 }
772
773 configuration
774 .amqp_client_properties
775 .insert("platform".into(), AMQPValue::LongString("rust".into()));
776
777 let mut capabilities = FieldTable::default();
778 capabilities.insert("publisher_confirms".into(), true.into());
779 capabilities.insert("exchange_exchange_bindings".into(), true.into());
780 capabilities.insert("basic.nack".into(), true.into());
781 capabilities.insert("consumer_cancel_notify".into(), true.into());
782 capabilities.insert("connection.blocked".into(), true.into());
783 capabilities.insert("consumer_priorities".into(), true.into());
784 capabilities.insert("authentication_failure_close".into(), true.into());
785 capabilities.insert("per_consumer_qos".into(), true.into());
786 capabilities.insert("direct_reply_to".into(), true.into());
787
788 configuration
789 .amqp_client_properties
790 .insert("capabilities".into(), AMQPValue::FieldTable(capabilities));
791
792 let auth_starter = auth_provider
793 .auth_starter()
794 .map_err(ErrorKind::AuthProviderError)?;
795 let channel = self.clone();
796 let client_properties = configuration.amqp_client_properties.clone();
797 self.internal_rpc.spawn(async move {
798 channel
799 .connection_start_ok(
800 client_properties,
801 mechanism,
802 auth_starter,
803 locale,
804 resolver,
805 connection,
806 auth_provider,
807 )
808 .await
809 });
810 Ok(())
811 }
812 step => self.connection_process_error(state, Some(step)),
813 }
814 }
815
816 fn on_connection_secure_received(&self, method: protocol::connection::Secure) -> Result<()> {
817 trace!(?method, "Server sent connection::Secure");
818
819 let state = self.connection_status.state();
820 let step = self.connection_status.connection_step();
821 if state != ConnectionState::Connecting {
822 return self.connection_process_error(state, step);
823 }
824 let Some(step) = step else {
825 return self.connection_process_error(state, step);
826 };
827
828 match step {
829 ConnectionStep::StartOk(resolver, connection, auth_provider)
830 | ConnectionStep::SecureOk(resolver, connection, auth_provider) => {
831 let channel = self.clone();
832 let response = auth_provider
833 .continue_auth(method.challenge)
834 .map_err(ErrorKind::AuthProviderError)?;
835 self.internal_rpc.spawn(async move {
836 channel
837 .connection_secure_ok(response, resolver, connection, auth_provider)
838 .await
839 });
840 Ok(())
841 }
842 step => self.connection_process_error(state, Some(step)),
843 }
844 }
845
846 fn on_connection_tune_received(&self, method: protocol::connection::Tune) -> Result<()> {
847 trace!(?method, "Server sent Connection::Tune");
848
849 let state = self.connection_status.state();
850 let step = self.connection_status.connection_step();
851 if state != ConnectionState::Connecting {
852 return self.connection_process_error(state, step);
853 }
854 let Some(step) = step else {
855 return self.connection_process_error(state, step);
856 };
857
858 match step {
859 ConnectionStep::StartOk(resolver, connection, _)
860 | ConnectionStep::SecureOk(resolver, connection, _) => {
861 self.tune_connection_configuration(
862 method.channel_max,
863 method.frame_max,
864 method.heartbeat,
865 );
866
867 let channel = self.clone();
868 let configuration = self.configuration.clone();
869 let vhost = self.connection_status.vhost();
870 self.internal_rpc.spawn(async move {
871 channel
872 .connection_tune_ok(
873 configuration.channel_max(),
874 configuration.frame_max(),
875 configuration.heartbeat(),
876 )
877 .await?;
878 channel
879 .connection_open(vhost, Box::new(connection), resolver)
880 .await
881 });
882 Ok(())
883 }
884 step => self.connection_process_error(state, Some(step)),
885 }
886 }
887
888 #[allow(clippy::boxed_local)]
889 fn on_connection_open_ok_received(
890 &self,
891 _: protocol::connection::OpenOk,
892 connection: Box<Connection>,
893 ) -> Result<()> {
894 let state = self.connection_status.state();
895 let step = self.connection_status.connection_step();
896 if state != ConnectionState::Connecting {
897 return self.connection_process_error(state, step);
898 }
899 let Some(step) = step else {
900 return self.connection_process_error(state, step);
901 };
902
903 match step {
904 ConnectionStep::Open(resolver) => {
905 self.connection_status.set_state(ConnectionState::Connected);
906 resolver.resolve(*connection);
907 self.events_sender.connected();
908 Ok(())
909 }
910 step => self.connection_process_error(state, Some(step)),
911 }
912 }
913
914 fn on_connection_close_received(&self, method: protocol::connection::Close) -> Result<()> {
915 let error: Error = AMQPError::try_from(method.clone())
916 .map(|error| {
917 error!(
918 channel=%self.id,
919 ?method,
920 ?error,
921 "Connection closed",
922 );
923 ErrorKind::ProtocolError(error).into()
924 })
925 .unwrap_or_else(|error| {
926 error!(%error);
927 info!(channel=%self.id, ?method, "Connection closed");
928 ErrorKind::InvalidConnectionState(ConnectionState::Closed).into()
929 });
930 if self.recovery_config.can_recover(&error) {
931 self.internal_rpc.init_connection_recovery(error.clone());
932 } else {
933 let connection_resolver = self.connection_status.connection_resolver();
934 self.internal_rpc
935 .init_connection_shutdown(error.clone(), connection_resolver);
936 }
937 self.internal_rpc.send_connection_close_ok(error);
938 Ok(())
939 }
940
941 fn on_connection_blocked_received(&self, method: protocol::connection::Blocked) -> Result<()> {
942 self.connection_status.block();
943 self.events_sender.connection_blocked(method.reason.into());
944 Ok(())
945 }
946
947 fn on_connection_unblocked_received(
948 &self,
949 _method: protocol::connection::Unblocked,
950 ) -> Result<()> {
951 self.connection_status.unblock();
952 self.events_sender.connection_unblocked();
953 self.wake();
954 Ok(())
955 }
956
957 fn on_connection_close_ok_received(&self) -> Result<()> {
958 self.internal_rpc.set_connection_closed(
959 ErrorKind::InvalidConnectionState(ConnectionState::Closed).into(),
960 );
961 Ok(())
962 }
963
964 fn on_channel_open_ok_received(
965 &self,
966 _method: protocol::channel::OpenOk,
967 resolver: PromiseResolver<Channel>,
968 channel: Channel,
969 ) -> Result<()> {
970 if !self.status.confirm() {
971 self.finalize_connection();
972 }
973 resolver.resolve(channel);
974 Ok(())
975 }
976
977 fn on_channel_flow_received(&self, method: protocol::channel::Flow) -> Result<()> {
978 self.status.set_send_flow(method.active);
979 self.events_sender.send_flow(method.active);
980 let channel = self.clone();
981 self.internal_rpc.spawn(async move {
982 channel
983 .channel_flow_ok(ChannelFlowOkOptions {
984 active: method.active,
985 })
986 .await
987 });
988 Ok(())
989 }
990
991 fn on_channel_flow_ok_received(
992 &self,
993 method: protocol::channel::FlowOk,
994 resolver: PromiseResolver<Boolean>,
995 ) -> Result<()> {
996 resolver.resolve(method.active);
998 Ok(())
999 }
1000
1001 fn on_channel_close_received(&self, method: protocol::channel::Close) -> Result<()> {
1002 let error = AMQPError::try_from(method.clone()).map(|error| {
1003 error!(
1004 channel=%self.id, ?method, ?error,
1005 "Channel closed"
1006 );
1007 Error::from(ErrorKind::ProtocolError(error))
1008 }).map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
1009 self.set_closing(error.clone());
1010 let channel = self.clone();
1011 self.internal_rpc
1012 .spawn(async move { channel.channel_close_ok(error).await });
1013 Ok(())
1014 }
1015
1016 fn on_channel_close_ok_received(&self) -> Result<()> {
1017 self.set_closed(
1018 ErrorKind::InvalidChannelState(ChannelState::Closed, "channel.close-ok received")
1019 .into(),
1020 );
1021 Ok(())
1022 }
1023
1024 fn on_exchange_bind_ok_received(
1025 &self,
1026 destination: ShortString,
1027 source: ShortString,
1028 routing_key: ShortString,
1029 arguments: FieldTable,
1030 ) -> Result<()> {
1031 self.local_registry
1032 .register_exchange_binding(destination, source, routing_key, arguments);
1033 Ok(())
1034 }
1035
1036 fn on_exchange_unbind_ok_received(
1037 &self,
1038 destination: ShortString,
1039 source: ShortString,
1040 routing_key: ShortString,
1041 arguments: FieldTable,
1042 ) -> Result<()> {
1043 self.local_registry.deregister_exchange_binding(
1044 destination.as_str(),
1045 source.as_str(),
1046 routing_key.as_str(),
1047 &arguments,
1048 );
1049 Ok(())
1050 }
1051
1052 fn on_exchange_declare_ok_received(
1053 &self,
1054 resolver: PromiseResolver<()>,
1055 exchange: ShortString,
1056 kind: ExchangeKind,
1057 options: ExchangeDeclareOptions,
1058 arguments: FieldTable,
1059 ) -> Result<()> {
1060 self.local_registry
1061 .register_exchange(exchange, kind, options, arguments);
1062 resolver.resolve(());
1063 Ok(())
1064 }
1065
1066 fn on_exchange_delete_ok_received(&self, exchange: ShortString) -> Result<()> {
1067 self.local_registry.deregister_exchange(exchange.as_str());
1068 Ok(())
1069 }
1070
1071 fn on_queue_delete_ok_received(
1072 &self,
1073 method: protocol::queue::DeleteOk,
1074 resolver: PromiseResolver<MessageCount>,
1075 queue: ShortString,
1076 ) -> Result<()> {
1077 self.local_registry.deregister_queue(queue.as_str());
1078 resolver.resolve(method.message_count);
1079 Ok(())
1080 }
1081
1082 fn on_queue_purge_ok_received(
1083 &self,
1084 method: protocol::queue::PurgeOk,
1085 resolver: PromiseResolver<MessageCount>,
1086 ) -> Result<()> {
1087 resolver.resolve(method.message_count);
1088 Ok(())
1089 }
1090
1091 fn on_queue_declare_ok_received(
1092 &self,
1093 method: protocol::queue::DeclareOk,
1094 resolver: PromiseResolver<Queue>,
1095 options: QueueDeclareOptions,
1096 arguments: FieldTable,
1097 ) -> Result<()> {
1098 self.local_registry
1099 .register_queue(method.queue.clone(), options, arguments);
1100 resolver.resolve(Queue::new(
1101 method.queue,
1102 method.message_count,
1103 method.consumer_count,
1104 ));
1105 Ok(())
1106 }
1107
1108 fn on_queue_bind_ok_received(
1109 &self,
1110 queue: ShortString,
1111 exchange: ShortString,
1112 routing_key: ShortString,
1113 arguments: FieldTable,
1114 ) -> Result<()> {
1115 self.local_registry
1116 .register_queue_binding(queue, exchange, routing_key, arguments);
1117 Ok(())
1118 }
1119
1120 fn on_queue_unbind_ok_received(
1121 &self,
1122 queue: ShortString,
1123 exchange: ShortString,
1124 routing_key: ShortString,
1125 arguments: FieldTable,
1126 ) -> Result<()> {
1127 self.local_registry.deregister_queue_binding(
1128 queue.as_str(),
1129 exchange.as_str(),
1130 routing_key.as_str(),
1131 &arguments,
1132 );
1133 Ok(())
1134 }
1135
1136 fn on_basic_get_ok_received(
1137 &self,
1138 method: protocol::basic::GetOk,
1139 resolver: PromiseResolver<Option<BasicGetMessage>>,
1140 ) -> Result<()> {
1141 let class_id = method.get_amqp_class_id();
1142 let killswitch = self.status.set_will_receive(class_id, DeliveryCause::Get);
1143 self.basic_get_delivery.start_new_delivery(
1144 BasicGetMessage::new(
1145 self.id,
1146 method.delivery_tag,
1147 method.exchange,
1148 method.routing_key,
1149 method.redelivered,
1150 method.message_count,
1151 self.internal_rpc.clone(),
1152 killswitch,
1153 ),
1154 resolver,
1155 );
1156 Ok(())
1157 }
1158
1159 fn on_basic_get_empty_received(&self, method: protocol::basic::GetEmpty) -> Result<()> {
1160 match self
1161 .frames
1162 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
1163 {
1164 Some(Reply::BasicGetOk(resolver, ..)) => {
1165 resolver.resolve(None);
1166 Ok(())
1167 }
1168 _ => self.handle_invalid_contents(
1169 format!("unexpected basic get empty received on channel {}", self.id),
1170 method.get_amqp_class_id(),
1171 method.get_amqp_method_id(),
1172 ),
1173 }
1174 }
1175
1176 fn on_basic_consume_ok_received(
1177 &self,
1178 method: protocol::basic::ConsumeOk,
1179 resolver: PromiseResolver<Consumer>,
1180 channel_closer: Option<Arc<ChannelCloser>>,
1181 queue: ShortString,
1182 options: BasicConsumeOptions,
1183 arguments: FieldTable,
1184 original: Option<Consumer>,
1185 ) -> Result<()> {
1186 let consumer = original.unwrap_or_else(|| {
1187 Consumer::new(
1188 method.consumer_tag.clone(),
1189 self.internal_rpc.clone(),
1190 channel_closer,
1191 queue,
1192 options,
1193 arguments,
1194 )
1195 });
1196 self.consumers
1197 .register(method.consumer_tag, consumer.clone());
1198 resolver.resolve(consumer);
1199 Ok(())
1200 }
1201
1202 fn on_basic_deliver_received(&self, method: protocol::basic::Deliver) -> Result<()> {
1203 let class_id = method.get_amqp_class_id();
1204 let consumer_tag = method.consumer_tag.clone();
1205 let killswitch = self
1206 .status
1207 .set_will_receive(class_id, DeliveryCause::Consume(consumer_tag.clone()));
1208 self.consumers.start_delivery(&consumer_tag, |error| {
1209 Delivery::new(
1210 self.id,
1211 method.delivery_tag,
1212 method.exchange,
1213 method.routing_key,
1214 method.redelivered,
1215 Some(self.internal_rpc.clone()),
1216 Some(error),
1217 killswitch,
1218 )
1219 });
1220 Ok(())
1221 }
1222
1223 pub(crate) fn deregister_consumer(&self, consumer_tag: &str) {
1224 self.consumers.deregister(consumer_tag);
1225 }
1226
1227 fn on_basic_cancel_received(&self, method: protocol::basic::Cancel) -> Result<()> {
1228 self.deregister_consumer(method.consumer_tag.as_str());
1229 if !method.nowait {
1230 let channel = self.clone();
1231 self.internal_rpc
1232 .spawn(async move { channel.basic_cancel_ok(method.consumer_tag).await });
1233 }
1234 Ok(())
1235 }
1236
1237 fn on_basic_cancel_ok_received(&self, method: protocol::basic::CancelOk) -> Result<()> {
1238 self.deregister_consumer(method.consumer_tag.as_str());
1239 Ok(())
1240 }
1241
1242 fn on_basic_ack_received(&self, method: protocol::basic::Ack) -> Result<()> {
1243 if self.status.confirm() {
1244 if method.multiple {
1245 if method.delivery_tag > 0 {
1246 self.acknowledgements
1247 .ack_all_before(method.delivery_tag)
1248 .or_else(|err| {
1249 self.acknowledgement_error(
1250 err,
1251 method.get_amqp_class_id(),
1252 method.get_amqp_method_id(),
1253 )
1254 })?;
1255 } else {
1256 self.acknowledgements.ack_all_pending();
1257 }
1258 } else {
1259 self.acknowledgements
1260 .ack(method.delivery_tag)
1261 .or_else(|err| {
1262 self.acknowledgement_error(
1263 err,
1264 method.get_amqp_class_id(),
1265 method.get_amqp_method_id(),
1266 )
1267 })?;
1268 }
1269 }
1270 Ok(())
1271 }
1272
1273 fn on_basic_nack_received(&self, method: protocol::basic::Nack) -> Result<()> {
1274 if self.status.confirm() {
1275 if method.multiple {
1276 if method.delivery_tag > 0 {
1277 self.acknowledgements
1278 .nack_all_before(method.delivery_tag)
1279 .or_else(|err| {
1280 self.acknowledgement_error(
1281 err,
1282 method.get_amqp_class_id(),
1283 method.get_amqp_method_id(),
1284 )
1285 })?;
1286 } else {
1287 self.acknowledgements.nack_all_pending();
1288 }
1289 } else {
1290 self.acknowledgements
1291 .nack(method.delivery_tag)
1292 .or_else(|err| {
1293 self.acknowledgement_error(
1294 err,
1295 method.get_amqp_class_id(),
1296 method.get_amqp_method_id(),
1297 )
1298 })?;
1299 }
1300 }
1301 Ok(())
1302 }
1303
1304 fn on_basic_return_received(&self, method: protocol::basic::Return) -> Result<()> {
1305 let class_id = method.get_amqp_class_id();
1306 let killswitch = self
1307 .status
1308 .set_will_receive(class_id, DeliveryCause::Return);
1309 self.returned_messages
1310 .start_new_delivery(BasicReturnMessage::new(
1311 method.exchange,
1312 method.routing_key,
1313 method.reply_code,
1314 method.reply_text,
1315 killswitch,
1316 ));
1317 Ok(())
1318 }
1319
1320 fn on_basic_recover_ok_received(&self) -> Result<()> {
1321 self.consumers.drop_prefetched_messages();
1322 Ok(())
1323 }
1324
1325 fn on_confirm_select_ok_received(&self) -> Result<()> {
1326 self.status.set_confirm();
1327 Ok(())
1328 }
1329
1330 fn on_access_request_ok_received(&self, _: protocol::access::RequestOk) -> Result<()> {
1331 Ok(())
1332 }
1333}
1334
1335#[cfg(feature = "codegen")]
1336include!(concat!(env!("OUT_DIR"), "/channel.rs"));
1337#[cfg(not(feature = "codegen"))]
1338include!("generated/channel.rs");