1use crate::{
2 BasicProperties, Configuration, Connection, ConnectionStatus, Error, ErrorKind, ExchangeKind,
3 Promise, PromiseResolver, Result,
4 acknowledgement::Acknowledgements,
5 auth::Credentials,
6 basic_get_delivery::BasicGetDelivery,
7 channel_closer::ChannelCloser,
8 channel_receiver_state::DeliveryCause,
9 channel_status::{ChannelState, ChannelStatus},
10 connection_closer::ConnectionCloser,
11 connection_status::{ConnectionState, ConnectionStep},
12 consumer::Consumer,
13 consumers::Consumers,
14 error_handler::ErrorHandler,
15 frames::{ExpectedReply, Frames},
16 internal_rpc::InternalRPCHandle,
17 message::{BasicGetMessage, BasicReturnMessage, Delivery},
18 protocol::{self, AMQPClass, AMQPError, AMQPHardError},
19 publisher_confirm::PublisherConfirm,
20 queue::Queue,
21 recovery_config::RecoveryConfig,
22 registry::Registry,
23 returned_messages::ReturnedMessages,
24 socket_state::SocketStateHandle,
25 topology::ChannelDefinition,
26 types::*,
27};
28use amq_protocol::frame::{AMQPContentHeader, AMQPFrame};
29use executor_trait::FullExecutor;
30use std::{convert::TryFrom, fmt, sync::Arc};
31use tracing::{Level, error, info, level_enabled, trace};
32
33#[derive(Clone)]
43pub struct Channel {
44 id: ChannelId,
45 configuration: Configuration,
46 status: ChannelStatus,
47 connection_status: ConnectionStatus,
48 local_registry: Registry,
49 acknowledgements: Acknowledgements,
50 consumers: Consumers,
51 basic_get_delivery: BasicGetDelivery,
52 returned_messages: ReturnedMessages,
53 waker: SocketStateHandle,
54 internal_rpc: InternalRPCHandle,
55 frames: Frames,
56 error_handler: ErrorHandler,
57 executor: Arc<dyn FullExecutor + Send + Sync>,
58 channel_closer: Option<Arc<ChannelCloser>>,
59 connection_closer: Option<Arc<ConnectionCloser>>,
60 recovery_config: RecoveryConfig,
61}
62
63impl PartialEq for Channel {
64 fn eq(&self, other: &Self) -> bool {
65 self.id == other.id
66 }
67}
68
69impl fmt::Debug for Channel {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("Channel")
72 .field("id", &self.id)
73 .field("configuration", &self.configuration)
74 .field("status", &self.status)
75 .field("connection_status", &self.connection_status)
76 .field("acknowledgements", &self.acknowledgements)
77 .field("consumers", &self.consumers)
78 .field("basic_get_delivery", &self.basic_get_delivery)
79 .field("returned_messages", &self.returned_messages)
80 .field("frames", &self.frames)
81 .finish()
82 }
83}
84
85impl Channel {
86 #[allow(clippy::too_many_arguments)]
87 pub(crate) fn new(
88 channel_id: ChannelId,
89 configuration: Configuration,
90 connection_status: ConnectionStatus,
91 waker: SocketStateHandle,
92 internal_rpc: InternalRPCHandle,
93 frames: Frames,
94 executor: Arc<dyn FullExecutor + Send + Sync>,
95 connection_closer: Option<Arc<ConnectionCloser>>,
96 recovery_config: RecoveryConfig,
97 ) -> Channel {
98 let returned_messages = ReturnedMessages::default();
99 let status = ChannelStatus::new(channel_id, internal_rpc.clone());
100 let channel_closer = if channel_id == 0 {
101 None
102 } else {
103 Some(Arc::new(ChannelCloser::new(
104 channel_id,
105 status.clone(),
106 internal_rpc.clone(),
107 )))
108 };
109 Self {
110 id: channel_id,
111 configuration,
112 status,
113 connection_status,
114 local_registry: Registry::default(),
115 acknowledgements: Acknowledgements::new(channel_id, returned_messages.clone()),
116 consumers: Consumers::default(),
117 basic_get_delivery: BasicGetDelivery::default(),
118 returned_messages,
119 waker,
120 internal_rpc,
121 frames,
122 error_handler: ErrorHandler::default(),
123 executor,
124 channel_closer,
125 connection_closer,
126 recovery_config,
127 }
128 }
129
130 pub fn status(&self) -> &ChannelStatus {
131 &self.status
132 }
133
134 pub fn on_error<E: FnMut(Error) + Send + 'static>(&self, handler: E) {
135 self.error_handler.set_handler(handler);
136 }
137
138 pub async fn wait_for_recovery(&self, error: Error) -> Result<()> {
139 if self.recovery_config.can_recover(&error) {
140 #[allow(deprecated)]
141 if let Some(notifier) = error.notifier() {
142 notifier.await;
143 return Ok(());
144 }
145 }
146 Err(error)
147 }
148
149 pub(crate) fn set_closing(&self, error: Option<Error>) {
150 self.set_state(ChannelState::Closing);
151 if let Some(error) = error {
152 self.error_publisher_confirms(error.clone());
153 self.error_consumers(error); } else {
155 self.consumers.start_cancel();
156 }
157 }
158
159 pub(crate) fn set_closed(&self, error: Error) {
160 self.set_state(ChannelState::Closed);
161 self.error_publisher_confirms(error.clone());
162 self.cancel_consumers();
163 self.internal_rpc.remove_channel(self.id, error);
164 }
165
166 pub(crate) fn set_connection_error(&self, error: Error) {
168 self.set_state(ChannelState::Error);
169 self.error_publisher_confirms(error.clone());
170 self.error_consumers(error.clone());
171 self.internal_rpc.remove_channel(self.id, error.clone());
172 }
173
174 fn error_publisher_confirms(&self, error: Error) {
175 self.acknowledgements.on_channel_error(error);
176 }
177
178 fn cancel_consumers(&self) {
179 self.consumers.cancel();
180 }
181
182 fn error_consumers(&self, error: Error) {
183 let recover = self.recovery_config.can_recover(&error);
184 self.consumers.error(error, recover);
185 }
186
187 pub(crate) fn set_state(&self, state: ChannelState) {
188 self.status.set_state(state);
189 }
190
191 pub fn id(&self) -> ChannelId {
192 self.id
193 }
194
195 pub(crate) fn clone_internal(&self) -> Self {
196 Self {
197 id: self.id,
198 configuration: self.configuration.clone(),
199 status: self.status.clone(),
200 connection_status: self.connection_status.clone(),
201 local_registry: self.local_registry.clone(),
202 acknowledgements: self.acknowledgements.clone(),
203 consumers: self.consumers.clone(),
204 basic_get_delivery: self.basic_get_delivery.clone(),
205 returned_messages: self.returned_messages.clone(),
206 waker: self.waker.clone(),
207 internal_rpc: self.internal_rpc.clone(),
208 frames: self.frames.clone(),
209 error_handler: self.error_handler.clone(),
210 executor: self.executor.clone(),
211 channel_closer: None,
212 connection_closer: self.connection_closer.clone(),
213 recovery_config: self.recovery_config.clone(),
214 }
215 }
216
217 fn wake(&self) {
218 trace!(channel=%self.id, "wake");
219 self.waker.wake()
220 }
221
222 fn assert_channel0(&self, class_id: Identifier, method_id: Identifier) -> Result<()> {
223 if self.id == 0 {
224 Ok(())
225 } else {
226 error!(
227 channel=%self.id,
228 "Got a connection frame on, closing connection"
229 );
230 let error = AMQPError::new(
231 AMQPHardError::COMMANDINVALID.into(),
232 format!("connection frame received on channel {}", self.id).into(),
233 );
234 self.internal_rpc.close_connection(
235 error.get_id(),
236 error.get_message().to_string(),
237 class_id,
238 method_id,
239 );
240 Err(ErrorKind::ProtocolError(error).into())
241 }
242 }
243
244 pub async fn close(&self, reply_code: ReplyCode, reply_text: &str) -> Result<()> {
245 self.do_channel_close(reply_code, reply_text, 0, 0).await
246 }
247
248 pub async fn basic_consume(
249 &self,
250 queue: &str,
251 consumer_tag: &str,
252 options: BasicConsumeOptions,
253 arguments: FieldTable,
254 ) -> Result<Consumer> {
255 let consumer = self
256 .do_basic_consume(queue, consumer_tag, options, arguments, None)
257 .await?;
258 Ok(consumer.external(self.id, self.internal_rpc.clone()))
259 }
260
261 pub async fn basic_get(
262 &self,
263 queue: &str,
264 options: BasicGetOptions,
265 ) -> Result<Option<BasicGetMessage>> {
266 self.do_basic_get(queue, options, None).await
267 }
268
269 pub async fn exchange_declare(
270 &self,
271 exchange: &str,
272 kind: ExchangeKind,
273 options: ExchangeDeclareOptions,
274 arguments: FieldTable,
275 ) -> Result<()> {
276 self.do_exchange_declare(exchange, kind.kind(), options, arguments, kind.clone())
277 .await
278 }
279
280 pub async fn wait_for_confirms(&self) -> Result<Vec<BasicReturnMessage>> {
281 if let Some(last_pending) = self.acknowledgements.get_last_pending() {
282 trace!("Waiting for pending confirms");
283 last_pending.await?;
284 } else {
285 trace!("No confirms to wait for");
286 }
287 Ok(self.returned_messages.drain())
288 }
289
290 #[cfg(test)]
291 pub(crate) fn register_queue(
292 &self,
293 name: ShortString,
294 options: QueueDeclareOptions,
295 arguments: FieldTable,
296 ) {
297 self.local_registry.register_queue(name, options, arguments);
298 }
299
300 #[cfg(test)]
301 pub(crate) fn register_consumer(&self, tag: ShortString, consumer: Consumer) {
302 self.consumers.register(tag, consumer);
303 }
304
305 pub(crate) fn finalize_connection(&self) {
306 self.status.finalize_connection();
307 }
308
309 fn init_recovery_or_shutdown(&self, error: Option<Error>) -> Option<Error> {
310 match error {
311 Some(err) if self.recovery_config.can_recover_channel(&err) => {
312 Some(self.init_recovery_poison(err, false))
313 }
314 err => {
315 self.set_closing(err.clone());
316 err
317 }
318 }
319 }
320
321 pub(crate) fn init_recovery(&self, error: Error) -> Error {
322 self.init_recovery_poison(error, true)
323 }
324
325 fn init_recovery_poison(&self, error: Error, poison: bool) -> Error {
326 let err = self.status.set_reconnecting(error, self.topology());
327 self.frames.drop_frames_for_channel(self.id, err.clone());
328 if poison {
329 self.poison(err.clone());
330 }
331 err
332 }
333
334 fn poison(&self, error: Error) {
335 self.frames.poison_channel(self.id, error);
336 }
337
338 pub(crate) fn update_recovery(&self) -> Option<ChannelDefinition> {
339 self.status.update_recovery_context(|ctx| {
340 ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
342 self.acknowledgements.reset(ctx.cause());
344 self.frames.drop_channel_poison(self.id);
346 ctx.topology()
347 })
348 }
349
350 pub(crate) async fn start_recovery(&self) -> Result<()> {
351 let topology = self.update_recovery().expect("No topology during recovery");
352
353 self.channel_open(self.clone()).await?;
355
356 if self.status.confirm() {
358 self.confirm_select(ConfirmSelectOptions::default()).await?;
359 }
360
361 for ex in &topology.exchanges {
363 if ex.is_declared {
364 self.exchange_declare(
365 ex.name.as_str(),
366 ex.kind.clone().unwrap_or_default(),
367 ex.options.unwrap_or_default(),
368 ex.arguments.clone().unwrap_or_default(),
369 )
370 .await?;
371 }
372 }
373
374 for ex in &topology.exchanges {
376 for binding in &ex.bindings {
377 self.exchange_bind(
378 ex.name.as_str(),
379 binding.source.as_str(),
380 binding.routing_key.as_str(),
381 ExchangeBindOptions::default(),
382 binding.arguments.clone(),
383 )
384 .await?;
385 }
386 }
387
388 for queue in &topology.queues {
390 if queue.is_declared {
391 self.queue_declare(
392 queue.name.as_str(),
393 queue.options.unwrap_or_default(),
394 queue.arguments.clone().unwrap_or_default(),
395 )
396 .await?;
397 }
398 }
399
400 for queue in &topology.queues {
402 for binding in &queue.bindings {
403 self.queue_bind(
404 queue.name.as_str(),
405 binding.source.as_str(),
406 binding.routing_key.as_str(),
407 QueueBindOptions::default(),
408 binding.arguments.clone(),
409 )
410 .await?;
411 }
412 }
413
414 for consumer in topology.consumers.iter().cloned() {
416 consumer.reset();
417 self.do_basic_consume(
418 consumer.queue().as_str(),
419 consumer.tag().as_str(),
420 consumer.options(),
421 consumer.arguments(),
422 Some(consumer),
423 )
424 .await?;
425 }
426
427 Ok(())
428 }
429
430 pub(crate) fn send_method_frame(
431 &self,
432 method: AMQPClass,
433 resolver: PromiseResolver<()>,
434 expected_reply: Option<ExpectedReply>,
435 ) {
436 self.send_frame(AMQPFrame::Method(self.id, method), resolver, expected_reply);
437 }
438
439 pub(crate) fn send_frame(
440 &self,
441 frame: AMQPFrame,
442 resolver: PromiseResolver<()>,
443 expected_reply: Option<ExpectedReply>,
444 ) {
445 trace!(channel=%self.id, "send_frame");
446 self.frames.push(self.id, frame, resolver, expected_reply);
447 self.wake();
448 }
449
450 async fn send_method_frame_with_body(
451 &self,
452 method: AMQPClass,
453 payload: &[u8],
454 properties: BasicProperties,
455 publisher_confirms_result: Option<PublisherConfirm>,
456 ) -> Result<PublisherConfirm> {
457 let class_id = method.get_amqp_class_id();
458 let header = AMQPContentHeader {
459 class_id,
460 body_size: payload.len() as PayloadSize,
461 properties,
462 };
463 let frame_max = self.configuration.frame_max();
464 let mut frames = vec![
465 AMQPFrame::Method(self.id, method),
466 AMQPFrame::Header(self.id, class_id, Box::new(header)),
467 ];
468
469 frames.extend(
470 payload
471 .chunks(frame_max as usize - 8 )
472 .map(|chunk| AMQPFrame::Body(self.id, chunk.into())),
473 );
474
475 trace!(channel=%self.id, "send_frames");
476 let promise = self.frames.push_frames(self.id, frames);
477 self.wake();
478 promise.await?;
479 Ok(publisher_confirms_result
480 .unwrap_or_else(|| PublisherConfirm::not_requested(self.returned_messages.clone())))
481 }
482
483 fn handle_invalid_contents(
484 &self,
485 error: String,
486 class_id: Identifier,
487 method_id: Identifier,
488 ) -> Result<()> {
489 error!(%error);
490 let error = AMQPError::new(AMQPHardError::UNEXPECTEDFRAME.into(), error.into());
491 self.internal_rpc.close_connection(
492 error.get_id(),
493 error.get_message().to_string(),
494 class_id,
495 method_id,
496 );
497 Err(ErrorKind::ProtocolError(error).into())
498 }
499
500 pub(crate) fn handle_content_header_frame(
501 &self,
502 class_id: Identifier,
503 size: PayloadSize,
504 properties: BasicProperties,
505 ) -> Result<()> {
506 self.status.set_content_length(
507 self.id,
508 class_id,
509 size,
510 |delivery_cause, confirm_mode| match delivery_cause {
511 DeliveryCause::Consume(consumer_tag) => {
512 self.consumers
513 .handle_content_header_frame(consumer_tag, size, properties);
514 }
515 DeliveryCause::Get => {
516 self.basic_get_delivery
517 .handle_content_header_frame(size, properties);
518 }
519 DeliveryCause::Return => {
520 self.returned_messages.handle_content_header_frame(
521 size,
522 properties,
523 confirm_mode,
524 );
525 }
526 },
527 |msg| {
528 error!(%msg);
529 let error = AMQPError::new(AMQPHardError::FRAMEERROR.into(), msg.into());
530 self.internal_rpc.close_connection(
531 error.get_id(),
532 error.get_message().to_string(),
533 class_id,
534 0,
535 );
536 let error: Error = ErrorKind::ProtocolError(error).into();
537 self.internal_rpc.set_connection_error(error.clone());
538 Err(error)
539 },
540 |msg| self.handle_invalid_contents(msg, class_id, 0),
541 )
542 }
543
544 pub(crate) fn handle_body_frame(&self, payload: Vec<u8>) -> Result<()> {
545 self.status.receive(
546 self.id,
547 payload.len() as PayloadSize,
548 |delivery_cause, remaining_size, confirm_mode| match delivery_cause {
549 DeliveryCause::Consume(consumer_tag) => {
550 self.consumers
551 .handle_body_frame(consumer_tag, remaining_size, payload);
552 }
553 DeliveryCause::Get => {
554 self.basic_get_delivery
555 .handle_body_frame(remaining_size, payload);
556 }
557 DeliveryCause::Return => {
558 self.returned_messages
559 .handle_body_frame(remaining_size, payload, confirm_mode);
560 }
561 },
562 |msg| self.handle_invalid_contents(msg, 0, 0),
563 )
564 }
565
566 pub(crate) fn topology(&self) -> ChannelDefinition {
567 ChannelDefinition {
568 exchanges: self.local_registry.exchanges_topology(),
569 queues: self.local_registry.queues_topology(),
570 consumers: self.consumers.topology(),
571 }
572 }
573
574 fn before_basic_publish(&self) -> Option<PublisherConfirm> {
575 if self.status.confirm() {
576 Some(self.acknowledgements.register_pending())
577 } else {
578 None
579 }
580 }
581
582 fn before_basic_cancel(&self, consumer_tag: &str) {
583 self.consumers.start_cancel_one(consumer_tag);
584 }
585
586 fn acknowledgement_error(
587 &self,
588 error: AMQPError,
589 class_id: Identifier,
590 method_id: Identifier,
591 ) -> Result<()> {
592 error!("Got a bad acknowledgement from server, closing channel");
593 let channel = self.clone();
594 let err = error.clone();
595 self.internal_rpc.register_internal_future(async move {
596 channel
597 .do_channel_close(
598 error.get_id(),
599 error.get_message().as_str(),
600 class_id,
601 method_id,
602 )
603 .await
604 });
605 Err(ErrorKind::ProtocolError(err).into())
606 }
607
608 fn before_connection_start_ok(
609 &self,
610 resolver: PromiseResolver<Connection>,
611 connection: Connection,
612 credentials: Credentials,
613 ) {
614 self.connection_status
615 .set_connection_step(ConnectionStep::StartOk(resolver, connection, credentials));
616 }
617
618 fn before_connection_open(&self, resolver: PromiseResolver<Connection>) {
619 self.connection_status
620 .set_connection_step(ConnectionStep::Open(resolver));
621 }
622
623 fn on_connection_close_ok_sent(&self, error: Error) {
624 self.internal_rpc.finish_connection_shutdown();
625 if !self.recovery_config.can_recover_connection(&error) {
626 if let ErrorKind::ProtocolError(_) = error.kind() {
627 self.internal_rpc.set_connection_error(error);
628 } else {
629 self.internal_rpc.set_connection_closed(error);
630 }
631 }
632 }
633
634 fn next_expected_close_ok_reply(&self) -> Option<Reply> {
635 self.frames.next_expected_close_ok_reply(
636 self.id,
637 ErrorKind::InvalidChannelState(
638 ChannelState::Closed,
639 "unexpected channel.close-ok received",
640 )
641 .into(),
642 )
643 }
644
645 fn before_channel_close(&self) {
646 self.set_closing(None);
647 }
648
649 fn on_channel_close_ok_sent(&self, error: Option<Error>) {
650 let recover = error
651 .as_ref()
652 .is_some_and(|err| self.recovery_config.can_recover_channel(err));
653 let err = error.clone().unwrap_or_else(|| {
654 ErrorKind::InvalidChannelState(ChannelState::Closing, "channel.close-ok sent").into()
655 });
656
657 self.poison(err.clone());
658 if !recover {
659 self.set_closed(err);
660 if let Some(error) = error {
661 self.error_handler.on_error(error);
662 }
663 }
664 }
665
666 fn on_basic_recover_async_sent(&self) {
667 self.consumers.drop_prefetched_messages();
668 }
669
670 fn on_basic_ack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
671 if multiple && delivery_tag == 0 {
672 self.consumers.drop_prefetched_messages();
673 }
674 }
675
676 fn on_basic_nack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
677 if multiple && delivery_tag == 0 {
678 self.consumers.drop_prefetched_messages();
679 }
680 }
681
682 fn tune_connection_configuration(
683 &self,
684 channel_max: ChannelId,
685 frame_max: FrameSize,
686 heartbeat: Heartbeat,
687 ) {
688 if self.configuration.heartbeat() == 0
691 || (heartbeat != 0 && heartbeat < self.configuration.heartbeat())
692 {
693 self.configuration.set_heartbeat(heartbeat);
694 }
695
696 if channel_max != 0 {
697 if self.configuration.channel_max() == 0
700 || channel_max < self.configuration.channel_max()
701 {
702 self.configuration.set_channel_max(channel_max);
703 }
704 }
705 if self.configuration.channel_max() == 0 {
706 self.configuration.set_channel_max(ChannelId::MAX);
707 }
708
709 if frame_max != 0 {
710 if self.configuration.frame_max() == 0 || frame_max < self.configuration.frame_max() {
713 self.configuration.set_frame_max(frame_max);
714 }
715 }
716 if self.configuration.frame_max() == 0 {
717 self.configuration.set_frame_max(FrameSize::MAX);
718 }
719 }
720
721 fn on_connection_start_received(&self, method: protocol::connection::Start) -> Result<()> {
722 trace!(?method, "Server sent connection::Start");
723 let state = self.connection_status.state();
724 let step = self.connection_status.connection_step_name();
725 if let (
726 ConnectionState::Connecting,
727 Some(ConnectionStep::ProtocolHeader(
728 resolver,
729 connection,
730 credentials,
731 mechanism,
732 mut options,
733 )),
734 ) = (state, self.connection_status.connection_step())
735 {
736 let mechanism_str = mechanism.to_string();
737 let locale = options.locale.clone();
738
739 if !method
740 .mechanisms
741 .to_string()
742 .split_whitespace()
743 .any(|m| m == mechanism_str)
744 {
745 error!(%mechanism, "unsupported mechanism");
746 }
747 if !method
748 .locales
749 .to_string()
750 .split_whitespace()
751 .any(|l| l == locale)
752 {
753 error!(%locale, "unsupported locale");
754 }
755
756 if !options.client_properties.contains_key("product")
757 || !options.client_properties.contains_key("version")
758 {
759 options.client_properties.insert(
760 "product".into(),
761 AMQPValue::LongString(env!("CARGO_PKG_NAME").into()),
762 );
763 options.client_properties.insert(
764 "version".into(),
765 AMQPValue::LongString(env!("CARGO_PKG_VERSION").into()),
766 );
767 }
768
769 options
770 .client_properties
771 .insert("platform".into(), AMQPValue::LongString("rust".into()));
772
773 let mut capabilities = FieldTable::default();
774 capabilities.insert("publisher_confirms".into(), true.into());
775 capabilities.insert("exchange_exchange_bindings".into(), true.into());
776 capabilities.insert("basic.nack".into(), true.into());
777 capabilities.insert("consumer_cancel_notify".into(), true.into());
778 capabilities.insert("connection.blocked".into(), true.into());
779 capabilities.insert("consumer_priorities".into(), true.into());
780 capabilities.insert("authentication_failure_close".into(), true.into());
781 capabilities.insert("per_consumer_qos".into(), true.into());
782 capabilities.insert("direct_reply_to".into(), true.into());
783
784 options
785 .client_properties
786 .insert("capabilities".into(), AMQPValue::FieldTable(capabilities));
787
788 let channel = self.clone();
789 self.internal_rpc.register_internal_future(async move {
790 channel
791 .connection_start_ok(
792 options.client_properties,
793 &mechanism_str,
794 &credentials.sasl_auth_string(mechanism),
795 &locale,
796 resolver,
797 connection,
798 credentials,
799 )
800 .await
801 });
802 Ok(())
803 } else {
804 error!(?state, ?step, "Invalid state");
805 let error: Error = ErrorKind::InvalidConnectionState(state).into();
806 self.internal_rpc.set_connection_error(error.clone());
807 Err(error)
808 }
809 }
810
811 fn on_connection_secure_received(&self, method: protocol::connection::Secure) -> Result<()> {
812 trace!(?method, "Server sent connection::Secure");
813
814 let state = self.connection_status.state();
815 let step = self.connection_status.connection_step_name();
816 if let (ConnectionState::Connecting, Some(ConnectionStep::StartOk(.., credentials))) =
817 (state, self.connection_status.connection_step())
818 {
819 let channel = self.clone();
820 self.internal_rpc.register_internal_future(async move {
821 channel
822 .connection_secure_ok(&credentials.rabbit_cr_demo_answer())
823 .await
824 });
825 Ok(())
826 } else {
827 error!(?state, ?step, "Invalid state");
828 let error: Error = ErrorKind::InvalidConnectionState(state).into();
829 self.internal_rpc.set_connection_error(error.clone());
830 Err(error)
831 }
832 }
833
834 fn on_connection_tune_received(&self, method: protocol::connection::Tune) -> Result<()> {
835 trace!(?method, "Server sent Connection::Tune");
836
837 let state = self.connection_status.state();
838 let step = self.connection_status.connection_step_name();
839 if let (
840 ConnectionState::Connecting,
841 Some(ConnectionStep::StartOk(resolver, connection, _)),
842 ) = (state, self.connection_status.connection_step())
843 {
844 self.tune_connection_configuration(
845 method.channel_max,
846 method.frame_max,
847 method.heartbeat,
848 );
849
850 let channel = self.clone();
851 let configuration = self.configuration.clone();
852 let vhost = self.connection_status.vhost();
853 self.internal_rpc.register_internal_future(async move {
854 channel
855 .connection_tune_ok(
856 configuration.channel_max(),
857 configuration.frame_max(),
858 configuration.heartbeat(),
859 )
860 .await?;
861 channel
862 .connection_open(&vhost, Box::new(connection), resolver)
863 .await
864 });
865 Ok(())
866 } else {
867 error!(?state, ?step, "Invalid state");
868 let error: Error = ErrorKind::InvalidConnectionState(state).into();
869 self.internal_rpc.set_connection_error(error.clone());
870 Err(error)
871 }
872 }
873
874 fn on_connection_open_ok_received(
875 &self,
876 _: protocol::connection::OpenOk,
877 connection: Box<Connection>,
878 ) -> Result<()> {
879 let state = self.connection_status.state();
880 let step = self.connection_status.connection_step_name();
881 if let (ConnectionState::Connecting, Some(ConnectionStep::Open(resolver))) =
882 (state, self.connection_status.connection_step())
883 {
884 self.connection_status.set_state(ConnectionState::Connected);
885 resolver.resolve(*connection);
886 Ok(())
887 } else {
888 error!(?state, ?step, "Invalid state");
889 let error: Error = ErrorKind::InvalidConnectionState(state).into();
890 self.internal_rpc.set_connection_error(error.clone());
891 Err(error)
892 }
893 }
894
895 fn on_connection_close_received(&self, method: protocol::connection::Close) -> Result<()> {
896 let error: Error = AMQPError::try_from(method.clone())
897 .map(|error| {
898 error!(
899 channel=%self.id,
900 ?method,
901 ?error,
902 "Connection closed",
903 );
904 ErrorKind::ProtocolError(error).into()
905 })
906 .unwrap_or_else(|error| {
907 error!(%error);
908 info!(channel=%self.id, ?method, "Connection closed");
909 ErrorKind::InvalidConnectionState(ConnectionState::Closed).into()
910 });
911 if self.recovery_config.can_recover_connection(&error) {
912 self.internal_rpc.init_connection_recovery(error.clone());
913 } else {
914 self.internal_rpc.init_connection_shutdown(error.clone());
915 }
916 self.internal_rpc.send_connection_close_ok(error);
917 Ok(())
918 }
919
920 fn on_connection_blocked_received(&self, _method: protocol::connection::Blocked) -> Result<()> {
921 self.connection_status.block();
922 Ok(())
923 }
924
925 fn on_connection_unblocked_received(
926 &self,
927 _method: protocol::connection::Unblocked,
928 ) -> Result<()> {
929 self.connection_status.unblock();
930 self.wake();
931 Ok(())
932 }
933
934 fn on_connection_close_ok_received(&self) -> Result<()> {
935 self.internal_rpc.set_connection_closed(
936 ErrorKind::InvalidConnectionState(ConnectionState::Closed).into(),
937 );
938 Ok(())
939 }
940
941 fn on_channel_open_ok_received(
942 &self,
943 _method: protocol::channel::OpenOk,
944 resolver: PromiseResolver<Channel>,
945 channel: Channel,
946 ) -> Result<()> {
947 if !self.status.confirm() {
948 self.finalize_connection();
949 }
950 resolver.resolve(channel);
951 Ok(())
952 }
953
954 fn on_channel_flow_received(&self, method: protocol::channel::Flow) -> Result<()> {
955 self.status.set_send_flow(method.active);
956 let channel = self.clone();
957 self.internal_rpc.register_internal_future(async move {
958 channel
959 .channel_flow_ok(ChannelFlowOkOptions {
960 active: method.active,
961 })
962 .await
963 });
964 Ok(())
965 }
966
967 fn on_channel_flow_ok_received(
968 &self,
969 method: protocol::channel::FlowOk,
970 resolver: PromiseResolver<Boolean>,
971 ) -> Result<()> {
972 resolver.resolve(method.active);
974 Ok(())
975 }
976
977 fn on_channel_close_received(&self, method: protocol::channel::Close) -> Result<()> {
978 let error = self.init_recovery_or_shutdown(AMQPError::try_from(method.clone()).map(|error| {
979 error!(
980 channel=%self.id, ?method, ?error,
981 "Channel closed"
982 );
983 Error::from(ErrorKind::ProtocolError(error))
984 }).map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok());
985 let recover = error
986 .as_ref()
987 .is_some_and(|err| self.recovery_config.can_recover_channel(err));
988 let channel = self.clone();
989 self.internal_rpc.register_internal_future(async move {
990 channel.channel_close_ok(error).await?;
991 if recover {
992 channel.start_recovery().await?;
993 }
994 Ok(())
995 });
996 Ok(())
997 }
998
999 fn on_channel_close_ok_received(&self) -> Result<()> {
1000 self.set_closed(
1001 ErrorKind::InvalidChannelState(ChannelState::Closed, "channel.close-ok received")
1002 .into(),
1003 );
1004 Ok(())
1005 }
1006
1007 fn on_exchange_bind_ok_received(
1008 &self,
1009 destination: ShortString,
1010 source: ShortString,
1011 routing_key: ShortString,
1012 arguments: FieldTable,
1013 ) -> Result<()> {
1014 self.local_registry
1015 .register_exchange_binding(destination, source, routing_key, arguments);
1016 Ok(())
1017 }
1018
1019 fn on_exchange_unbind_ok_received(
1020 &self,
1021 destination: ShortString,
1022 source: ShortString,
1023 routing_key: ShortString,
1024 arguments: FieldTable,
1025 ) -> Result<()> {
1026 self.local_registry.deregister_exchange_binding(
1027 destination.as_str(),
1028 source.as_str(),
1029 routing_key.as_str(),
1030 &arguments,
1031 );
1032 Ok(())
1033 }
1034
1035 fn on_exchange_declare_ok_received(
1036 &self,
1037 resolver: PromiseResolver<()>,
1038 exchange: ShortString,
1039 kind: ExchangeKind,
1040 options: ExchangeDeclareOptions,
1041 arguments: FieldTable,
1042 ) -> Result<()> {
1043 self.local_registry
1044 .register_exchange(exchange, kind, options, arguments);
1045 resolver.resolve(());
1046 Ok(())
1047 }
1048
1049 fn on_exchange_delete_ok_received(&self, exchange: ShortString) -> Result<()> {
1050 self.local_registry.deregister_exchange(exchange.as_str());
1051 Ok(())
1052 }
1053
1054 fn on_queue_delete_ok_received(
1055 &self,
1056 method: protocol::queue::DeleteOk,
1057 resolver: PromiseResolver<MessageCount>,
1058 queue: ShortString,
1059 ) -> Result<()> {
1060 self.local_registry.deregister_queue(queue.as_str());
1061 resolver.resolve(method.message_count);
1062 Ok(())
1063 }
1064
1065 fn on_queue_purge_ok_received(
1066 &self,
1067 method: protocol::queue::PurgeOk,
1068 resolver: PromiseResolver<MessageCount>,
1069 ) -> Result<()> {
1070 resolver.resolve(method.message_count);
1071 Ok(())
1072 }
1073
1074 fn on_queue_declare_ok_received(
1075 &self,
1076 method: protocol::queue::DeclareOk,
1077 resolver: PromiseResolver<Queue>,
1078 options: QueueDeclareOptions,
1079 arguments: FieldTable,
1080 ) -> Result<()> {
1081 self.local_registry
1082 .register_queue(method.queue.clone(), options, arguments);
1083 resolver.resolve(Queue::new(
1084 method.queue,
1085 method.message_count,
1086 method.consumer_count,
1087 ));
1088 Ok(())
1089 }
1090
1091 fn on_queue_bind_ok_received(
1092 &self,
1093 queue: ShortString,
1094 exchange: ShortString,
1095 routing_key: ShortString,
1096 arguments: FieldTable,
1097 ) -> Result<()> {
1098 self.local_registry
1099 .register_queue_binding(queue, exchange, routing_key, arguments);
1100 Ok(())
1101 }
1102
1103 fn on_queue_unbind_ok_received(
1104 &self,
1105 queue: ShortString,
1106 exchange: ShortString,
1107 routing_key: ShortString,
1108 arguments: FieldTable,
1109 ) -> Result<()> {
1110 self.local_registry.deregister_queue_binding(
1111 queue.as_str(),
1112 exchange.as_str(),
1113 routing_key.as_str(),
1114 &arguments,
1115 );
1116 Ok(())
1117 }
1118
1119 fn on_basic_get_ok_received(
1120 &self,
1121 method: protocol::basic::GetOk,
1122 resolver: PromiseResolver<Option<BasicGetMessage>>,
1123 ) -> Result<()> {
1124 let class_id = method.get_amqp_class_id();
1125 let killswitch = self.status.set_will_receive(class_id, DeliveryCause::Get);
1126 self.basic_get_delivery.start_new_delivery(
1127 BasicGetMessage::new(
1128 self.id,
1129 method.delivery_tag,
1130 method.exchange,
1131 method.routing_key,
1132 method.redelivered,
1133 method.message_count,
1134 self.internal_rpc.clone(),
1135 killswitch,
1136 ),
1137 resolver,
1138 );
1139 Ok(())
1140 }
1141
1142 fn on_basic_get_empty_received(&self, method: protocol::basic::GetEmpty) -> Result<()> {
1143 match self
1144 .frames
1145 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
1146 {
1147 Some(Reply::BasicGetOk(resolver, ..)) => {
1148 resolver.resolve(None);
1149 Ok(())
1150 }
1151 _ => self.handle_invalid_contents(
1152 format!("unexpected basic get empty received on channel {}", self.id),
1153 method.get_amqp_class_id(),
1154 method.get_amqp_method_id(),
1155 ),
1156 }
1157 }
1158
1159 fn on_basic_consume_ok_received(
1160 &self,
1161 method: protocol::basic::ConsumeOk,
1162 resolver: PromiseResolver<Consumer>,
1163 channel_closer: Option<Arc<ChannelCloser>>,
1164 queue: ShortString,
1165 options: BasicConsumeOptions,
1166 arguments: FieldTable,
1167 original: Option<Consumer>,
1168 ) -> Result<()> {
1169 let consumer = original.unwrap_or_else(|| {
1170 Consumer::new(
1171 method.consumer_tag.clone(),
1172 self.executor.clone(),
1173 channel_closer,
1174 queue,
1175 options,
1176 arguments,
1177 )
1178 });
1179 self.consumers
1180 .register(method.consumer_tag, consumer.clone());
1181 resolver.resolve(consumer);
1182 Ok(())
1183 }
1184
1185 fn on_basic_deliver_received(&self, method: protocol::basic::Deliver) -> Result<()> {
1186 let class_id = method.get_amqp_class_id();
1187 let consumer_tag = method.consumer_tag.clone();
1188 let killswitch = self
1189 .status
1190 .set_will_receive(class_id, DeliveryCause::Consume(consumer_tag.clone()));
1191 self.consumers.start_delivery(&consumer_tag, |error| {
1192 Delivery::new(
1193 self.id,
1194 method.delivery_tag,
1195 method.exchange,
1196 method.routing_key,
1197 method.redelivered,
1198 Some(self.internal_rpc.clone()),
1199 Some(error),
1200 killswitch,
1201 )
1202 });
1203 Ok(())
1204 }
1205
1206 fn on_basic_cancel_received(&self, method: protocol::basic::Cancel) -> Result<()> {
1207 self.consumers.deregister(method.consumer_tag.as_str());
1208 if !method.nowait {
1209 let channel = self.clone();
1210 self.internal_rpc.register_internal_future(async move {
1211 channel.basic_cancel_ok(method.consumer_tag.as_str()).await
1212 });
1213 }
1214 Ok(())
1215 }
1216
1217 fn on_basic_cancel_ok_received(&self, method: protocol::basic::CancelOk) -> Result<()> {
1218 self.consumers.deregister(method.consumer_tag.as_str());
1219 Ok(())
1220 }
1221
1222 fn on_basic_ack_received(&self, method: protocol::basic::Ack) -> Result<()> {
1223 if self.status.confirm() {
1224 if method.multiple {
1225 if method.delivery_tag > 0 {
1226 self.acknowledgements
1227 .ack_all_before(method.delivery_tag)
1228 .or_else(|err| {
1229 self.acknowledgement_error(
1230 err,
1231 method.get_amqp_class_id(),
1232 method.get_amqp_method_id(),
1233 )
1234 })?;
1235 } else {
1236 self.acknowledgements.ack_all_pending();
1237 }
1238 } else {
1239 self.acknowledgements
1240 .ack(method.delivery_tag)
1241 .or_else(|err| {
1242 self.acknowledgement_error(
1243 err,
1244 method.get_amqp_class_id(),
1245 method.get_amqp_method_id(),
1246 )
1247 })?;
1248 }
1249 }
1250 Ok(())
1251 }
1252
1253 fn on_basic_nack_received(&self, method: protocol::basic::Nack) -> Result<()> {
1254 if self.status.confirm() {
1255 if method.multiple {
1256 if method.delivery_tag > 0 {
1257 self.acknowledgements
1258 .nack_all_before(method.delivery_tag)
1259 .or_else(|err| {
1260 self.acknowledgement_error(
1261 err,
1262 method.get_amqp_class_id(),
1263 method.get_amqp_method_id(),
1264 )
1265 })?;
1266 } else {
1267 self.acknowledgements.nack_all_pending();
1268 }
1269 } else {
1270 self.acknowledgements
1271 .nack(method.delivery_tag)
1272 .or_else(|err| {
1273 self.acknowledgement_error(
1274 err,
1275 method.get_amqp_class_id(),
1276 method.get_amqp_method_id(),
1277 )
1278 })?;
1279 }
1280 }
1281 Ok(())
1282 }
1283
1284 fn on_basic_return_received(&self, method: protocol::basic::Return) -> Result<()> {
1285 let class_id = method.get_amqp_class_id();
1286 let killswitch = self
1287 .status
1288 .set_will_receive(class_id, DeliveryCause::Return);
1289 self.returned_messages
1290 .start_new_delivery(BasicReturnMessage::new(
1291 method.exchange,
1292 method.routing_key,
1293 method.reply_code,
1294 method.reply_text,
1295 killswitch,
1296 ));
1297 Ok(())
1298 }
1299
1300 fn on_basic_recover_ok_received(&self) -> Result<()> {
1301 self.consumers.drop_prefetched_messages();
1302 Ok(())
1303 }
1304
1305 fn on_confirm_select_ok_received(&self) -> Result<()> {
1306 self.status.set_confirm();
1307 Ok(())
1308 }
1309
1310 fn on_access_request_ok_received(&self, _: protocol::access::RequestOk) -> Result<()> {
1311 Ok(())
1312 }
1313}
1314
1315#[cfg(feature = "codegen")]
1316include!(concat!(env!("OUT_DIR"), "/channel.rs"));
1317#[cfg(not(feature = "codegen"))]
1318include!("generated.rs");