lapin/
channel.rs

1use crate::{
2    BasicProperties, Configuration, Connection, ConnectionStatus, Error, ErrorKind, ExchangeKind,
3    Promise, PromiseResolver, Result,
4    acknowledgement::Acknowledgements,
5    auth::Credentials,
6    basic_get_delivery::BasicGetDelivery,
7    channel_closer::ChannelCloser,
8    channel_receiver_state::DeliveryCause,
9    channel_status::{ChannelState, ChannelStatus},
10    connection_closer::ConnectionCloser,
11    connection_status::{ConnectionState, ConnectionStep},
12    consumer::Consumer,
13    consumers::Consumers,
14    error_handler::ErrorHandler,
15    frames::{ExpectedReply, Frames},
16    internal_rpc::InternalRPCHandle,
17    message::{BasicGetMessage, BasicReturnMessage, Delivery},
18    protocol::{self, AMQPClass, AMQPError, AMQPHardError},
19    publisher_confirm::PublisherConfirm,
20    queue::Queue,
21    recovery_config::RecoveryConfig,
22    registry::Registry,
23    returned_messages::ReturnedMessages,
24    socket_state::SocketStateHandle,
25    topology::ChannelDefinition,
26    types::*,
27};
28use amq_protocol::frame::{AMQPContentHeader, AMQPFrame};
29use executor_trait::FullExecutor;
30use std::{convert::TryFrom, fmt, sync::Arc};
31use tracing::{Level, error, info, level_enabled, trace};
32
33/// Main entry point for most AMQP operations.
34///
35/// It serves as a lightweight connection and can be obtained from a
36///  [`Connection`] by calling [`Connection::create_channel`].
37///
38/// See also the RabbitMQ documentation on [channels](https://www.rabbitmq.com/channels.html).
39///
40/// [`Connection`]: ./struct.Connection.html
41/// [`Connection::create_channel`]: ./struct.Connection.html#method.create_channel
42#[derive(Clone)]
43pub struct Channel {
44    id: ChannelId,
45    configuration: Configuration,
46    status: ChannelStatus,
47    connection_status: ConnectionStatus,
48    local_registry: Registry,
49    acknowledgements: Acknowledgements,
50    consumers: Consumers,
51    basic_get_delivery: BasicGetDelivery,
52    returned_messages: ReturnedMessages,
53    waker: SocketStateHandle,
54    internal_rpc: InternalRPCHandle,
55    frames: Frames,
56    error_handler: ErrorHandler,
57    executor: Arc<dyn FullExecutor + Send + Sync>,
58    channel_closer: Option<Arc<ChannelCloser>>,
59    connection_closer: Option<Arc<ConnectionCloser>>,
60    recovery_config: RecoveryConfig,
61}
62
63impl PartialEq for Channel {
64    fn eq(&self, other: &Self) -> bool {
65        self.id == other.id
66    }
67}
68
69impl fmt::Debug for Channel {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("Channel")
72            .field("id", &self.id)
73            .field("configuration", &self.configuration)
74            .field("status", &self.status)
75            .field("connection_status", &self.connection_status)
76            .field("acknowledgements", &self.acknowledgements)
77            .field("consumers", &self.consumers)
78            .field("basic_get_delivery", &self.basic_get_delivery)
79            .field("returned_messages", &self.returned_messages)
80            .field("frames", &self.frames)
81            .finish()
82    }
83}
84
85impl Channel {
86    #[allow(clippy::too_many_arguments)]
87    pub(crate) fn new(
88        channel_id: ChannelId,
89        configuration: Configuration,
90        connection_status: ConnectionStatus,
91        waker: SocketStateHandle,
92        internal_rpc: InternalRPCHandle,
93        frames: Frames,
94        executor: Arc<dyn FullExecutor + Send + Sync>,
95        connection_closer: Option<Arc<ConnectionCloser>>,
96        recovery_config: RecoveryConfig,
97    ) -> Channel {
98        let returned_messages = ReturnedMessages::default();
99        let status = ChannelStatus::new(channel_id, internal_rpc.clone());
100        let channel_closer = if channel_id == 0 {
101            None
102        } else {
103            Some(Arc::new(ChannelCloser::new(
104                channel_id,
105                status.clone(),
106                internal_rpc.clone(),
107            )))
108        };
109        Self {
110            id: channel_id,
111            configuration,
112            status,
113            connection_status,
114            local_registry: Registry::default(),
115            acknowledgements: Acknowledgements::new(channel_id, returned_messages.clone()),
116            consumers: Consumers::default(),
117            basic_get_delivery: BasicGetDelivery::default(),
118            returned_messages,
119            waker,
120            internal_rpc,
121            frames,
122            error_handler: ErrorHandler::default(),
123            executor,
124            channel_closer,
125            connection_closer,
126            recovery_config,
127        }
128    }
129
130    pub fn status(&self) -> &ChannelStatus {
131        &self.status
132    }
133
134    pub fn on_error<E: FnMut(Error) + Send + 'static>(&self, handler: E) {
135        self.error_handler.set_handler(handler);
136    }
137
138    pub async fn wait_for_recovery(&self, error: Error) -> Result<()> {
139        if self.recovery_config.can_recover(&error) {
140            #[allow(deprecated)]
141            if let Some(notifier) = error.notifier() {
142                notifier.await;
143                return Ok(());
144            }
145        }
146        Err(error)
147    }
148
149    pub(crate) fn set_closing(&self, error: Option<Error>) {
150        self.set_state(ChannelState::Closing);
151        if let Some(error) = error {
152            self.error_publisher_confirms(error.clone());
153            self.error_consumers(error); // ignore the returned error here, only happens with default executor if we cannot spawn a thread
154        } else {
155            self.consumers.start_cancel();
156        }
157    }
158
159    pub(crate) fn set_closed(&self, error: Error) {
160        self.set_state(ChannelState::Closed);
161        self.error_publisher_confirms(error.clone());
162        self.cancel_consumers();
163        self.internal_rpc.remove_channel(self.id, error);
164    }
165
166    // Only called in case of a protocol failure
167    pub(crate) fn set_connection_error(&self, error: Error) {
168        self.set_state(ChannelState::Error);
169        self.error_publisher_confirms(error.clone());
170        self.error_consumers(error.clone());
171        self.internal_rpc.remove_channel(self.id, error.clone());
172    }
173
174    fn error_publisher_confirms(&self, error: Error) {
175        self.acknowledgements.on_channel_error(error);
176    }
177
178    fn cancel_consumers(&self) {
179        self.consumers.cancel();
180    }
181
182    fn error_consumers(&self, error: Error) {
183        let recover = self.recovery_config.can_recover(&error);
184        self.consumers.error(error, recover);
185    }
186
187    pub(crate) fn set_state(&self, state: ChannelState) {
188        self.status.set_state(state);
189    }
190
191    pub fn id(&self) -> ChannelId {
192        self.id
193    }
194
195    pub(crate) fn clone_internal(&self) -> Self {
196        Self {
197            id: self.id,
198            configuration: self.configuration.clone(),
199            status: self.status.clone(),
200            connection_status: self.connection_status.clone(),
201            local_registry: self.local_registry.clone(),
202            acknowledgements: self.acknowledgements.clone(),
203            consumers: self.consumers.clone(),
204            basic_get_delivery: self.basic_get_delivery.clone(),
205            returned_messages: self.returned_messages.clone(),
206            waker: self.waker.clone(),
207            internal_rpc: self.internal_rpc.clone(),
208            frames: self.frames.clone(),
209            error_handler: self.error_handler.clone(),
210            executor: self.executor.clone(),
211            channel_closer: None,
212            connection_closer: self.connection_closer.clone(),
213            recovery_config: self.recovery_config.clone(),
214        }
215    }
216
217    fn wake(&self) {
218        trace!(channel=%self.id, "wake");
219        self.waker.wake()
220    }
221
222    fn assert_channel0(&self, class_id: Identifier, method_id: Identifier) -> Result<()> {
223        if self.id == 0 {
224            Ok(())
225        } else {
226            error!(
227                channel=%self.id,
228                "Got a connection frame on, closing connection"
229            );
230            let error = AMQPError::new(
231                AMQPHardError::COMMANDINVALID.into(),
232                format!("connection frame received on channel {}", self.id).into(),
233            );
234            self.internal_rpc.close_connection(
235                error.get_id(),
236                error.get_message().to_string(),
237                class_id,
238                method_id,
239            );
240            Err(ErrorKind::ProtocolError(error).into())
241        }
242    }
243
244    pub async fn close(&self, reply_code: ReplyCode, reply_text: &str) -> Result<()> {
245        self.do_channel_close(reply_code, reply_text, 0, 0).await
246    }
247
248    pub async fn basic_consume(
249        &self,
250        queue: &str,
251        consumer_tag: &str,
252        options: BasicConsumeOptions,
253        arguments: FieldTable,
254    ) -> Result<Consumer> {
255        let consumer = self
256            .do_basic_consume(queue, consumer_tag, options, arguments, None)
257            .await?;
258        Ok(consumer.external(self.id, self.internal_rpc.clone()))
259    }
260
261    pub async fn basic_get(
262        &self,
263        queue: &str,
264        options: BasicGetOptions,
265    ) -> Result<Option<BasicGetMessage>> {
266        self.do_basic_get(queue, options, None).await
267    }
268
269    pub async fn exchange_declare(
270        &self,
271        exchange: &str,
272        kind: ExchangeKind,
273        options: ExchangeDeclareOptions,
274        arguments: FieldTable,
275    ) -> Result<()> {
276        self.do_exchange_declare(exchange, kind.kind(), options, arguments, kind.clone())
277            .await
278    }
279
280    pub async fn wait_for_confirms(&self) -> Result<Vec<BasicReturnMessage>> {
281        if let Some(last_pending) = self.acknowledgements.get_last_pending() {
282            trace!("Waiting for pending confirms");
283            last_pending.await?;
284        } else {
285            trace!("No confirms to wait for");
286        }
287        Ok(self.returned_messages.drain())
288    }
289
/// Test helper: records a queue in the local registry of this channel.
#[cfg(test)]
pub(crate) fn register_queue(
    &self,
    name: ShortString,
    options: QueueDeclareOptions,
    arguments: FieldTable,
) {
    let Self { local_registry, .. } = self;
    local_registry.register_queue(name, options, arguments);
}
299
/// Test helper: registers `consumer` under `tag` on this channel.
#[cfg(test)]
pub(crate) fn register_consumer(&self, tag: ShortString, consumer: Consumer) {
    let Self { consumers, .. } = self;
    consumers.register(tag, consumer);
}
304
305    pub(crate) fn finalize_connection(&self) {
306        self.status.finalize_connection();
307    }
308
309    fn init_recovery_or_shutdown(&self, error: Option<Error>) -> Option<Error> {
310        match error {
311            Some(err) if self.recovery_config.can_recover_channel(&err) => {
312                Some(self.init_recovery_poison(err, false))
313            }
314            err => {
315                self.set_closing(err.clone());
316                err
317            }
318        }
319    }
320
321    pub(crate) fn init_recovery(&self, error: Error) -> Error {
322        self.init_recovery_poison(error, true)
323    }
324
325    fn init_recovery_poison(&self, error: Error, poison: bool) -> Error {
326        let err = self.status.set_reconnecting(error, self.topology());
327        self.frames.drop_frames_for_channel(self.id, err.clone());
328        if poison {
329            self.poison(err.clone());
330        }
331        err
332    }
333
334    fn poison(&self, error: Error) {
335        self.frames.poison_channel(self.id, error);
336    }
337
338    pub(crate) fn update_recovery(&self) -> Option<ChannelDefinition> {
339        self.status.update_recovery_context(|ctx| {
340            // Cleanup any pending expecting reply
341            ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
342            // Also reset the acknowledgements state for this channel
343            self.acknowledgements.reset(ctx.cause());
344            // Also drop frames poisoning
345            self.frames.drop_channel_poison(self.id);
346            ctx.topology()
347        })
348    }
349
350    pub(crate) async fn start_recovery(&self) -> Result<()> {
351        let topology = self.update_recovery().expect("No topology during recovery");
352
353        // First, reopen the channel
354        self.channel_open(self.clone()).await?;
355
356        // Then, reenable confirm_select if needed
357        if self.status.confirm() {
358            self.confirm_select(ConfirmSelectOptions::default()).await?;
359        }
360
361        // Third, redeclare all exchanges
362        for ex in &topology.exchanges {
363            if ex.is_declared {
364                self.exchange_declare(
365                    ex.name.as_str(),
366                    ex.kind.clone().unwrap_or_default(),
367                    ex.options.unwrap_or_default(),
368                    ex.arguments.clone().unwrap_or_default(),
369                )
370                .await?;
371            }
372        }
373
374        // Fourth, redeclare all exchange bindings
375        for ex in &topology.exchanges {
376            for binding in &ex.bindings {
377                self.exchange_bind(
378                    ex.name.as_str(),
379                    binding.source.as_str(),
380                    binding.routing_key.as_str(),
381                    ExchangeBindOptions::default(),
382                    binding.arguments.clone(),
383                )
384                .await?;
385            }
386        }
387
388        // Fifth, redeclare all queues
389        for queue in &topology.queues {
390            if queue.is_declared {
391                self.queue_declare(
392                    queue.name.as_str(),
393                    queue.options.unwrap_or_default(),
394                    queue.arguments.clone().unwrap_or_default(),
395                )
396                .await?;
397            }
398        }
399
400        // Sixth, redeclare all queues bindings
401        for queue in &topology.queues {
402            for binding in &queue.bindings {
403                self.queue_bind(
404                    queue.name.as_str(),
405                    binding.source.as_str(),
406                    binding.routing_key.as_str(),
407                    QueueBindOptions::default(),
408                    binding.arguments.clone(),
409                )
410                .await?;
411            }
412        }
413
414        // Finally, redeclare all consumers
415        for consumer in topology.consumers.iter().cloned() {
416            consumer.reset();
417            self.do_basic_consume(
418                consumer.queue().as_str(),
419                consumer.tag().as_str(),
420                consumer.options(),
421                consumer.arguments(),
422                Some(consumer),
423            )
424            .await?;
425        }
426
427        Ok(())
428    }
429
430    pub(crate) fn send_method_frame(
431        &self,
432        method: AMQPClass,
433        resolver: PromiseResolver<()>,
434        expected_reply: Option<ExpectedReply>,
435    ) {
436        self.send_frame(AMQPFrame::Method(self.id, method), resolver, expected_reply);
437    }
438
439    pub(crate) fn send_frame(
440        &self,
441        frame: AMQPFrame,
442        resolver: PromiseResolver<()>,
443        expected_reply: Option<ExpectedReply>,
444    ) {
445        trace!(channel=%self.id, "send_frame");
446        self.frames.push(self.id, frame, resolver, expected_reply);
447        self.wake();
448    }
449
450    async fn send_method_frame_with_body(
451        &self,
452        method: AMQPClass,
453        payload: &[u8],
454        properties: BasicProperties,
455        publisher_confirms_result: Option<PublisherConfirm>,
456    ) -> Result<PublisherConfirm> {
457        let class_id = method.get_amqp_class_id();
458        let header = AMQPContentHeader {
459            class_id,
460            body_size: payload.len() as PayloadSize,
461            properties,
462        };
463        let frame_max = self.configuration.frame_max();
464        let mut frames = vec![
465            AMQPFrame::Method(self.id, method),
466            AMQPFrame::Header(self.id, class_id, Box::new(header)),
467        ];
468
469        frames.extend(
470            payload
471                .chunks(frame_max as usize - 8 /* An empty body frame weighs 8 bytes of overhead that we cannot use for payload */)
472                .map(|chunk| AMQPFrame::Body(self.id, chunk.into())),
473        );
474
475        trace!(channel=%self.id, "send_frames");
476        let promise = self.frames.push_frames(self.id, frames);
477        self.wake();
478        promise.await?;
479        Ok(publisher_confirms_result
480            .unwrap_or_else(|| PublisherConfirm::not_requested(self.returned_messages.clone())))
481    }
482
483    fn handle_invalid_contents(
484        &self,
485        error: String,
486        class_id: Identifier,
487        method_id: Identifier,
488    ) -> Result<()> {
489        error!(%error);
490        let error = AMQPError::new(AMQPHardError::UNEXPECTEDFRAME.into(), error.into());
491        self.internal_rpc.close_connection(
492            error.get_id(),
493            error.get_message().to_string(),
494            class_id,
495            method_id,
496        );
497        Err(ErrorKind::ProtocolError(error).into())
498    }
499
500    pub(crate) fn handle_content_header_frame(
501        &self,
502        class_id: Identifier,
503        size: PayloadSize,
504        properties: BasicProperties,
505    ) -> Result<()> {
506        self.status.set_content_length(
507            self.id,
508            class_id,
509            size,
510            |delivery_cause, confirm_mode| match delivery_cause {
511                DeliveryCause::Consume(consumer_tag) => {
512                    self.consumers
513                        .handle_content_header_frame(consumer_tag, size, properties);
514                }
515                DeliveryCause::Get => {
516                    self.basic_get_delivery
517                        .handle_content_header_frame(size, properties);
518                }
519                DeliveryCause::Return => {
520                    self.returned_messages.handle_content_header_frame(
521                        size,
522                        properties,
523                        confirm_mode,
524                    );
525                }
526            },
527            |msg| {
528                error!(%msg);
529                let error = AMQPError::new(AMQPHardError::FRAMEERROR.into(), msg.into());
530                self.internal_rpc.close_connection(
531                    error.get_id(),
532                    error.get_message().to_string(),
533                    class_id,
534                    0,
535                );
536                let error: Error = ErrorKind::ProtocolError(error).into();
537                self.internal_rpc.set_connection_error(error.clone());
538                Err(error)
539            },
540            |msg| self.handle_invalid_contents(msg, class_id, 0),
541        )
542    }
543
544    pub(crate) fn handle_body_frame(&self, payload: Vec<u8>) -> Result<()> {
545        self.status.receive(
546            self.id,
547            payload.len() as PayloadSize,
548            |delivery_cause, remaining_size, confirm_mode| match delivery_cause {
549                DeliveryCause::Consume(consumer_tag) => {
550                    self.consumers
551                        .handle_body_frame(consumer_tag, remaining_size, payload);
552                }
553                DeliveryCause::Get => {
554                    self.basic_get_delivery
555                        .handle_body_frame(remaining_size, payload);
556                }
557                DeliveryCause::Return => {
558                    self.returned_messages
559                        .handle_body_frame(remaining_size, payload, confirm_mode);
560                }
561            },
562            |msg| self.handle_invalid_contents(msg, 0, 0),
563        )
564    }
565
566    pub(crate) fn topology(&self) -> ChannelDefinition {
567        ChannelDefinition {
568            exchanges: self.local_registry.exchanges_topology(),
569            queues: self.local_registry.queues_topology(),
570            consumers: self.consumers.topology(),
571        }
572    }
573
574    fn before_basic_publish(&self) -> Option<PublisherConfirm> {
575        if self.status.confirm() {
576            Some(self.acknowledgements.register_pending())
577        } else {
578            None
579        }
580    }
581
582    fn before_basic_cancel(&self, consumer_tag: &str) {
583        self.consumers.start_cancel_one(consumer_tag);
584    }
585
586    fn acknowledgement_error(
587        &self,
588        error: AMQPError,
589        class_id: Identifier,
590        method_id: Identifier,
591    ) -> Result<()> {
592        error!("Got a bad acknowledgement from server, closing channel");
593        let channel = self.clone();
594        let err = error.clone();
595        self.internal_rpc.register_internal_future(async move {
596            channel
597                .do_channel_close(
598                    error.get_id(),
599                    error.get_message().as_str(),
600                    class_id,
601                    method_id,
602                )
603                .await
604        });
605        Err(ErrorKind::ProtocolError(err).into())
606    }
607
608    fn before_connection_start_ok(
609        &self,
610        resolver: PromiseResolver<Connection>,
611        connection: Connection,
612        credentials: Credentials,
613    ) {
614        self.connection_status
615            .set_connection_step(ConnectionStep::StartOk(resolver, connection, credentials));
616    }
617
618    fn before_connection_open(&self, resolver: PromiseResolver<Connection>) {
619        self.connection_status
620            .set_connection_step(ConnectionStep::Open(resolver));
621    }
622
623    fn on_connection_close_ok_sent(&self, error: Error) {
624        self.internal_rpc.finish_connection_shutdown();
625        if !self.recovery_config.can_recover_connection(&error) {
626            if let ErrorKind::ProtocolError(_) = error.kind() {
627                self.internal_rpc.set_connection_error(error);
628            } else {
629                self.internal_rpc.set_connection_closed(error);
630            }
631        }
632    }
633
634    fn next_expected_close_ok_reply(&self) -> Option<Reply> {
635        self.frames.next_expected_close_ok_reply(
636            self.id,
637            ErrorKind::InvalidChannelState(
638                ChannelState::Closed,
639                "unexpected channel.close-ok received",
640            )
641            .into(),
642        )
643    }
644
645    fn before_channel_close(&self) {
646        self.set_closing(None);
647    }
648
649    fn on_channel_close_ok_sent(&self, error: Option<Error>) {
650        let recover = error
651            .as_ref()
652            .is_some_and(|err| self.recovery_config.can_recover_channel(err));
653        let err = error.clone().unwrap_or_else(|| {
654            ErrorKind::InvalidChannelState(ChannelState::Closing, "channel.close-ok sent").into()
655        });
656
657        self.poison(err.clone());
658        if !recover {
659            self.set_closed(err);
660            if let Some(error) = error {
661                self.error_handler.on_error(error);
662            }
663        }
664    }
665
666    fn on_basic_recover_async_sent(&self) {
667        self.consumers.drop_prefetched_messages();
668    }
669
670    fn on_basic_ack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
671        if multiple && delivery_tag == 0 {
672            self.consumers.drop_prefetched_messages();
673        }
674    }
675
676    fn on_basic_nack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
677        if multiple && delivery_tag == 0 {
678            self.consumers.drop_prefetched_messages();
679        }
680    }
681
682    fn tune_connection_configuration(
683        &self,
684        channel_max: ChannelId,
685        frame_max: FrameSize,
686        heartbeat: Heartbeat,
687    ) {
688        // If we disable the heartbeat (0) but the server don't, follow it and enable it too
689        // If both us and the server want heartbeat enabled, pick the lowest value.
690        if self.configuration.heartbeat() == 0
691            || (heartbeat != 0 && heartbeat < self.configuration.heartbeat())
692        {
693            self.configuration.set_heartbeat(heartbeat);
694        }
695
696        if channel_max != 0 {
697            // 0 means we want to take the server's value
698            // If both us and the server specified a channel_max, pick the lowest value.
699            if self.configuration.channel_max() == 0
700                || channel_max < self.configuration.channel_max()
701            {
702                self.configuration.set_channel_max(channel_max);
703            }
704        }
705        if self.configuration.channel_max() == 0 {
706            self.configuration.set_channel_max(ChannelId::MAX);
707        }
708
709        if frame_max != 0 {
710            // 0 means we want to take the server's value
711            // If both us and the server specified a frame_max, pick the lowest value.
712            if self.configuration.frame_max() == 0 || frame_max < self.configuration.frame_max() {
713                self.configuration.set_frame_max(frame_max);
714            }
715        }
716        if self.configuration.frame_max() == 0 {
717            self.configuration.set_frame_max(FrameSize::MAX);
718        }
719    }
720
721    fn on_connection_start_received(&self, method: protocol::connection::Start) -> Result<()> {
722        trace!(?method, "Server sent connection::Start");
723        let state = self.connection_status.state();
724        let step = self.connection_status.connection_step_name();
725        if let (
726            ConnectionState::Connecting,
727            Some(ConnectionStep::ProtocolHeader(
728                resolver,
729                connection,
730                credentials,
731                mechanism,
732                mut options,
733            )),
734        ) = (state, self.connection_status.connection_step())
735        {
736            let mechanism_str = mechanism.to_string();
737            let locale = options.locale.clone();
738
739            if !method
740                .mechanisms
741                .to_string()
742                .split_whitespace()
743                .any(|m| m == mechanism_str)
744            {
745                error!(%mechanism, "unsupported mechanism");
746            }
747            if !method
748                .locales
749                .to_string()
750                .split_whitespace()
751                .any(|l| l == locale)
752            {
753                error!(%locale, "unsupported locale");
754            }
755
756            if !options.client_properties.contains_key("product")
757                || !options.client_properties.contains_key("version")
758            {
759                options.client_properties.insert(
760                    "product".into(),
761                    AMQPValue::LongString(env!("CARGO_PKG_NAME").into()),
762                );
763                options.client_properties.insert(
764                    "version".into(),
765                    AMQPValue::LongString(env!("CARGO_PKG_VERSION").into()),
766                );
767            }
768
769            options
770                .client_properties
771                .insert("platform".into(), AMQPValue::LongString("rust".into()));
772
773            let mut capabilities = FieldTable::default();
774            capabilities.insert("publisher_confirms".into(), true.into());
775            capabilities.insert("exchange_exchange_bindings".into(), true.into());
776            capabilities.insert("basic.nack".into(), true.into());
777            capabilities.insert("consumer_cancel_notify".into(), true.into());
778            capabilities.insert("connection.blocked".into(), true.into());
779            capabilities.insert("consumer_priorities".into(), true.into());
780            capabilities.insert("authentication_failure_close".into(), true.into());
781            capabilities.insert("per_consumer_qos".into(), true.into());
782            capabilities.insert("direct_reply_to".into(), true.into());
783
784            options
785                .client_properties
786                .insert("capabilities".into(), AMQPValue::FieldTable(capabilities));
787
788            let channel = self.clone();
789            self.internal_rpc.register_internal_future(async move {
790                channel
791                    .connection_start_ok(
792                        options.client_properties,
793                        &mechanism_str,
794                        &credentials.sasl_auth_string(mechanism),
795                        &locale,
796                        resolver,
797                        connection,
798                        credentials,
799                    )
800                    .await
801            });
802            Ok(())
803        } else {
804            error!(?state, ?step, "Invalid state");
805            let error: Error = ErrorKind::InvalidConnectionState(state).into();
806            self.internal_rpc.set_connection_error(error.clone());
807            Err(error)
808        }
809    }
810
811    fn on_connection_secure_received(&self, method: protocol::connection::Secure) -> Result<()> {
812        trace!(?method, "Server sent connection::Secure");
813
814        let state = self.connection_status.state();
815        let step = self.connection_status.connection_step_name();
816        if let (ConnectionState::Connecting, Some(ConnectionStep::StartOk(.., credentials))) =
817            (state, self.connection_status.connection_step())
818        {
819            let channel = self.clone();
820            self.internal_rpc.register_internal_future(async move {
821                channel
822                    .connection_secure_ok(&credentials.rabbit_cr_demo_answer())
823                    .await
824            });
825            Ok(())
826        } else {
827            error!(?state, ?step, "Invalid state");
828            let error: Error = ErrorKind::InvalidConnectionState(state).into();
829            self.internal_rpc.set_connection_error(error.clone());
830            Err(error)
831        }
832    }
833
834    fn on_connection_tune_received(&self, method: protocol::connection::Tune) -> Result<()> {
835        trace!(?method, "Server sent Connection::Tune");
836
837        let state = self.connection_status.state();
838        let step = self.connection_status.connection_step_name();
839        if let (
840            ConnectionState::Connecting,
841            Some(ConnectionStep::StartOk(resolver, connection, _)),
842        ) = (state, self.connection_status.connection_step())
843        {
844            self.tune_connection_configuration(
845                method.channel_max,
846                method.frame_max,
847                method.heartbeat,
848            );
849
850            let channel = self.clone();
851            let configuration = self.configuration.clone();
852            let vhost = self.connection_status.vhost();
853            self.internal_rpc.register_internal_future(async move {
854                channel
855                    .connection_tune_ok(
856                        configuration.channel_max(),
857                        configuration.frame_max(),
858                        configuration.heartbeat(),
859                    )
860                    .await?;
861                channel
862                    .connection_open(&vhost, Box::new(connection), resolver)
863                    .await
864            });
865            Ok(())
866        } else {
867            error!(?state, ?step, "Invalid state");
868            let error: Error = ErrorKind::InvalidConnectionState(state).into();
869            self.internal_rpc.set_connection_error(error.clone());
870            Err(error)
871        }
872    }
873
874    fn on_connection_open_ok_received(
875        &self,
876        _: protocol::connection::OpenOk,
877        connection: Box<Connection>,
878    ) -> Result<()> {
879        let state = self.connection_status.state();
880        let step = self.connection_status.connection_step_name();
881        if let (ConnectionState::Connecting, Some(ConnectionStep::Open(resolver))) =
882            (state, self.connection_status.connection_step())
883        {
884            self.connection_status.set_state(ConnectionState::Connected);
885            resolver.resolve(*connection);
886            Ok(())
887        } else {
888            error!(?state, ?step, "Invalid state");
889            let error: Error = ErrorKind::InvalidConnectionState(state).into();
890            self.internal_rpc.set_connection_error(error.clone());
891            Err(error)
892        }
893    }
894
895    fn on_connection_close_received(&self, method: protocol::connection::Close) -> Result<()> {
896        let error: Error = AMQPError::try_from(method.clone())
897            .map(|error| {
898                error!(
899                    channel=%self.id,
900                    ?method,
901                    ?error,
902                    "Connection closed",
903                );
904                ErrorKind::ProtocolError(error).into()
905            })
906            .unwrap_or_else(|error| {
907                error!(%error);
908                info!(channel=%self.id, ?method, "Connection closed");
909                ErrorKind::InvalidConnectionState(ConnectionState::Closed).into()
910            });
911        if self.recovery_config.can_recover_connection(&error) {
912            self.internal_rpc.init_connection_recovery(error.clone());
913        } else {
914            self.internal_rpc.init_connection_shutdown(error.clone());
915        }
916        self.internal_rpc.send_connection_close_ok(error);
917        Ok(())
918    }
919
920    fn on_connection_blocked_received(&self, _method: protocol::connection::Blocked) -> Result<()> {
921        self.connection_status.block();
922        Ok(())
923    }
924
925    fn on_connection_unblocked_received(
926        &self,
927        _method: protocol::connection::Unblocked,
928    ) -> Result<()> {
929        self.connection_status.unblock();
930        self.wake();
931        Ok(())
932    }
933
934    fn on_connection_close_ok_received(&self) -> Result<()> {
935        self.internal_rpc.set_connection_closed(
936            ErrorKind::InvalidConnectionState(ConnectionState::Closed).into(),
937        );
938        Ok(())
939    }
940
941    fn on_channel_open_ok_received(
942        &self,
943        _method: protocol::channel::OpenOk,
944        resolver: PromiseResolver<Channel>,
945        channel: Channel,
946    ) -> Result<()> {
947        if !self.status.confirm() {
948            self.finalize_connection();
949        }
950        resolver.resolve(channel);
951        Ok(())
952    }
953
954    fn on_channel_flow_received(&self, method: protocol::channel::Flow) -> Result<()> {
955        self.status.set_send_flow(method.active);
956        let channel = self.clone();
957        self.internal_rpc.register_internal_future(async move {
958            channel
959                .channel_flow_ok(ChannelFlowOkOptions {
960                    active: method.active,
961                })
962                .await
963        });
964        Ok(())
965    }
966
967    fn on_channel_flow_ok_received(
968        &self,
969        method: protocol::channel::FlowOk,
970        resolver: PromiseResolver<Boolean>,
971    ) -> Result<()> {
972        // Nothing to do here, the server just confirmed that we paused/resumed the receiving flow
973        resolver.resolve(method.active);
974        Ok(())
975    }
976
977    fn on_channel_close_received(&self, method: protocol::channel::Close) -> Result<()> {
978        let error = self.init_recovery_or_shutdown(AMQPError::try_from(method.clone()).map(|error| {
979                error!(
980                    channel=%self.id, ?method, ?error,
981                    "Channel closed"
982                );
983                Error::from(ErrorKind::ProtocolError(error))
984            }).map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok());
985        let recover = error
986            .as_ref()
987            .is_some_and(|err| self.recovery_config.can_recover_channel(err));
988        let channel = self.clone();
989        self.internal_rpc.register_internal_future(async move {
990            channel.channel_close_ok(error).await?;
991            if recover {
992                channel.start_recovery().await?;
993            }
994            Ok(())
995        });
996        Ok(())
997    }
998
999    fn on_channel_close_ok_received(&self) -> Result<()> {
1000        self.set_closed(
1001            ErrorKind::InvalidChannelState(ChannelState::Closed, "channel.close-ok received")
1002                .into(),
1003        );
1004        Ok(())
1005    }
1006
1007    fn on_exchange_bind_ok_received(
1008        &self,
1009        destination: ShortString,
1010        source: ShortString,
1011        routing_key: ShortString,
1012        arguments: FieldTable,
1013    ) -> Result<()> {
1014        self.local_registry
1015            .register_exchange_binding(destination, source, routing_key, arguments);
1016        Ok(())
1017    }
1018
1019    fn on_exchange_unbind_ok_received(
1020        &self,
1021        destination: ShortString,
1022        source: ShortString,
1023        routing_key: ShortString,
1024        arguments: FieldTable,
1025    ) -> Result<()> {
1026        self.local_registry.deregister_exchange_binding(
1027            destination.as_str(),
1028            source.as_str(),
1029            routing_key.as_str(),
1030            &arguments,
1031        );
1032        Ok(())
1033    }
1034
1035    fn on_exchange_declare_ok_received(
1036        &self,
1037        resolver: PromiseResolver<()>,
1038        exchange: ShortString,
1039        kind: ExchangeKind,
1040        options: ExchangeDeclareOptions,
1041        arguments: FieldTable,
1042    ) -> Result<()> {
1043        self.local_registry
1044            .register_exchange(exchange, kind, options, arguments);
1045        resolver.resolve(());
1046        Ok(())
1047    }
1048
1049    fn on_exchange_delete_ok_received(&self, exchange: ShortString) -> Result<()> {
1050        self.local_registry.deregister_exchange(exchange.as_str());
1051        Ok(())
1052    }
1053
1054    fn on_queue_delete_ok_received(
1055        &self,
1056        method: protocol::queue::DeleteOk,
1057        resolver: PromiseResolver<MessageCount>,
1058        queue: ShortString,
1059    ) -> Result<()> {
1060        self.local_registry.deregister_queue(queue.as_str());
1061        resolver.resolve(method.message_count);
1062        Ok(())
1063    }
1064
1065    fn on_queue_purge_ok_received(
1066        &self,
1067        method: protocol::queue::PurgeOk,
1068        resolver: PromiseResolver<MessageCount>,
1069    ) -> Result<()> {
1070        resolver.resolve(method.message_count);
1071        Ok(())
1072    }
1073
1074    fn on_queue_declare_ok_received(
1075        &self,
1076        method: protocol::queue::DeclareOk,
1077        resolver: PromiseResolver<Queue>,
1078        options: QueueDeclareOptions,
1079        arguments: FieldTable,
1080    ) -> Result<()> {
1081        self.local_registry
1082            .register_queue(method.queue.clone(), options, arguments);
1083        resolver.resolve(Queue::new(
1084            method.queue,
1085            method.message_count,
1086            method.consumer_count,
1087        ));
1088        Ok(())
1089    }
1090
1091    fn on_queue_bind_ok_received(
1092        &self,
1093        queue: ShortString,
1094        exchange: ShortString,
1095        routing_key: ShortString,
1096        arguments: FieldTable,
1097    ) -> Result<()> {
1098        self.local_registry
1099            .register_queue_binding(queue, exchange, routing_key, arguments);
1100        Ok(())
1101    }
1102
1103    fn on_queue_unbind_ok_received(
1104        &self,
1105        queue: ShortString,
1106        exchange: ShortString,
1107        routing_key: ShortString,
1108        arguments: FieldTable,
1109    ) -> Result<()> {
1110        self.local_registry.deregister_queue_binding(
1111            queue.as_str(),
1112            exchange.as_str(),
1113            routing_key.as_str(),
1114            &arguments,
1115        );
1116        Ok(())
1117    }
1118
1119    fn on_basic_get_ok_received(
1120        &self,
1121        method: protocol::basic::GetOk,
1122        resolver: PromiseResolver<Option<BasicGetMessage>>,
1123    ) -> Result<()> {
1124        let class_id = method.get_amqp_class_id();
1125        let killswitch = self.status.set_will_receive(class_id, DeliveryCause::Get);
1126        self.basic_get_delivery.start_new_delivery(
1127            BasicGetMessage::new(
1128                self.id,
1129                method.delivery_tag,
1130                method.exchange,
1131                method.routing_key,
1132                method.redelivered,
1133                method.message_count,
1134                self.internal_rpc.clone(),
1135                killswitch,
1136            ),
1137            resolver,
1138        );
1139        Ok(())
1140    }
1141
1142    fn on_basic_get_empty_received(&self, method: protocol::basic::GetEmpty) -> Result<()> {
1143        match self
1144            .frames
1145            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
1146        {
1147            Some(Reply::BasicGetOk(resolver, ..)) => {
1148                resolver.resolve(None);
1149                Ok(())
1150            }
1151            _ => self.handle_invalid_contents(
1152                format!("unexpected basic get empty received on channel {}", self.id),
1153                method.get_amqp_class_id(),
1154                method.get_amqp_method_id(),
1155            ),
1156        }
1157    }
1158
1159    fn on_basic_consume_ok_received(
1160        &self,
1161        method: protocol::basic::ConsumeOk,
1162        resolver: PromiseResolver<Consumer>,
1163        channel_closer: Option<Arc<ChannelCloser>>,
1164        queue: ShortString,
1165        options: BasicConsumeOptions,
1166        arguments: FieldTable,
1167        original: Option<Consumer>,
1168    ) -> Result<()> {
1169        let consumer = original.unwrap_or_else(|| {
1170            Consumer::new(
1171                method.consumer_tag.clone(),
1172                self.executor.clone(),
1173                channel_closer,
1174                queue,
1175                options,
1176                arguments,
1177            )
1178        });
1179        self.consumers
1180            .register(method.consumer_tag, consumer.clone());
1181        resolver.resolve(consumer);
1182        Ok(())
1183    }
1184
1185    fn on_basic_deliver_received(&self, method: protocol::basic::Deliver) -> Result<()> {
1186        let class_id = method.get_amqp_class_id();
1187        let consumer_tag = method.consumer_tag.clone();
1188        let killswitch = self
1189            .status
1190            .set_will_receive(class_id, DeliveryCause::Consume(consumer_tag.clone()));
1191        self.consumers.start_delivery(&consumer_tag, |error| {
1192            Delivery::new(
1193                self.id,
1194                method.delivery_tag,
1195                method.exchange,
1196                method.routing_key,
1197                method.redelivered,
1198                Some(self.internal_rpc.clone()),
1199                Some(error),
1200                killswitch,
1201            )
1202        });
1203        Ok(())
1204    }
1205
1206    fn on_basic_cancel_received(&self, method: protocol::basic::Cancel) -> Result<()> {
1207        self.consumers.deregister(method.consumer_tag.as_str());
1208        if !method.nowait {
1209            let channel = self.clone();
1210            self.internal_rpc.register_internal_future(async move {
1211                channel.basic_cancel_ok(method.consumer_tag.as_str()).await
1212            });
1213        }
1214        Ok(())
1215    }
1216
1217    fn on_basic_cancel_ok_received(&self, method: protocol::basic::CancelOk) -> Result<()> {
1218        self.consumers.deregister(method.consumer_tag.as_str());
1219        Ok(())
1220    }
1221
1222    fn on_basic_ack_received(&self, method: protocol::basic::Ack) -> Result<()> {
1223        if self.status.confirm() {
1224            if method.multiple {
1225                if method.delivery_tag > 0 {
1226                    self.acknowledgements
1227                        .ack_all_before(method.delivery_tag)
1228                        .or_else(|err| {
1229                            self.acknowledgement_error(
1230                                err,
1231                                method.get_amqp_class_id(),
1232                                method.get_amqp_method_id(),
1233                            )
1234                        })?;
1235                } else {
1236                    self.acknowledgements.ack_all_pending();
1237                }
1238            } else {
1239                self.acknowledgements
1240                    .ack(method.delivery_tag)
1241                    .or_else(|err| {
1242                        self.acknowledgement_error(
1243                            err,
1244                            method.get_amqp_class_id(),
1245                            method.get_amqp_method_id(),
1246                        )
1247                    })?;
1248            }
1249        }
1250        Ok(())
1251    }
1252
1253    fn on_basic_nack_received(&self, method: protocol::basic::Nack) -> Result<()> {
1254        if self.status.confirm() {
1255            if method.multiple {
1256                if method.delivery_tag > 0 {
1257                    self.acknowledgements
1258                        .nack_all_before(method.delivery_tag)
1259                        .or_else(|err| {
1260                            self.acknowledgement_error(
1261                                err,
1262                                method.get_amqp_class_id(),
1263                                method.get_amqp_method_id(),
1264                            )
1265                        })?;
1266                } else {
1267                    self.acknowledgements.nack_all_pending();
1268                }
1269            } else {
1270                self.acknowledgements
1271                    .nack(method.delivery_tag)
1272                    .or_else(|err| {
1273                        self.acknowledgement_error(
1274                            err,
1275                            method.get_amqp_class_id(),
1276                            method.get_amqp_method_id(),
1277                        )
1278                    })?;
1279            }
1280        }
1281        Ok(())
1282    }
1283
1284    fn on_basic_return_received(&self, method: protocol::basic::Return) -> Result<()> {
1285        let class_id = method.get_amqp_class_id();
1286        let killswitch = self
1287            .status
1288            .set_will_receive(class_id, DeliveryCause::Return);
1289        self.returned_messages
1290            .start_new_delivery(BasicReturnMessage::new(
1291                method.exchange,
1292                method.routing_key,
1293                method.reply_code,
1294                method.reply_text,
1295                killswitch,
1296            ));
1297        Ok(())
1298    }
1299
1300    fn on_basic_recover_ok_received(&self) -> Result<()> {
1301        self.consumers.drop_prefetched_messages();
1302        Ok(())
1303    }
1304
1305    fn on_confirm_select_ok_received(&self) -> Result<()> {
1306        self.status.set_confirm();
1307        Ok(())
1308    }
1309
1310    fn on_access_request_ok_received(&self, _: protocol::access::RequestOk) -> Result<()> {
1311        Ok(())
1312    }
1313}
1314
1315#[cfg(feature = "codegen")]
1316include!(concat!(env!("OUT_DIR"), "/channel.rs"));
1317#[cfg(not(feature = "codegen"))]
1318include!("generated.rs");