Skip to main content

lapin/
channel.rs

1use crate::{
2    BasicProperties, ChannelState, ChannelStatus, Connection, ConnectionState, ConnectionStatus,
3    Error, ErrorKind, ExchangeKind, Promise, PromiseResolver, Result,
4    acknowledgement::Acknowledgements,
5    auth::AuthProvider,
6    basic_get_delivery::BasicGetDelivery,
7    channel_closer::ChannelCloser,
8    channel_receiver_state::DeliveryCause,
9    configuration::{NegotiatedConfig, RecoveryConfig},
10    connection_closer::ConnectionCloser,
11    connection_status::ConnectionStep,
12    consumer::Consumer,
13    consumers::Consumers,
14    events::EventsSender,
15    frames::{ExpectedReply, Frames},
16    internal_rpc::InternalRPCHandle,
17    message::{BasicGetMessage, BasicReturnMessage, Delivery},
18    promise::Cancelable,
19    protocol::{self, AMQPClass, AMQPError, AMQPHardError},
20    publisher_confirm::PublisherConfirm,
21    queue::Queue,
22    registry::Registry,
23    returned_messages::ReturnedMessages,
24    socket_state::SocketStateHandle,
25    topology::ChannelDefinition,
26    types::*,
27};
28use amq_protocol::frame::{AMQPContentHeader, AMQPFrame};
29use std::{convert::TryFrom, fmt, sync::Arc};
30use tracing::{error, info, trace};
31
/// Main entry point for most AMQP operations.
///
/// It serves as a lightweight connection and can be obtained from a
///  [`Connection`] by calling [`Connection::create_channel`].
///
/// See also the RabbitMQ documentation on [channels](https://www.rabbitmq.com/channels.html).
///
/// [`Connection`]: ./struct.Connection.html
/// [`Connection::create_channel`]: ./struct.Connection.html#method.create_channel
#[derive(Clone)]
pub struct Channel {
    // Numeric id of this channel; id 0 is the one allowed to carry connection frames.
    id: ChannelId,
    // Negotiated connection parameters (channel_max, frame_max, heartbeat).
    configuration: NegotiatedConfig,
    // State of this channel (state machine, confirm mode, content reception, recovery context).
    status: ChannelStatus,
    // State of the owning connection (state, handshake step, vhost).
    connection_status: ConnectionStatus,
    // Exchanges and queues known to this channel; source of the topology used for recovery.
    local_registry: Registry,
    // Pending publisher confirms.
    acknowledgements: Acknowledgements,
    // Consumers registered on this channel.
    consumers: Consumers,
    // Receives the header/body frames of deliveries caused by a basic.get.
    basic_get_delivery: BasicGetDelivery,
    // Receives returned messages; drained by `wait_for_confirms`.
    returned_messages: ReturnedMessages,
    // Woken every time frames are queued for sending.
    waker: SocketStateHandle,
    // Handle used to spawn tasks and to request connection/channel level operations.
    internal_rpc: InternalRPCHandle,
    // Queue of outgoing frames and expected replies.
    frames: Frames,
    // Sink for connection events (errors, connected).
    events_sender: EventsSender,
    // `None` for channel 0 and for clones made through `clone_internal`.
    // NOTE(review): presumably closes the channel once the last user handle is dropped;
    // confirm against `ChannelCloser`.
    channel_closer: Option<Arc<ChannelCloser>>,
    // Held but not read in the visible part of this file.
    // NOTE(review): presumably keeps the connection alive while channels exist; confirm.
    _connection_closer: Option<Arc<ConnectionCloser>>,
    // Decides whether a given error can be recovered from.
    recovery_config: RecoveryConfig,
}
60
61impl PartialEq for Channel {
62    fn eq(&self, other: &Self) -> bool {
63        self.id == other.id
64    }
65}
66
67impl fmt::Debug for Channel {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_struct("Channel")
70            .field("id", &self.id)
71            .field("configuration", &self.configuration)
72            .field("status", &self.status)
73            .field("connection_status", &self.connection_status)
74            .field("acknowledgements", &self.acknowledgements)
75            .field("consumers", &self.consumers)
76            .field("basic_get_delivery", &self.basic_get_delivery)
77            .field("returned_messages", &self.returned_messages)
78            .field("frames", &self.frames)
79            .finish()
80    }
81}
82
83impl Channel {
84    #[allow(clippy::too_many_arguments)]
85    pub(crate) fn new(
86        channel_id: ChannelId,
87        configuration: NegotiatedConfig,
88        connection_status: ConnectionStatus,
89        waker: SocketStateHandle,
90        internal_rpc: InternalRPCHandle,
91        frames: Frames,
92        connection_closer: Option<Arc<ConnectionCloser>>,
93        recovery_config: RecoveryConfig,
94        events_sender: EventsSender,
95    ) -> Channel {
96        let returned_messages = ReturnedMessages::default();
97        let status = ChannelStatus::new(channel_id, internal_rpc.clone());
98        let channel_closer = if channel_id == 0 {
99            None
100        } else {
101            Some(Arc::new(ChannelCloser::new(
102                channel_id,
103                status.clone(),
104                internal_rpc.clone(),
105            )))
106        };
107        Self {
108            id: channel_id,
109            configuration,
110            status,
111            connection_status,
112            local_registry: Registry::default(),
113            acknowledgements: Acknowledgements::new(channel_id, returned_messages.clone()),
114            consumers: Consumers::default(),
115            basic_get_delivery: BasicGetDelivery::default(),
116            returned_messages,
117            waker,
118            internal_rpc,
119            frames,
120            events_sender,
121            channel_closer,
122            _connection_closer: connection_closer,
123            recovery_config,
124        }
125    }
126
    /// Returns a reference to the status tracker of this channel.
    pub fn status(&self) -> &ChannelStatus {
        &self.status
    }
130
131    pub async fn wait_for_recovery(&self, error: Error) -> Result<()> {
132        if self.recovery_config.can_recover(&error)
133            && let Some(notifier) = error.notifier()
134        {
135            notifier.await;
136            return Ok(());
137        }
138        Err(error)
139    }
140
141    pub(crate) fn set_closing(&self, error: Option<Error>) {
142        self.set_state(ChannelState::Closing);
143        if let Some(error) = error {
144            self.error_publisher_confirms(error.clone());
145            self.error_consumers(error); // ignore the returned error here, only happens with default executor if we cannot spawn a thread
146        } else {
147            self.consumers.start_cancel();
148        }
149    }
150
151    pub(crate) fn set_closed(&self, error: Error) {
152        self.set_state(ChannelState::Closed);
153        self.error_publisher_confirms(error.clone());
154        self.cancel_consumers();
155        self.internal_rpc.remove_channel(self.id, error);
156    }
157
158    // Only called in case of a protocol failure
159    pub(crate) fn set_connection_error(&self, error: Error) {
160        self.set_state(ChannelState::Error);
161        self.error_publisher_confirms(error.clone());
162        self.error_consumers(error.clone());
163        self.internal_rpc.remove_channel(self.id, error.clone());
164    }
165
    /// Forwards `error` to the acknowledgements tracker of this channel.
    fn error_publisher_confirms(&self, error: Error) {
        self.acknowledgements.on_channel_error(error);
    }
169
    /// Cancels the consumers registered on this channel.
    fn cancel_consumers(&self) {
        self.consumers.cancel();
    }
173
174    fn error_consumers(&self, error: Error) {
175        let recover = self.recovery_config.can_recover(&error);
176        self.consumers.error(error, recover);
177    }
178
    /// Records `state` as the new state of this channel.
    pub(crate) fn set_state(&self, state: ChannelState) {
        self.status.set_state(state);
    }
182
    /// Returns the numeric id of this channel.
    pub fn id(&self) -> ChannelId {
        self.id
    }
186
187    pub(crate) fn clone_internal(&self) -> Self {
188        let mut this = self.clone();
189        this.channel_closer = None;
190        this
191    }
192
    /// Logs a trace event and wakes the socket state handle.
    fn wake(&self) {
        trace!(channel=%self.id, "wake");
        self.waker.wake()
    }
197
198    fn assert_channel0(&self, class_id: Identifier, method_id: Identifier) -> Result<()> {
199        if self.id == 0 {
200            Ok(())
201        } else {
202            error!(
203                channel=%self.id,
204                "Got a connection frame on, closing connection"
205            );
206            let error = AMQPError::new(
207                AMQPHardError::COMMANDINVALID.into(),
208                format!("connection frame received on channel {}", self.id).into(),
209            );
210            self.internal_rpc.close_connection(
211                error.get_id(),
212                error.get_message().clone(),
213                class_id,
214                method_id,
215            );
216            Err(ErrorKind::ProtocolError(error).into())
217        }
218    }
219
220    pub async fn close(&self, reply_code: ReplyCode, reply_text: ShortString) -> Result<()> {
221        self.do_channel_close(reply_code, reply_text, 0, 0).await
222    }
223
224    pub async fn basic_consume(
225        &self,
226        queue: ShortString,
227        consumer_tag: ShortString,
228        options: BasicConsumeOptions,
229        arguments: FieldTable,
230    ) -> Result<Consumer> {
231        let consumer = self
232            .do_basic_consume(queue, consumer_tag, options, arguments, None)
233            .await?;
234        Ok(consumer.external(self.id))
235    }
236
    /// Fetches a single message from `queue`, if any.
    ///
    /// Delegates to `do_basic_get`, passing `None` as its last argument.
    pub async fn basic_get(
        &self,
        queue: ShortString,
        options: BasicGetOptions,
    ) -> Result<Option<BasicGetMessage>> {
        self.do_basic_get(queue, options, None).await
    }
244
245    pub async fn exchange_declare(
246        &self,
247        exchange: ShortString,
248        kind: ExchangeKind,
249        options: ExchangeDeclareOptions,
250        arguments: FieldTable,
251    ) -> Result<()> {
252        self.do_exchange_declare(
253            exchange,
254            kind.kind().into(),
255            options,
256            arguments,
257            kind.clone(),
258        )
259        .await
260    }
261
262    pub async fn wait_for_confirms(&self) -> Result<Vec<BasicReturnMessage>> {
263        if let Some(last_pending) = self.acknowledgements.get_last_pending() {
264            trace!("Waiting for pending confirms");
265            last_pending.await?;
266        } else {
267            trace!("No confirms to wait for");
268        }
269        Ok(self.returned_messages.drain())
270    }
271
    /// Test helper: registers a queue in the local registry of this channel.
    #[cfg(test)]
    pub(crate) fn register_queue(
        &self,
        name: ShortString,
        options: QueueDeclareOptions,
        arguments: FieldTable,
    ) {
        self.local_registry.register_queue(name, options, arguments);
    }
281
    /// Test helper: registers `consumer` under `tag` on this channel.
    #[cfg(test)]
    pub(crate) fn register_consumer(&self, tag: ShortString, consumer: Consumer) {
        self.consumers.register(tag, consumer);
    }
286
    /// Forwards the connection finalization to the channel status.
    pub(crate) fn finalize_connection(&self) {
        self.status.finalize_connection();
    }
290
291    pub(crate) fn init_recovery(&self, error: Error) -> Error {
292        let err = self.status.set_reconnecting(error, self.topology());
293        self.frames.drop_frames_for_channel(self.id, err.clone());
294        self.poison(err.clone());
295        err
296    }
297
    /// Poisons this channel in the frames queue with `error`.
    fn poison(&self, error: Error) {
        self.frames.poison_channel(self.id, error);
    }
301
302    pub(crate) fn update_recovery(&self) -> Option<ChannelDefinition> {
303        self.status.update_recovery_context(|ctx| {
304            // Cleanup any pending expecting reply
305            ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
306            // Also reset the acknowledgements state for this channel
307            self.acknowledgements.reset(ctx.cause());
308            // Also drop frames poisoning
309            self.frames.drop_channel_poison(self.id);
310            ctx.topology()
311        })
312    }
313
314    pub(crate) async fn start_recovery(&self) -> Result<()> {
315        let topology = self.update_recovery().expect("No topology during recovery");
316
317        // First, reopen the channel
318        self.channel_open(self.clone()).await?;
319
320        // Then, reenable confirm_select if needed
321        if self.status.confirm() {
322            self.confirm_select(ConfirmSelectOptions::default()).await?;
323        }
324
325        // Third, redeclare all exchanges
326        for ex in &topology.exchanges {
327            if ex.is_declared {
328                self.exchange_declare(
329                    ex.name.clone(),
330                    ex.kind.clone().unwrap_or_default(),
331                    ex.options.unwrap_or_default(),
332                    ex.arguments.clone().unwrap_or_default(),
333                )
334                .await?;
335            }
336        }
337
338        // Fourth, redeclare all exchange bindings
339        for ex in &topology.exchanges {
340            for binding in &ex.bindings {
341                self.exchange_bind(
342                    ex.name.clone(),
343                    binding.source.clone(),
344                    binding.routing_key.clone(),
345                    ExchangeBindOptions::default(),
346                    binding.arguments.clone(),
347                )
348                .await?;
349            }
350        }
351
352        // Fifth, redeclare all queues
353        for queue in &topology.queues {
354            if queue.is_declared {
355                self.queue_declare(
356                    queue.name.clone(),
357                    queue.options.unwrap_or_default(),
358                    queue.arguments.clone().unwrap_or_default(),
359                )
360                .await?;
361            }
362        }
363
364        // Sixth, redeclare all queues bindings
365        for queue in &topology.queues {
366            for binding in &queue.bindings {
367                self.queue_bind(
368                    queue.name.clone(),
369                    binding.source.clone(),
370                    binding.routing_key.clone(),
371                    QueueBindOptions::default(),
372                    binding.arguments.clone(),
373                )
374                .await?;
375            }
376        }
377
378        // Finally, redeclare all consumers
379        for consumer in topology.consumers.iter().cloned() {
380            consumer.reset();
381            self.do_basic_consume(
382                consumer.queue().clone(),
383                consumer.tag().clone(),
384                consumer.options(),
385                consumer.arguments(),
386                Some(consumer),
387            )
388            .await?;
389        }
390
391        Ok(())
392    }
393
394    pub(crate) fn send_method_frame(
395        &self,
396        method: AMQPClass,
397        canceler: Box<dyn Cancelable + Send + Sync>,
398        expected_reply: Option<ExpectedReply>,
399        resolver: Option<PromiseResolver<()>>,
400    ) {
401        self.send_frame(
402            AMQPFrame::Method(self.id, method),
403            canceler,
404            expected_reply,
405            resolver,
406        );
407    }
408
409    pub(crate) fn send_frame(
410        &self,
411        frame: AMQPFrame,
412        canceler: Box<dyn Cancelable + Send + Sync>,
413        expected_reply: Option<ExpectedReply>,
414        resolver: Option<PromiseResolver<()>>,
415    ) {
416        trace!(channel=%self.id, "send_frame");
417        self.frames
418            .push(self.id, frame, canceler, expected_reply, resolver);
419        self.wake();
420    }
421
422    async fn send_method_frame_with_body(
423        &self,
424        ctx: &'static str,
425        method: AMQPClass,
426        payload: &[u8],
427        properties: BasicProperties,
428        publisher_confirms_result: Option<PublisherConfirm>,
429    ) -> Result<PublisherConfirm> {
430        let class_id = method.get_amqp_class_id();
431        let header = AMQPContentHeader {
432            class_id,
433            body_size: payload.len() as PayloadSize,
434            properties,
435        };
436        let frame_max = self.configuration.frame_max();
437        let mut frames = vec![
438            AMQPFrame::Method(self.id, method),
439            AMQPFrame::Header(self.id, header),
440        ];
441
442        frames.extend(
443            payload
444                .chunks(frame_max as usize - 8 /* An empty body frame weighs 8 bytes of overhead that we cannot use for payload */)
445                .map(|chunk| AMQPFrame::Body(self.id, chunk.into())),
446        );
447
448        trace!(channel=%self.id, "send_frames");
449        let (promise, resolver) = Promise::new(ctx);
450        self.frames.push_frames(self.id, frames, resolver);
451        self.wake();
452        promise.await?;
453        Ok(publisher_confirms_result
454            .unwrap_or_else(|| PublisherConfirm::not_requested(self.returned_messages.clone())))
455    }
456
457    pub(crate) fn report_protocol_violation(
458        &self,
459        error: AMQPError,
460        class_id: Identifier,
461        method_id: Identifier,
462    ) -> Result<()> {
463        error!(%error);
464        self.internal_rpc.close_connection(
465            error.get_id(),
466            error.get_message().clone(),
467            class_id,
468            method_id,
469        );
470        let error = Error::from(ErrorKind::ProtocolError(error));
471        self.internal_rpc.set_connection_error(error.clone());
472        Err(error)
473    }
474
475    fn handle_invalid_contents(
476        &self,
477        error: String,
478        class_id: Identifier,
479        method_id: Identifier,
480    ) -> Result<()> {
481        error!(%error);
482        let error = AMQPError::new(AMQPHardError::UNEXPECTEDFRAME.into(), error.into());
483        self.report_protocol_violation(error, class_id, method_id)
484    }
485
    /// Handles a content header frame received on this channel.
    ///
    /// The work is delegated to `ChannelStatus::set_content_length`, which is given
    /// three callbacks; when each one runs is decided there.
    pub(crate) fn handle_content_header_frame(
        &self,
        class_id: Identifier,
        size: PayloadSize,
        properties: BasicProperties,
    ) -> Result<()> {
        self.status.set_content_length(
            self.id,
            class_id,
            size,
            // Routes the header to whoever is waiting for the content.
            |delivery_cause, confirm_mode| match delivery_cause {
                DeliveryCause::Consume(consumer_tag) => {
                    self.consumers
                        .handle_content_header_frame(consumer_tag, size, properties);
                }
                DeliveryCause::Get => {
                    self.basic_get_delivery
                        .handle_content_header_frame(size, properties);
                }
                DeliveryCause::Return => {
                    self.returned_messages.handle_content_header_frame(
                        size,
                        properties,
                        confirm_mode,
                    );
                }
            },
            // Reported as a FRAME-ERROR hard error; no method id is available here, hence 0.
            |msg| {
                let error = AMQPError::new(AMQPHardError::FRAMEERROR.into(), msg.into());
                self.report_protocol_violation(error, class_id, 0)
            },
            // Reported as an UNEXPECTED-FRAME hard error.
            |msg| self.handle_invalid_contents(msg, class_id, 0),
        )
    }
520
    /// Handles a content body frame received on this channel.
    ///
    /// The work is delegated to `ChannelStatus::receive`, which is given the payload
    /// length and two callbacks; when each one runs is decided there.
    pub(crate) fn handle_body_frame(&self, payload: Vec<u8>) -> Result<()> {
        self.status.receive(
            self.id,
            payload.len() as PayloadSize,
            // Routes the payload to whoever is waiting for the content.
            |delivery_cause, remaining_size, confirm_mode| match delivery_cause {
                DeliveryCause::Consume(consumer_tag) => {
                    self.consumers
                        .handle_body_frame(consumer_tag, remaining_size, payload);
                }
                DeliveryCause::Get => {
                    self.basic_get_delivery
                        .handle_body_frame(remaining_size, payload);
                }
                DeliveryCause::Return => {
                    self.returned_messages
                        .handle_body_frame(remaining_size, payload, confirm_mode);
                }
            },
            // Reported as an UNEXPECTED-FRAME hard error; class and method ids are passed as 0.
            |msg| self.handle_invalid_contents(msg, 0, 0),
        )
    }
542
543    pub(crate) fn topology(&self) -> ChannelDefinition {
544        ChannelDefinition {
545            exchanges: self.local_registry.exchanges_topology(),
546            queues: self.local_registry.queues_topology(),
547            consumers: self.consumers.topology(),
548        }
549    }
550
551    fn before_basic_publish(&self) -> Option<PublisherConfirm> {
552        if self.status.confirm() {
553            Some(self.acknowledgements.register_pending())
554        } else {
555            None
556        }
557    }
558
    /// Starts the cancellation of the consumer registered under `consumer_tag`.
    fn before_basic_cancel(&self, consumer_tag: &str) {
        self.consumers.start_cancel_one(consumer_tag);
    }
562
563    fn acknowledgement_error(
564        &self,
565        error: AMQPError,
566        class_id: Identifier,
567        method_id: Identifier,
568    ) -> Result<()> {
569        error!("Got a bad acknowledgement from server, closing channel");
570        let channel = self.clone();
571        let err = error.clone();
572        self.internal_rpc.spawn(async move {
573            channel
574                .do_channel_close(
575                    error.get_id(),
576                    error.get_message().clone(),
577                    class_id,
578                    method_id,
579                )
580                .await
581        });
582        Err(ErrorKind::ProtocolError(err).into())
583    }
584
585    fn before_connection_start_ok(
586        &self,
587        resolver: PromiseResolver<Connection>,
588        connection: Connection,
589        auth_provider: Arc<dyn AuthProvider>,
590    ) {
591        self.connection_status
592            .set_connection_step(ConnectionStep::StartOk(resolver, connection, auth_provider));
593    }
594
595    fn before_connection_secure_ok(
596        &self,
597        resolver: PromiseResolver<Connection>,
598        connection: Connection,
599        auth_provider: Arc<dyn AuthProvider>,
600    ) {
601        self.connection_status
602            .set_connection_step(ConnectionStep::SecureOk(
603                resolver,
604                connection,
605                auth_provider,
606            ));
607    }
608
609    fn before_connection_open(&self, resolver: PromiseResolver<Connection>) {
610        self.connection_status
611            .set_connection_step(ConnectionStep::Open(resolver));
612    }
613
614    fn on_connection_close_ok_sent(&self, error: Error) {
615        self.internal_rpc.finish_connection_shutdown();
616        if !self.recovery_config.can_recover(&error) {
617            if let ErrorKind::ProtocolError(_) = error.kind() {
618                self.internal_rpc.set_connection_error(error);
619            } else {
620                self.internal_rpc.set_connection_closed(error);
621            }
622        }
623    }
624
625    fn next_expected_close_ok_reply(&self) -> Option<Reply> {
626        self.frames.next_expected_close_ok_reply(
627            self.id,
628            ErrorKind::InvalidChannelState(
629                ChannelState::Closed,
630                "unexpected channel.close-ok received",
631            )
632            .into(),
633        )
634    }
635
    /// Moves the channel to the closing state without an error.
    fn before_channel_close(&self) {
        self.set_closing(None);
    }
639
640    fn on_channel_close_ok_sent(&self, error: Option<Error>) {
641        let err = error.clone().unwrap_or_else(|| {
642            ErrorKind::InvalidChannelState(ChannelState::Closing, "channel.close-ok sent").into()
643        });
644        self.poison(err.clone());
645        self.set_closed(err);
646        if let Some(error) = error {
647            self.events_sender.error(error.clone());
648        }
649    }
650
    /// Called once basic.recover-async has been sent: drops the prefetched messages
    /// held by the consumers.
    fn on_basic_recover_async_sent(&self) {
        self.consumers.drop_prefetched_messages();
    }
654
655    fn on_basic_ack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
656        if multiple && delivery_tag == 0 {
657            self.consumers.drop_prefetched_messages();
658        }
659    }
660
661    fn on_basic_nack_sent(&self, multiple: bool, delivery_tag: DeliveryTag) {
662        if multiple && delivery_tag == 0 {
663            self.consumers.drop_prefetched_messages();
664        }
665    }
666
667    fn tune_connection_configuration(
668        &self,
669        channel_max: ChannelId,
670        frame_max: FrameSize,
671        heartbeat: Heartbeat,
672    ) {
673        // If we disable the heartbeat (0) but the server don't, follow it and enable it too
674        // If both us and the server want heartbeat enabled, pick the lowest value.
675        if self.configuration.heartbeat() == 0
676            || (heartbeat != 0 && heartbeat < self.configuration.heartbeat())
677        {
678            self.configuration.set_heartbeat(heartbeat);
679        }
680
681        if channel_max != 0 {
682            // 0 means we want to take the server's value
683            // If both us and the server specified a channel_max, pick the lowest value.
684            if self.configuration.channel_max() == 0
685                || channel_max < self.configuration.channel_max()
686            {
687                self.configuration.set_channel_max(channel_max);
688            }
689        }
690        if self.configuration.channel_max() == 0 {
691            self.configuration.set_channel_max(ChannelId::MAX);
692        }
693
694        if frame_max != 0 {
695            // 0 means we want to take the server's value
696            // If both us and the server specified a frame_max, pick the lowest value.
697            if self.configuration.frame_max() == 0 || frame_max < self.configuration.frame_max() {
698                self.configuration.set_frame_max(frame_max);
699            }
700        }
701        if self.configuration.frame_max() == 0 {
702            self.configuration.set_frame_max(FrameSize::MAX);
703        }
704    }
705
706    fn connection_process_error(
707        &self,
708        state: ConnectionState,
709        step: Option<ConnectionStep>,
710    ) -> Result<()> {
711        error!(
712            ?state,
713            step = step.as_ref().map(ConnectionStep::name),
714            "Invalid state"
715        );
716        let error: Error = ErrorKind::InvalidConnectionState(state).into();
717        self.internal_rpc.set_connection_error(error.clone());
718        if let Some((resolver, _)) = step.map(ConnectionStep::into_connection_resolver) {
719            resolver.reject(error.clone());
720        }
721        Err(error)
722    }
723
724    fn on_connection_start_received(&self, method: protocol::connection::Start) -> Result<()> {
725        trace!(?method, "Server sent connection::Start");
726
727        let state = self.connection_status.state();
728        let step = self.connection_status.connection_step();
729        if state != ConnectionState::Connecting {
730            return self.connection_process_error(state, step);
731        }
732        let Some(step) = step else {
733            return self.connection_process_error(state, step);
734        };
735
736        match step {
737            ConnectionStep::ProtocolHeader(resolver, mut connection) => {
738                let configuration = connection.configuration_mut();
739                let auth_provider = configuration.auth_provider.clone();
740                let mechanism = auth_provider.mechanism();
741                let locale = configuration.amqp_locale.clone();
742
743                if !method
744                    .mechanisms
745                    .to_string()
746                    .split_whitespace()
747                    .any(|m| m == mechanism.as_str())
748                {
749                    error!(%mechanism, "unsupported mechanism");
750                }
751                if !method
752                    .locales
753                    .to_string()
754                    .split_whitespace()
755                    .any(|l| l == locale.as_str())
756                {
757                    error!(%locale, "unsupported locale");
758                }
759
760                if !configuration.amqp_client_properties.contains_key("product")
761                    || !configuration.amqp_client_properties.contains_key("version")
762                {
763                    configuration.amqp_client_properties.insert(
764                        "product".into(),
765                        AMQPValue::LongString(env!("CARGO_PKG_NAME").into()),
766                    );
767                    configuration.amqp_client_properties.insert(
768                        "version".into(),
769                        AMQPValue::LongString(env!("CARGO_PKG_VERSION").into()),
770                    );
771                }
772
773                configuration
774                    .amqp_client_properties
775                    .insert("platform".into(), AMQPValue::LongString("rust".into()));
776
777                let mut capabilities = FieldTable::default();
778                capabilities.insert("publisher_confirms".into(), true.into());
779                capabilities.insert("exchange_exchange_bindings".into(), true.into());
780                capabilities.insert("basic.nack".into(), true.into());
781                capabilities.insert("consumer_cancel_notify".into(), true.into());
782                capabilities.insert("connection.blocked".into(), true.into());
783                capabilities.insert("consumer_priorities".into(), true.into());
784                capabilities.insert("authentication_failure_close".into(), true.into());
785                capabilities.insert("per_consumer_qos".into(), true.into());
786                capabilities.insert("direct_reply_to".into(), true.into());
787
788                configuration
789                    .amqp_client_properties
790                    .insert("capabilities".into(), AMQPValue::FieldTable(capabilities));
791
792                let auth_starter = auth_provider
793                    .auth_starter()
794                    .map_err(ErrorKind::AuthProviderError)?;
795                let channel = self.clone();
796                let client_properties = configuration.amqp_client_properties.clone();
797                self.internal_rpc.spawn(async move {
798                    channel
799                        .connection_start_ok(
800                            client_properties,
801                            mechanism,
802                            auth_starter,
803                            locale,
804                            resolver,
805                            connection,
806                            auth_provider,
807                        )
808                        .await
809                });
810                Ok(())
811            }
812            step => self.connection_process_error(state, Some(step)),
813        }
814    }
815
816    fn on_connection_secure_received(&self, method: protocol::connection::Secure) -> Result<()> {
817        trace!(?method, "Server sent connection::Secure");
818
819        let state = self.connection_status.state();
820        let step = self.connection_status.connection_step();
821        if state != ConnectionState::Connecting {
822            return self.connection_process_error(state, step);
823        }
824        let Some(step) = step else {
825            return self.connection_process_error(state, step);
826        };
827
828        match step {
829            ConnectionStep::StartOk(resolver, connection, auth_provider)
830            | ConnectionStep::SecureOk(resolver, connection, auth_provider) => {
831                let channel = self.clone();
832                let response = auth_provider
833                    .continue_auth(method.challenge)
834                    .map_err(ErrorKind::AuthProviderError)?;
835                self.internal_rpc.spawn(async move {
836                    channel
837                        .connection_secure_ok(response, resolver, connection, auth_provider)
838                        .await
839                });
840                Ok(())
841            }
842            step => self.connection_process_error(state, Some(step)),
843        }
844    }
845
846    fn on_connection_tune_received(&self, method: protocol::connection::Tune) -> Result<()> {
847        trace!(?method, "Server sent Connection::Tune");
848
849        let state = self.connection_status.state();
850        let step = self.connection_status.connection_step();
851        if state != ConnectionState::Connecting {
852            return self.connection_process_error(state, step);
853        }
854        let Some(step) = step else {
855            return self.connection_process_error(state, step);
856        };
857
858        match step {
859            ConnectionStep::StartOk(resolver, connection, _)
860            | ConnectionStep::SecureOk(resolver, connection, _) => {
861                self.tune_connection_configuration(
862                    method.channel_max,
863                    method.frame_max,
864                    method.heartbeat,
865                );
866
867                let channel = self.clone();
868                let configuration = self.configuration.clone();
869                let vhost = self.connection_status.vhost();
870                self.internal_rpc.spawn(async move {
871                    channel
872                        .connection_tune_ok(
873                            configuration.channel_max(),
874                            configuration.frame_max(),
875                            configuration.heartbeat(),
876                        )
877                        .await?;
878                    channel
879                        .connection_open(vhost, Box::new(connection), resolver)
880                        .await
881                });
882                Ok(())
883            }
884            step => self.connection_process_error(state, Some(step)),
885        }
886    }
887
888    #[allow(clippy::boxed_local)]
889    fn on_connection_open_ok_received(
890        &self,
891        _: protocol::connection::OpenOk,
892        connection: Box<Connection>,
893    ) -> Result<()> {
894        let state = self.connection_status.state();
895        let step = self.connection_status.connection_step();
896        if state != ConnectionState::Connecting {
897            return self.connection_process_error(state, step);
898        }
899        let Some(step) = step else {
900            return self.connection_process_error(state, step);
901        };
902
903        match step {
904            ConnectionStep::Open(resolver) => {
905                self.connection_status.set_state(ConnectionState::Connected);
906                resolver.resolve(*connection);
907                self.events_sender.connected();
908                Ok(())
909            }
910            step => self.connection_process_error(state, Some(step)),
911        }
912    }
913
914    fn on_connection_close_received(&self, method: protocol::connection::Close) -> Result<()> {
915        let error: Error = AMQPError::try_from(method.clone())
916            .map(|error| {
917                error!(
918                    channel=%self.id,
919                    ?method,
920                    ?error,
921                    "Connection closed",
922                );
923                ErrorKind::ProtocolError(error).into()
924            })
925            .unwrap_or_else(|error| {
926                error!(%error);
927                info!(channel=%self.id, ?method, "Connection closed");
928                ErrorKind::InvalidConnectionState(ConnectionState::Closed).into()
929            });
930        if self.recovery_config.can_recover(&error) {
931            self.internal_rpc.init_connection_recovery(error.clone());
932        } else {
933            let connection_resolver = self.connection_status.connection_resolver();
934            self.internal_rpc
935                .init_connection_shutdown(error.clone(), connection_resolver);
936        }
937        self.internal_rpc.send_connection_close_ok(error);
938        Ok(())
939    }
940
941    fn on_connection_blocked_received(&self, method: protocol::connection::Blocked) -> Result<()> {
942        self.connection_status.block();
943        self.events_sender.connection_blocked(method.reason.into());
944        Ok(())
945    }
946
947    fn on_connection_unblocked_received(
948        &self,
949        _method: protocol::connection::Unblocked,
950    ) -> Result<()> {
951        self.connection_status.unblock();
952        self.events_sender.connection_unblocked();
953        self.wake();
954        Ok(())
955    }
956
957    fn on_connection_close_ok_received(&self) -> Result<()> {
958        self.internal_rpc.set_connection_closed(
959            ErrorKind::InvalidConnectionState(ConnectionState::Closed).into(),
960        );
961        Ok(())
962    }
963
964    fn on_channel_open_ok_received(
965        &self,
966        _method: protocol::channel::OpenOk,
967        resolver: PromiseResolver<Channel>,
968        channel: Channel,
969    ) -> Result<()> {
970        if !self.status.confirm() {
971            self.finalize_connection();
972        }
973        resolver.resolve(channel);
974        Ok(())
975    }
976
977    fn on_channel_flow_received(&self, method: protocol::channel::Flow) -> Result<()> {
978        self.status.set_send_flow(method.active);
979        self.events_sender.send_flow(method.active);
980        let channel = self.clone();
981        self.internal_rpc.spawn(async move {
982            channel
983                .channel_flow_ok(ChannelFlowOkOptions {
984                    active: method.active,
985                })
986                .await
987        });
988        Ok(())
989    }
990
991    fn on_channel_flow_ok_received(
992        &self,
993        method: protocol::channel::FlowOk,
994        resolver: PromiseResolver<Boolean>,
995    ) -> Result<()> {
996        // Nothing to do here, the server just confirmed that we paused/resumed the receiving flow
997        resolver.resolve(method.active);
998        Ok(())
999    }
1000
1001    fn on_channel_close_received(&self, method: protocol::channel::Close) -> Result<()> {
1002        let error = AMQPError::try_from(method.clone()).map(|error| {
1003                error!(
1004                    channel=%self.id, ?method, ?error,
1005                    "Channel closed"
1006                );
1007                Error::from(ErrorKind::ProtocolError(error))
1008            }).map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
1009        self.set_closing(error.clone());
1010        let channel = self.clone();
1011        self.internal_rpc
1012            .spawn(async move { channel.channel_close_ok(error).await });
1013        Ok(())
1014    }
1015
1016    fn on_channel_close_ok_received(&self) -> Result<()> {
1017        self.set_closed(
1018            ErrorKind::InvalidChannelState(ChannelState::Closed, "channel.close-ok received")
1019                .into(),
1020        );
1021        Ok(())
1022    }
1023
1024    fn on_exchange_bind_ok_received(
1025        &self,
1026        destination: ShortString,
1027        source: ShortString,
1028        routing_key: ShortString,
1029        arguments: FieldTable,
1030    ) -> Result<()> {
1031        self.local_registry
1032            .register_exchange_binding(destination, source, routing_key, arguments);
1033        Ok(())
1034    }
1035
1036    fn on_exchange_unbind_ok_received(
1037        &self,
1038        destination: ShortString,
1039        source: ShortString,
1040        routing_key: ShortString,
1041        arguments: FieldTable,
1042    ) -> Result<()> {
1043        self.local_registry.deregister_exchange_binding(
1044            destination.as_str(),
1045            source.as_str(),
1046            routing_key.as_str(),
1047            &arguments,
1048        );
1049        Ok(())
1050    }
1051
1052    fn on_exchange_declare_ok_received(
1053        &self,
1054        resolver: PromiseResolver<()>,
1055        exchange: ShortString,
1056        kind: ExchangeKind,
1057        options: ExchangeDeclareOptions,
1058        arguments: FieldTable,
1059    ) -> Result<()> {
1060        self.local_registry
1061            .register_exchange(exchange, kind, options, arguments);
1062        resolver.resolve(());
1063        Ok(())
1064    }
1065
1066    fn on_exchange_delete_ok_received(&self, exchange: ShortString) -> Result<()> {
1067        self.local_registry.deregister_exchange(exchange.as_str());
1068        Ok(())
1069    }
1070
1071    fn on_queue_delete_ok_received(
1072        &self,
1073        method: protocol::queue::DeleteOk,
1074        resolver: PromiseResolver<MessageCount>,
1075        queue: ShortString,
1076    ) -> Result<()> {
1077        self.local_registry.deregister_queue(queue.as_str());
1078        resolver.resolve(method.message_count);
1079        Ok(())
1080    }
1081
1082    fn on_queue_purge_ok_received(
1083        &self,
1084        method: protocol::queue::PurgeOk,
1085        resolver: PromiseResolver<MessageCount>,
1086    ) -> Result<()> {
1087        resolver.resolve(method.message_count);
1088        Ok(())
1089    }
1090
1091    fn on_queue_declare_ok_received(
1092        &self,
1093        method: protocol::queue::DeclareOk,
1094        resolver: PromiseResolver<Queue>,
1095        options: QueueDeclareOptions,
1096        arguments: FieldTable,
1097    ) -> Result<()> {
1098        self.local_registry
1099            .register_queue(method.queue.clone(), options, arguments);
1100        resolver.resolve(Queue::new(
1101            method.queue,
1102            method.message_count,
1103            method.consumer_count,
1104        ));
1105        Ok(())
1106    }
1107
1108    fn on_queue_bind_ok_received(
1109        &self,
1110        queue: ShortString,
1111        exchange: ShortString,
1112        routing_key: ShortString,
1113        arguments: FieldTable,
1114    ) -> Result<()> {
1115        self.local_registry
1116            .register_queue_binding(queue, exchange, routing_key, arguments);
1117        Ok(())
1118    }
1119
1120    fn on_queue_unbind_ok_received(
1121        &self,
1122        queue: ShortString,
1123        exchange: ShortString,
1124        routing_key: ShortString,
1125        arguments: FieldTable,
1126    ) -> Result<()> {
1127        self.local_registry.deregister_queue_binding(
1128            queue.as_str(),
1129            exchange.as_str(),
1130            routing_key.as_str(),
1131            &arguments,
1132        );
1133        Ok(())
1134    }
1135
1136    fn on_basic_get_ok_received(
1137        &self,
1138        method: protocol::basic::GetOk,
1139        resolver: PromiseResolver<Option<BasicGetMessage>>,
1140    ) -> Result<()> {
1141        let class_id = method.get_amqp_class_id();
1142        let killswitch = self.status.set_will_receive(class_id, DeliveryCause::Get);
1143        self.basic_get_delivery.start_new_delivery(
1144            BasicGetMessage::new(
1145                self.id,
1146                method.delivery_tag,
1147                method.exchange,
1148                method.routing_key,
1149                method.redelivered,
1150                method.message_count,
1151                self.internal_rpc.clone(),
1152                killswitch,
1153            ),
1154            resolver,
1155        );
1156        Ok(())
1157    }
1158
1159    fn on_basic_get_empty_received(&self, method: protocol::basic::GetEmpty) -> Result<()> {
1160        match self
1161            .frames
1162            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
1163        {
1164            Some(Reply::BasicGetOk(resolver, ..)) => {
1165                resolver.resolve(None);
1166                Ok(())
1167            }
1168            _ => self.handle_invalid_contents(
1169                format!("unexpected basic get empty received on channel {}", self.id),
1170                method.get_amqp_class_id(),
1171                method.get_amqp_method_id(),
1172            ),
1173        }
1174    }
1175
1176    fn on_basic_consume_ok_received(
1177        &self,
1178        method: protocol::basic::ConsumeOk,
1179        resolver: PromiseResolver<Consumer>,
1180        channel_closer: Option<Arc<ChannelCloser>>,
1181        queue: ShortString,
1182        options: BasicConsumeOptions,
1183        arguments: FieldTable,
1184        original: Option<Consumer>,
1185    ) -> Result<()> {
1186        let consumer = original.unwrap_or_else(|| {
1187            Consumer::new(
1188                method.consumer_tag.clone(),
1189                self.internal_rpc.clone(),
1190                channel_closer,
1191                queue,
1192                options,
1193                arguments,
1194            )
1195        });
1196        self.consumers
1197            .register(method.consumer_tag, consumer.clone());
1198        resolver.resolve(consumer);
1199        Ok(())
1200    }
1201
1202    fn on_basic_deliver_received(&self, method: protocol::basic::Deliver) -> Result<()> {
1203        let class_id = method.get_amqp_class_id();
1204        let consumer_tag = method.consumer_tag.clone();
1205        let killswitch = self
1206            .status
1207            .set_will_receive(class_id, DeliveryCause::Consume(consumer_tag.clone()));
1208        self.consumers.start_delivery(&consumer_tag, |error| {
1209            Delivery::new(
1210                self.id,
1211                method.delivery_tag,
1212                method.exchange,
1213                method.routing_key,
1214                method.redelivered,
1215                Some(self.internal_rpc.clone()),
1216                Some(error),
1217                killswitch,
1218            )
1219        });
1220        Ok(())
1221    }
1222
1223    pub(crate) fn deregister_consumer(&self, consumer_tag: &str) {
1224        self.consumers.deregister(consumer_tag);
1225    }
1226
1227    fn on_basic_cancel_received(&self, method: protocol::basic::Cancel) -> Result<()> {
1228        self.deregister_consumer(method.consumer_tag.as_str());
1229        if !method.nowait {
1230            let channel = self.clone();
1231            self.internal_rpc
1232                .spawn(async move { channel.basic_cancel_ok(method.consumer_tag).await });
1233        }
1234        Ok(())
1235    }
1236
1237    fn on_basic_cancel_ok_received(&self, method: protocol::basic::CancelOk) -> Result<()> {
1238        self.deregister_consumer(method.consumer_tag.as_str());
1239        Ok(())
1240    }
1241
1242    fn on_basic_ack_received(&self, method: protocol::basic::Ack) -> Result<()> {
1243        if self.status.confirm() {
1244            if method.multiple {
1245                if method.delivery_tag > 0 {
1246                    self.acknowledgements
1247                        .ack_all_before(method.delivery_tag)
1248                        .or_else(|err| {
1249                            self.acknowledgement_error(
1250                                err,
1251                                method.get_amqp_class_id(),
1252                                method.get_amqp_method_id(),
1253                            )
1254                        })?;
1255                } else {
1256                    self.acknowledgements.ack_all_pending();
1257                }
1258            } else {
1259                self.acknowledgements
1260                    .ack(method.delivery_tag)
1261                    .or_else(|err| {
1262                        self.acknowledgement_error(
1263                            err,
1264                            method.get_amqp_class_id(),
1265                            method.get_amqp_method_id(),
1266                        )
1267                    })?;
1268            }
1269        }
1270        Ok(())
1271    }
1272
1273    fn on_basic_nack_received(&self, method: protocol::basic::Nack) -> Result<()> {
1274        if self.status.confirm() {
1275            if method.multiple {
1276                if method.delivery_tag > 0 {
1277                    self.acknowledgements
1278                        .nack_all_before(method.delivery_tag)
1279                        .or_else(|err| {
1280                            self.acknowledgement_error(
1281                                err,
1282                                method.get_amqp_class_id(),
1283                                method.get_amqp_method_id(),
1284                            )
1285                        })?;
1286                } else {
1287                    self.acknowledgements.nack_all_pending();
1288                }
1289            } else {
1290                self.acknowledgements
1291                    .nack(method.delivery_tag)
1292                    .or_else(|err| {
1293                        self.acknowledgement_error(
1294                            err,
1295                            method.get_amqp_class_id(),
1296                            method.get_amqp_method_id(),
1297                        )
1298                    })?;
1299            }
1300        }
1301        Ok(())
1302    }
1303
1304    fn on_basic_return_received(&self, method: protocol::basic::Return) -> Result<()> {
1305        let class_id = method.get_amqp_class_id();
1306        let killswitch = self
1307            .status
1308            .set_will_receive(class_id, DeliveryCause::Return);
1309        self.returned_messages
1310            .start_new_delivery(BasicReturnMessage::new(
1311                method.exchange,
1312                method.routing_key,
1313                method.reply_code,
1314                method.reply_text,
1315                killswitch,
1316            ));
1317        Ok(())
1318    }
1319
1320    fn on_basic_recover_ok_received(&self) -> Result<()> {
1321        self.consumers.drop_prefetched_messages();
1322        Ok(())
1323    }
1324
1325    fn on_confirm_select_ok_received(&self) -> Result<()> {
1326        self.status.set_confirm();
1327        Ok(())
1328    }
1329
1330    fn on_access_request_ok_received(&self, _: protocol::access::RequestOk) -> Result<()> {
1331        Ok(())
1332    }
1333}
1334
1335#[cfg(feature = "codegen")]
1336include!(concat!(env!("OUT_DIR"), "/channel.rs"));
1337#[cfg(not(feature = "codegen"))]
1338include!("generated/channel.rs");