lapin/
generated.rs

1pub mod options {
2    use super::*;
3
4    #[derive(Copy, Clone, Debug, Default, PartialEq)]
5    pub struct BasicQosOptions {
6        pub global: Boolean,
7    }
8
9    #[derive(Copy, Clone, Debug, Default, PartialEq)]
10    pub struct BasicConsumeOptions {
11        pub no_local: Boolean,
12        pub no_ack: Boolean,
13        pub exclusive: Boolean,
14        pub nowait: Boolean,
15    }
16
17    #[derive(Copy, Clone, Debug, Default, PartialEq)]
18    pub struct BasicCancelOptions {
19        pub nowait: Boolean,
20    }
21
22    #[derive(Copy, Clone, Debug, Default, PartialEq)]
23    pub struct BasicPublishOptions {
24        pub mandatory: Boolean,
25        pub immediate: Boolean,
26    }
27
28    #[derive(Copy, Clone, Debug, Default, PartialEq)]
29    pub struct BasicDeliverOptions {
30        pub redelivered: Boolean,
31    }
32
33    #[derive(Copy, Clone, Debug, Default, PartialEq)]
34    pub struct BasicGetOptions {
35        pub no_ack: Boolean,
36    }
37
38    #[derive(Copy, Clone, Debug, Default, PartialEq)]
39    pub struct BasicGetOkOptions {
40        pub redelivered: Boolean,
41    }
42
43    #[derive(Copy, Clone, Debug, Default, PartialEq)]
44    pub struct BasicAckOptions {
45        pub multiple: Boolean,
46    }
47
48    #[derive(Copy, Clone, Debug, Default, PartialEq)]
49    pub struct BasicRejectOptions {
50        pub requeue: Boolean,
51    }
52
53    #[derive(Copy, Clone, Debug, Default, PartialEq)]
54    pub struct BasicRecoverAsyncOptions {
55        pub requeue: Boolean,
56    }
57
58    #[derive(Copy, Clone, Debug, Default, PartialEq)]
59    pub struct BasicRecoverOptions {
60        pub requeue: Boolean,
61    }
62
63    #[derive(Copy, Clone, Debug, Default, PartialEq)]
64    pub struct BasicNackOptions {
65        pub multiple: Boolean,
66        pub requeue: Boolean,
67    }
68
69    #[derive(Copy, Clone, Debug, Default, PartialEq)]
70    pub struct ChannelFlowOptions {
71        pub active: Boolean,
72    }
73
74    #[derive(Copy, Clone, Debug, Default, PartialEq)]
75    pub struct ChannelFlowOkOptions {
76        pub active: Boolean,
77    }
78
79    #[derive(Copy, Clone, Debug, Default, PartialEq)]
80    pub struct AccessRequestOptions {
81        pub exclusive: Boolean,
82        pub passive: Boolean,
83        pub active: Boolean,
84        pub write: Boolean,
85        pub read: Boolean,
86    }
87
88    #[derive(Copy, Clone, Debug, Default, PartialEq)]
89    pub struct ExchangeDeclareOptions {
90        pub passive: Boolean,
91        pub durable: Boolean,
92        pub auto_delete: Boolean,
93        pub internal: Boolean,
94        pub nowait: Boolean,
95    }
96
97    #[derive(Copy, Clone, Debug, Default, PartialEq)]
98    pub struct ExchangeDeleteOptions {
99        pub if_unused: Boolean,
100        pub nowait: Boolean,
101    }
102
103    #[derive(Copy, Clone, Debug, Default, PartialEq)]
104    pub struct ExchangeBindOptions {
105        pub nowait: Boolean,
106    }
107
108    #[derive(Copy, Clone, Debug, Default, PartialEq)]
109    pub struct ExchangeUnbindOptions {
110        pub nowait: Boolean,
111    }
112
113    #[derive(Copy, Clone, Debug, Default, PartialEq)]
114    pub struct QueueDeclareOptions {
115        pub passive: Boolean,
116        pub durable: Boolean,
117        pub exclusive: Boolean,
118        pub auto_delete: Boolean,
119        pub nowait: Boolean,
120    }
121
122    #[derive(Copy, Clone, Debug, Default, PartialEq)]
123    pub struct QueueBindOptions {
124        pub nowait: Boolean,
125    }
126
127    #[derive(Copy, Clone, Debug, Default, PartialEq)]
128    pub struct QueuePurgeOptions {
129        pub nowait: Boolean,
130    }
131
132    #[derive(Copy, Clone, Debug, Default, PartialEq)]
133    pub struct QueueDeleteOptions {
134        pub if_unused: Boolean,
135        pub if_empty: Boolean,
136        pub nowait: Boolean,
137    }
138
139    #[derive(Copy, Clone, Debug, Default, PartialEq)]
140    pub struct ConfirmSelectOptions {
141        pub nowait: Boolean,
142    }
143}
144
145use options::*;
146
/// Replies a channel can be waiting for after sending a method.
///
/// Each variant starts with the `PromiseResolver` tied to the pending request;
/// the remaining fields carry the request context kept alongside it. The
/// `receive_*_ok` handlers look the matching variant up through
/// `find_expected_reply` and take it apart.
#[derive(Debug)]
// Every variant name ends in `Ok`, which would otherwise trigger this lint.
#[allow(clippy::enum_variant_names)]
pub(crate) enum Reply {
    BasicQosOk(PromiseResolver<()>),
    // Fields after the resolver, as filled in by `do_basic_consume`: channel
    // closer, queue name, consume options, consume arguments, original consumer.
    BasicConsumeOk(
        PromiseResolver<Consumer>,
        Option<Arc<ChannelCloser>>,
        ShortString,
        BasicConsumeOptions,
        FieldTable,
        Option<Consumer>,
    ),
    BasicCancelOk(PromiseResolver<()>),
    BasicGetOk(PromiseResolver<Option<BasicGetMessage>>),
    BasicRecoverOk(PromiseResolver<()>),
    ConnectionOpenOk(PromiseResolver<()>, Box<Connection>),
    ConnectionCloseOk(PromiseResolver<()>),
    ConnectionUpdateSecretOk(PromiseResolver<()>),
    ChannelOpenOk(PromiseResolver<Channel>, Channel),
    ChannelFlowOk(PromiseResolver<Boolean>),
    ChannelCloseOk(PromiseResolver<()>),
    AccessRequestOk(PromiseResolver<()>),
    ExchangeDeclareOk(
        PromiseResolver<()>,
        ShortString,
        ExchangeKind,
        ExchangeDeclareOptions,
        FieldTable,
    ),
    ExchangeDeleteOk(PromiseResolver<()>, ShortString),
    ExchangeBindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    ExchangeUnbindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    QueueDeclareOk(PromiseResolver<Queue>, QueueDeclareOptions, FieldTable),
    QueueBindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    QueuePurgeOk(PromiseResolver<MessageCount>),
    QueueDeleteOk(PromiseResolver<MessageCount>, ShortString),
    QueueUnbindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    TxSelectOk(PromiseResolver<()>),
    TxCommitOk(PromiseResolver<()>),
    TxRollbackOk(PromiseResolver<()>),
    ConfirmSelectOk(PromiseResolver<()>),
}
213
214impl Channel {
215    pub(crate) fn receive_method(&self, method: AMQPClass) -> Result<()> {
216        match method {
217            AMQPClass::Basic(protocol::basic::AMQPMethod::QosOk(m)) => self.receive_basic_qos_ok(m),
218            AMQPClass::Basic(protocol::basic::AMQPMethod::ConsumeOk(m)) => {
219                self.receive_basic_consume_ok(m)
220            }
221            AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(m)) => {
222                self.receive_basic_cancel(m)
223            }
224            AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(m)) => {
225                self.receive_basic_cancel_ok(m)
226            }
227            AMQPClass::Basic(protocol::basic::AMQPMethod::Return(m)) => {
228                self.receive_basic_return(m)
229            }
230            AMQPClass::Basic(protocol::basic::AMQPMethod::Deliver(m)) => {
231                self.receive_basic_deliver(m)
232            }
233            AMQPClass::Basic(protocol::basic::AMQPMethod::GetOk(m)) => self.receive_basic_get_ok(m),
234            AMQPClass::Basic(protocol::basic::AMQPMethod::GetEmpty(m)) => {
235                self.receive_basic_get_empty(m)
236            }
237            AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(m)) => self.receive_basic_ack(m),
238            AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverOk(m)) => {
239                self.receive_basic_recover_ok(m)
240            }
241            AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(m)) => self.receive_basic_nack(m),
242            AMQPClass::Connection(protocol::connection::AMQPMethod::Start(m)) => {
243                self.receive_connection_start(m)
244            }
245            AMQPClass::Connection(protocol::connection::AMQPMethod::Secure(m)) => {
246                self.receive_connection_secure(m)
247            }
248            AMQPClass::Connection(protocol::connection::AMQPMethod::Tune(m)) => {
249                self.receive_connection_tune(m)
250            }
251            AMQPClass::Connection(protocol::connection::AMQPMethod::OpenOk(m)) => {
252                self.receive_connection_open_ok(m)
253            }
254            AMQPClass::Connection(protocol::connection::AMQPMethod::Close(m)) => {
255                self.receive_connection_close(m)
256            }
257            AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(m)) => {
258                self.receive_connection_close_ok(m)
259            }
260            AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(m)) => {
261                self.receive_connection_blocked(m)
262            }
263            AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(m)) => {
264                self.receive_connection_unblocked(m)
265            }
266            AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecretOk(m)) => {
267                self.receive_connection_update_secret_ok(m)
268            }
269            AMQPClass::Channel(protocol::channel::AMQPMethod::OpenOk(m)) => {
270                self.receive_channel_open_ok(m)
271            }
272            AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(m)) => {
273                self.receive_channel_flow(m)
274            }
275            AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(m)) => {
276                self.receive_channel_flow_ok(m)
277            }
278            AMQPClass::Channel(protocol::channel::AMQPMethod::Close(m)) => {
279                self.receive_channel_close(m)
280            }
281            AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(m)) => {
282                self.receive_channel_close_ok(m)
283            }
284            AMQPClass::Access(protocol::access::AMQPMethod::RequestOk(m)) => {
285                self.receive_access_request_ok(m)
286            }
287            AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeclareOk(m)) => {
288                self.receive_exchange_declare_ok(m)
289            }
290            AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeleteOk(m)) => {
291                self.receive_exchange_delete_ok(m)
292            }
293            AMQPClass::Exchange(protocol::exchange::AMQPMethod::BindOk(m)) => {
294                self.receive_exchange_bind_ok(m)
295            }
296            AMQPClass::Exchange(protocol::exchange::AMQPMethod::UnbindOk(m)) => {
297                self.receive_exchange_unbind_ok(m)
298            }
299            AMQPClass::Queue(protocol::queue::AMQPMethod::DeclareOk(m)) => {
300                self.receive_queue_declare_ok(m)
301            }
302            AMQPClass::Queue(protocol::queue::AMQPMethod::BindOk(m)) => {
303                self.receive_queue_bind_ok(m)
304            }
305            AMQPClass::Queue(protocol::queue::AMQPMethod::PurgeOk(m)) => {
306                self.receive_queue_purge_ok(m)
307            }
308            AMQPClass::Queue(protocol::queue::AMQPMethod::DeleteOk(m)) => {
309                self.receive_queue_delete_ok(m)
310            }
311            AMQPClass::Queue(protocol::queue::AMQPMethod::UnbindOk(m)) => {
312                self.receive_queue_unbind_ok(m)
313            }
314            AMQPClass::Tx(protocol::tx::AMQPMethod::SelectOk(m)) => self.receive_tx_select_ok(m),
315            AMQPClass::Tx(protocol::tx::AMQPMethod::CommitOk(m)) => self.receive_tx_commit_ok(m),
316            AMQPClass::Tx(protocol::tx::AMQPMethod::RollbackOk(m)) => {
317                self.receive_tx_rollback_ok(m)
318            }
319            AMQPClass::Confirm(protocol::confirm::AMQPMethod::SelectOk(m)) => {
320                self.receive_confirm_select_ok(m)
321            }
322            m => {
323                error!(method=?m, "The client should not receive this method");
324                self.handle_invalid_contents(
325                    format!("unexpected method received on channel {}", self.id),
326                    m.get_amqp_class_id(),
327                    m.get_amqp_method_id(),
328                )
329            }
330        }
331    }
332
333    pub async fn basic_qos(
334        &self,
335        prefetch_count: ShortUInt,
336        options: BasicQosOptions,
337    ) -> Result<()> {
338        if !self.status.connected() {
339            return Err(self.status.state_error("basic.qos"));
340        }
341
342        let BasicQosOptions { global } = options;
343        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Qos(protocol::basic::Qos {
344            prefetch_count,
345            global,
346        }));
347
348        let (promise, send_resolver) = Promise::new();
349        if level_enabled!(Level::TRACE) {
350            promise.set_marker("basic.qos".into());
351        }
352        let ((promise, resolver), promise_out) = (Promise::new(), promise);
353        if level_enabled!(Level::TRACE) {
354            promise.set_marker("basic.qos.Ok".into());
355        }
356        self.send_method_frame(
357            method,
358            send_resolver,
359            Some(ExpectedReply(
360                Reply::BasicQosOk(resolver.clone()),
361                Box::new(resolver),
362            )),
363        );
364        promise_out.await?;
365        promise.await
366    }
367    fn receive_basic_qos_ok(&self, method: protocol::basic::QosOk) -> Result<()> {
368        if !self.status.can_receive_messages() {
369            return Err(self.status.state_error("basic.qos-ok"));
370        }
371
372        match self
373            .frames
374            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicQosOk(..)))
375        {
376            Some(Reply::BasicQosOk(resolver)) => {
377                let res = Ok(());
378                resolver.complete(res.clone());
379                res
380            }
381            unexpected => self.handle_invalid_contents(
382                format!(
383                    "unexpected basic qos-ok received on channel {}, was awaiting for {:?}",
384                    self.id, unexpected
385                ),
386                method.get_amqp_class_id(),
387                method.get_amqp_method_id(),
388            ),
389        }
390    }
391    async fn do_basic_consume(
392        &self,
393        queue: &str,
394        consumer_tag: &str,
395        options: BasicConsumeOptions,
396        arguments: FieldTable,
397        original: Option<Consumer>,
398    ) -> Result<Consumer> {
399        if !self.status.connected() {
400            return Err(self.status.state_error("basic.consume"));
401        }
402
403        let creation_arguments = arguments.clone();
404        let BasicConsumeOptions {
405            no_local,
406            no_ack,
407            exclusive,
408            nowait,
409        } = options;
410        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Consume(
411            protocol::basic::Consume {
412                queue: queue.into(),
413                consumer_tag: consumer_tag.into(),
414                no_local,
415                no_ack,
416                exclusive,
417                nowait,
418                arguments,
419            },
420        ));
421
422        let (promise, send_resolver) = Promise::new();
423        if level_enabled!(Level::TRACE) {
424            promise.set_marker("basic.consume".into());
425        }
426        let ((promise, resolver), promise_out) = (Promise::new(), promise);
427        if level_enabled!(Level::TRACE) {
428            promise.set_marker("basic.consume.Ok".into());
429        }
430        self.send_method_frame(
431            method,
432            send_resolver,
433            Some(ExpectedReply(
434                Reply::BasicConsumeOk(
435                    resolver.clone(),
436                    self.channel_closer.clone(),
437                    queue.into(),
438                    options,
439                    creation_arguments,
440                    original,
441                ),
442                Box::new(resolver),
443            )),
444        );
445        if nowait {
446            self.receive_basic_consume_ok(protocol::basic::ConsumeOk {
447                consumer_tag: consumer_tag.into(),
448            })?;
449        }
450        promise_out.await?;
451        promise.await
452    }
453    fn receive_basic_consume_ok(&self, method: protocol::basic::ConsumeOk) -> Result<()> {
454        if !self.status.can_receive_messages() {
455            return Err(self.status.state_error("basic.consume-ok"));
456        }
457
458        match self.frames.find_expected_reply(self.id, |reply| {
459            matches!(&reply.0, Reply::BasicConsumeOk(..))
460        }) {
461            Some(Reply::BasicConsumeOk(
462                resolver,
463                channel_closer,
464                queue,
465                options,
466                creation_arguments,
467                original,
468            )) => self.on_basic_consume_ok_received(
469                method,
470                resolver,
471                channel_closer,
472                queue,
473                options,
474                creation_arguments,
475                original,
476            ),
477            unexpected => self.handle_invalid_contents(
478                format!(
479                    "unexpected basic consume-ok received on channel {}, was awaiting for {:?}",
480                    self.id, unexpected
481                ),
482                method.get_amqp_class_id(),
483                method.get_amqp_method_id(),
484            ),
485        }
486    }
487    pub async fn basic_cancel(
488        &self,
489        consumer_tag: &str,
490        options: BasicCancelOptions,
491    ) -> Result<()> {
492        if !self.status.connected() {
493            return Err(self.status.state_error("basic.cancel"));
494        }
495
496        self.before_basic_cancel(consumer_tag);
497        let BasicCancelOptions { nowait } = options;
498        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(
499            protocol::basic::Cancel {
500                consumer_tag: consumer_tag.into(),
501                nowait,
502            },
503        ));
504
505        let (promise, send_resolver) = Promise::new();
506        if level_enabled!(Level::TRACE) {
507            promise.set_marker("basic.cancel".into());
508        }
509        let ((promise, resolver), promise_out) = (Promise::new(), promise);
510        if level_enabled!(Level::TRACE) {
511            promise.set_marker("basic.cancel.Ok".into());
512        }
513        self.send_method_frame(
514            method,
515            send_resolver,
516            Some(ExpectedReply(
517                Reply::BasicCancelOk(resolver.clone()),
518                Box::new(resolver),
519            )),
520        );
521        if nowait {
522            self.receive_basic_cancel_ok(protocol::basic::CancelOk {
523                consumer_tag: consumer_tag.into(),
524            })?;
525        }
526        promise_out.await?;
527        promise.await
528    }
529    fn receive_basic_cancel(&self, method: protocol::basic::Cancel) -> Result<()> {
530        if !self.status.can_receive_messages() {
531            return Err(self.status.state_error("basic.cancel"));
532        }
533        self.on_basic_cancel_received(method)
534    }
535    async fn basic_cancel_ok(&self, consumer_tag: &str) -> Result<()> {
536        if !self.status.connected() {
537            return Err(self.status.state_error("basic.cancel-ok"));
538        }
539
540        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(
541            protocol::basic::CancelOk {
542                consumer_tag: consumer_tag.into(),
543            },
544        ));
545
546        let (promise, send_resolver) = Promise::new();
547        if level_enabled!(Level::TRACE) {
548            promise.set_marker("basic.cancel-ok".into());
549        }
550        self.send_method_frame(method, send_resolver, None);
551        promise.await
552    }
553    fn receive_basic_cancel_ok(&self, method: protocol::basic::CancelOk) -> Result<()> {
554        if !self.status.can_receive_messages() {
555            return Err(self.status.state_error("basic.cancel-ok"));
556        }
557
558        match self.frames.find_expected_reply(self.id, |reply| {
559            matches!(&reply.0, Reply::BasicCancelOk(..))
560        }) {
561            Some(Reply::BasicCancelOk(resolver)) => {
562                let res = self.on_basic_cancel_ok_received(method);
563                resolver.complete(res.clone());
564                res
565            }
566            unexpected => self.handle_invalid_contents(
567                format!(
568                    "unexpected basic cancel-ok received on channel {}, was awaiting for {:?}",
569                    self.id, unexpected
570                ),
571                method.get_amqp_class_id(),
572                method.get_amqp_method_id(),
573            ),
574        }
575    }
576    pub async fn basic_publish(
577        &self,
578        exchange: &str,
579        routing_key: &str,
580        options: BasicPublishOptions,
581        payload: &[u8],
582        properties: BasicProperties,
583    ) -> Result<PublisherConfirm> {
584        if !self.status.connected() {
585            return Err(self.status.state_error("basic.publish"));
586        }
587
588        let start_hook_res = self.before_basic_publish();
589        let BasicPublishOptions {
590            mandatory,
591            immediate,
592        } = options;
593        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Publish(
594            protocol::basic::Publish {
595                exchange: exchange.into(),
596                routing_key: routing_key.into(),
597                mandatory,
598                immediate,
599            },
600        ));
601
602        self.send_method_frame_with_body(method, payload, properties, start_hook_res)
603            .await
604    }
605    fn receive_basic_return(&self, method: protocol::basic::Return) -> Result<()> {
606        if !self.status.can_receive_messages() {
607            return Err(self.status.state_error("basic.return"));
608        }
609        self.on_basic_return_received(method)
610    }
611    fn receive_basic_deliver(&self, method: protocol::basic::Deliver) -> Result<()> {
612        if !self.status.can_receive_messages() {
613            return Err(self.status.state_error("basic.deliver"));
614        }
615        self.on_basic_deliver_received(method)
616    }
617    async fn do_basic_get(
618        &self,
619        queue: &str,
620        options: BasicGetOptions,
621        original: Option<PromiseResolver<Option<BasicGetMessage>>>,
622    ) -> Result<Option<BasicGetMessage>> {
623        if !self.status.connected() {
624            return Err(self.status.state_error("basic.get"));
625        }
626
627        let BasicGetOptions { no_ack } = options;
628        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Get(protocol::basic::Get {
629            queue: queue.into(),
630            no_ack,
631        }));
632
633        let (promise, send_resolver) = Promise::new();
634        if level_enabled!(Level::TRACE) {
635            promise.set_marker("basic.get".into());
636        }
637        let ((promise, resolver), promise_out) = (Promise::new(), promise);
638        if level_enabled!(Level::TRACE) {
639            promise.set_marker("basic.get.Ok".into());
640        }
641        let resolver = original.unwrap_or(resolver);
642        self.send_method_frame(
643            method,
644            send_resolver,
645            Some(ExpectedReply(
646                Reply::BasicGetOk(resolver.clone()),
647                Box::new(resolver),
648            )),
649        );
650        promise_out.await?;
651        promise.await
652    }
653    fn receive_basic_get_ok(&self, method: protocol::basic::GetOk) -> Result<()> {
654        if !self.status.can_receive_messages() {
655            return Err(self.status.state_error("basic.get-ok"));
656        }
657
658        match self
659            .frames
660            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
661        {
662            Some(Reply::BasicGetOk(resolver)) => self.on_basic_get_ok_received(method, resolver),
663            unexpected => self.handle_invalid_contents(
664                format!(
665                    "unexpected basic get-ok received on channel {}, was awaiting for {:?}",
666                    self.id, unexpected
667                ),
668                method.get_amqp_class_id(),
669                method.get_amqp_method_id(),
670            ),
671        }
672    }
673    fn receive_basic_get_empty(&self, method: protocol::basic::GetEmpty) -> Result<()> {
674        if !self.status.can_receive_messages() {
675            return Err(self.status.state_error("basic.get-empty"));
676        }
677        self.on_basic_get_empty_received(method)
678    }
679    pub async fn basic_ack(
680        &self,
681        delivery_tag: LongLongUInt,
682        options: BasicAckOptions,
683    ) -> Result<()> {
684        if !self.status.connected() {
685            return Err(self.status.state_error("basic.ack"));
686        }
687
688        let BasicAckOptions { multiple } = options;
689        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(protocol::basic::Ack {
690            delivery_tag,
691            multiple,
692        }));
693
694        let (promise, send_resolver) = Promise::new();
695        if level_enabled!(Level::TRACE) {
696            promise.set_marker("basic.ack".into());
697        }
698        self.send_method_frame(method, send_resolver, None);
699        self.on_basic_ack_sent(multiple, delivery_tag);
700        promise.await
701    }
702    fn receive_basic_ack(&self, method: protocol::basic::Ack) -> Result<()> {
703        if !self.status.can_receive_messages() {
704            return Err(self.status.state_error("basic.ack"));
705        }
706        self.on_basic_ack_received(method)
707    }
708    pub async fn basic_reject(
709        &self,
710        delivery_tag: LongLongUInt,
711        options: BasicRejectOptions,
712    ) -> Result<()> {
713        if !self.status.connected() {
714            return Err(self.status.state_error("basic.reject"));
715        }
716
717        let BasicRejectOptions { requeue } = options;
718        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Reject(
719            protocol::basic::Reject {
720                delivery_tag,
721                requeue,
722            },
723        ));
724
725        let (promise, send_resolver) = Promise::new();
726        if level_enabled!(Level::TRACE) {
727            promise.set_marker("basic.reject".into());
728        }
729        self.send_method_frame(method, send_resolver, None);
730        promise.await
731    }
732    pub async fn basic_recover_async(&self, options: BasicRecoverAsyncOptions) -> Result<()> {
733        if !self.status.connected() {
734            return Err(self.status.state_error("basic.recover-async"));
735        }
736
737        let BasicRecoverAsyncOptions { requeue } = options;
738        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverAsync(
739            protocol::basic::RecoverAsync { requeue },
740        ));
741
742        let (promise, send_resolver) = Promise::new();
743        if level_enabled!(Level::TRACE) {
744            promise.set_marker("basic.recover-async".into());
745        }
746        self.send_method_frame(method, send_resolver, None);
747        self.on_basic_recover_async_sent();
748        promise.await
749    }
750    pub async fn basic_recover(&self, options: BasicRecoverOptions) -> Result<()> {
751        if !self.status.connected() {
752            return Err(self.status.state_error("basic.recover"));
753        }
754
755        let BasicRecoverOptions { requeue } = options;
756        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Recover(
757            protocol::basic::Recover { requeue },
758        ));
759
760        let (promise, send_resolver) = Promise::new();
761        if level_enabled!(Level::TRACE) {
762            promise.set_marker("basic.recover".into());
763        }
764        let ((promise, resolver), promise_out) = (Promise::new(), promise);
765        if level_enabled!(Level::TRACE) {
766            promise.set_marker("basic.recover.Ok".into());
767        }
768        self.send_method_frame(
769            method,
770            send_resolver,
771            Some(ExpectedReply(
772                Reply::BasicRecoverOk(resolver.clone()),
773                Box::new(resolver),
774            )),
775        );
776        promise_out.await?;
777        promise.await
778    }
779    fn receive_basic_recover_ok(&self, method: protocol::basic::RecoverOk) -> Result<()> {
780        if !self.status.can_receive_messages() {
781            return Err(self.status.state_error("basic.recover-ok"));
782        }
783
784        match self.frames.find_expected_reply(self.id, |reply| {
785            matches!(&reply.0, Reply::BasicRecoverOk(..))
786        }) {
787            Some(Reply::BasicRecoverOk(resolver)) => {
788                let res = self.on_basic_recover_ok_received();
789                resolver.complete(res.clone());
790                res
791            }
792            unexpected => self.handle_invalid_contents(
793                format!(
794                    "unexpected basic recover-ok received on channel {}, was awaiting for {:?}",
795                    self.id, unexpected
796                ),
797                method.get_amqp_class_id(),
798                method.get_amqp_method_id(),
799            ),
800        }
801    }
802    pub async fn basic_nack(
803        &self,
804        delivery_tag: LongLongUInt,
805        options: BasicNackOptions,
806    ) -> Result<()> {
807        if !self.status.connected() {
808            return Err(self.status.state_error("basic.nack"));
809        }
810
811        let BasicNackOptions { multiple, requeue } = options;
812        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(protocol::basic::Nack {
813            delivery_tag,
814            multiple,
815            requeue,
816        }));
817
818        let (promise, send_resolver) = Promise::new();
819        if level_enabled!(Level::TRACE) {
820            promise.set_marker("basic.nack".into());
821        }
822        self.send_method_frame(method, send_resolver, None);
823        self.on_basic_nack_sent(multiple, delivery_tag);
824        promise.await
825    }
826    fn receive_basic_nack(&self, method: protocol::basic::Nack) -> Result<()> {
827        if !self.status.can_receive_messages() {
828            return Err(self.status.state_error("basic.nack"));
829        }
830        self.on_basic_nack_received(method)
831    }
832    fn receive_connection_start(&self, method: protocol::connection::Start) -> Result<()> {
833        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
834        if !self.status.can_receive_messages() {
835            return Err(self.status.state_error("connection.start"));
836        }
837        self.on_connection_start_received(method)
838    }
839    async fn connection_start_ok(
840        &self,
841        client_properties: FieldTable,
842        mechanism: &str,
843        response: &str,
844        locale: &str,
845        resolver: PromiseResolver<Connection>,
846        connection: Connection,
847        credentials: Credentials,
848    ) -> Result<()> {
849        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::StartOk(
850            protocol::connection::StartOk {
851                client_properties,
852                mechanism: mechanism.into(),
853                response: response.into(),
854                locale: locale.into(),
855            },
856        ));
857
858        let (promise, send_resolver) = Promise::new();
859        if level_enabled!(Level::TRACE) {
860            promise.set_marker("connection.start-ok".into());
861        }
862        self.before_connection_start_ok(resolver, connection, credentials);
863        self.send_method_frame(method, send_resolver, None);
864        promise.await
865    }
866    fn receive_connection_secure(&self, method: protocol::connection::Secure) -> Result<()> {
867        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
868        if !self.status.can_receive_messages() {
869            return Err(self.status.state_error("connection.secure"));
870        }
871        self.on_connection_secure_received(method)
872    }
873    async fn connection_secure_ok(&self, response: &str) -> Result<()> {
874        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::SecureOk(
875            protocol::connection::SecureOk {
876                response: response.into(),
877            },
878        ));
879
880        let (promise, send_resolver) = Promise::new();
881        if level_enabled!(Level::TRACE) {
882            promise.set_marker("connection.secure-ok".into());
883        }
884        self.send_method_frame(method, send_resolver, None);
885        promise.await
886    }
887    fn receive_connection_tune(&self, method: protocol::connection::Tune) -> Result<()> {
888        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
889        if !self.status.can_receive_messages() {
890            return Err(self.status.state_error("connection.tune"));
891        }
892        self.on_connection_tune_received(method)
893    }
894    async fn connection_tune_ok(
895        &self,
896        channel_max: ShortUInt,
897        frame_max: LongUInt,
898        heartbeat: ShortUInt,
899    ) -> Result<()> {
900        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::TuneOk(
901            protocol::connection::TuneOk {
902                channel_max,
903                frame_max,
904                heartbeat,
905            },
906        ));
907
908        let (promise, send_resolver) = Promise::new();
909        if level_enabled!(Level::TRACE) {
910            promise.set_marker("connection.tune-ok".into());
911        }
912        self.send_method_frame(method, send_resolver, None);
913        promise.await
914    }
915    pub(crate) async fn connection_open(
916        &self,
917        virtual_host: &str,
918        connection: Box<Connection>,
919        conn_resolver: PromiseResolver<Connection>,
920    ) -> Result<()> {
921        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Open(
922            protocol::connection::Open {
923                virtual_host: virtual_host.into(),
924            },
925        ));
926
927        let (promise, send_resolver) = Promise::new();
928        if level_enabled!(Level::TRACE) {
929            promise.set_marker("connection.open".into());
930        }
931        let ((promise, resolver), promise_out) = (Promise::new(), promise);
932        if level_enabled!(Level::TRACE) {
933            promise.set_marker("connection.open.Ok".into());
934        }
935        self.before_connection_open(conn_resolver);
936        self.send_method_frame(
937            method,
938            send_resolver,
939            Some(ExpectedReply(
940                Reply::ConnectionOpenOk(resolver.clone(), connection),
941                Box::new(resolver),
942            )),
943        );
944        promise_out.await?;
945        promise.await
946    }
947    fn receive_connection_open_ok(&self, method: protocol::connection::OpenOk) -> Result<()> {
948        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
949        if !self.status.can_receive_messages() {
950            return Err(self.status.state_error("connection.open-ok"));
951        }
952
953        match self.frames.find_expected_reply(self.id, |reply| {
954            matches!(&reply.0, Reply::ConnectionOpenOk(..))
955        }) {
956            Some(Reply::ConnectionOpenOk(resolver, connection)) => {
957                let res = self.on_connection_open_ok_received(method, connection);
958                resolver.complete(res.clone());
959                res
960            }
961            unexpected => self.handle_invalid_contents(
962                format!(
963                    "unexpected connection open-ok received on channel {}, was awaiting for {:?}",
964                    self.id, unexpected
965                ),
966                method.get_amqp_class_id(),
967                method.get_amqp_method_id(),
968            ),
969        }
970    }
971    pub(crate) async fn connection_close(
972        &self,
973        reply_code: ShortUInt,
974        reply_text: &str,
975        class_id: ShortUInt,
976        method_id: ShortUInt,
977    ) -> Result<()> {
978        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Close(
979            protocol::connection::Close {
980                reply_code,
981                reply_text: reply_text.into(),
982                class_id,
983                method_id,
984            },
985        ));
986
987        let (promise, send_resolver) = Promise::new();
988        if level_enabled!(Level::TRACE) {
989            promise.set_marker("connection.close".into());
990        }
991        let ((promise, resolver), promise_out) = (Promise::new(), promise);
992        if level_enabled!(Level::TRACE) {
993            promise.set_marker("connection.close.Ok".into());
994        }
995        self.send_method_frame(
996            method,
997            send_resolver,
998            Some(ExpectedReply(
999                Reply::ConnectionCloseOk(resolver.clone()),
1000                Box::new(resolver),
1001            )),
1002        );
1003        promise_out.await?;
1004        promise.await
1005    }
1006    fn receive_connection_close(&self, method: protocol::connection::Close) -> Result<()> {
1007        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1008        if !self.status.can_receive_messages() {
1009            return Err(self.status.state_error("connection.close"));
1010        }
1011        self.on_connection_close_received(method)
1012    }
1013    pub(crate) async fn connection_close_ok(&self, error: Error) -> Result<()> {
1014        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(
1015            protocol::connection::CloseOk {},
1016        ));
1017
1018        let (promise, send_resolver) = Promise::new();
1019        if level_enabled!(Level::TRACE) {
1020            promise.set_marker("connection.close-ok".into());
1021        }
1022        self.send_method_frame(method, send_resolver, None);
1023        self.on_connection_close_ok_sent(error);
1024        promise.await
1025    }
1026    fn receive_connection_close_ok(&self, method: protocol::connection::CloseOk) -> Result<()> {
1027        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1028        if !self.status.can_receive_messages() {
1029            return Err(self.status.state_error("connection.close-ok"));
1030        }
1031
1032        match self.frames.find_expected_reply(self.id, |reply| {
1033            matches!(&reply.0, Reply::ConnectionCloseOk(..))
1034        }) {
1035            Some(Reply::ConnectionCloseOk(resolver)) => {
1036                let res = self.on_connection_close_ok_received();
1037                resolver.complete(res.clone());
1038                res
1039            }
1040            unexpected => self.handle_invalid_contents(
1041                format!(
1042                    "unexpected connection close-ok received on channel {}, was awaiting for {:?}",
1043                    self.id, unexpected
1044                ),
1045                method.get_amqp_class_id(),
1046                method.get_amqp_method_id(),
1047            ),
1048        }
1049    }
1050    pub(crate) async fn connection_blocked(&self, reason: &str) -> Result<()> {
1051        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(
1052            protocol::connection::Blocked {
1053                reason: reason.into(),
1054            },
1055        ));
1056
1057        let (promise, send_resolver) = Promise::new();
1058        if level_enabled!(Level::TRACE) {
1059            promise.set_marker("connection.blocked".into());
1060        }
1061        self.send_method_frame(method, send_resolver, None);
1062        promise.await
1063    }
1064    fn receive_connection_blocked(&self, method: protocol::connection::Blocked) -> Result<()> {
1065        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1066        if !self.status.can_receive_messages() {
1067            return Err(self.status.state_error("connection.blocked"));
1068        }
1069        self.on_connection_blocked_received(method)
1070    }
1071    pub(crate) async fn connection_unblocked(&self) -> Result<()> {
1072        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(
1073            protocol::connection::Unblocked {},
1074        ));
1075
1076        let (promise, send_resolver) = Promise::new();
1077        if level_enabled!(Level::TRACE) {
1078            promise.set_marker("connection.unblocked".into());
1079        }
1080        self.send_method_frame(method, send_resolver, None);
1081        promise.await
1082    }
1083    fn receive_connection_unblocked(&self, method: protocol::connection::Unblocked) -> Result<()> {
1084        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1085        if !self.status.can_receive_messages() {
1086            return Err(self.status.state_error("connection.unblocked"));
1087        }
1088        self.on_connection_unblocked_received(method)
1089    }
1090    pub(crate) async fn connection_update_secret(
1091        &self,
1092        new_secret: &str,
1093        reason: &str,
1094    ) -> Result<()> {
1095        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecret(
1096            protocol::connection::UpdateSecret {
1097                new_secret: new_secret.into(),
1098                reason: reason.into(),
1099            },
1100        ));
1101
1102        let (promise, send_resolver) = Promise::new();
1103        if level_enabled!(Level::TRACE) {
1104            promise.set_marker("connection.update-secret".into());
1105        }
1106        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1107        if level_enabled!(Level::TRACE) {
1108            promise.set_marker("connection.update-secret.Ok".into());
1109        }
1110        self.send_method_frame(
1111            method,
1112            send_resolver,
1113            Some(ExpectedReply(
1114                Reply::ConnectionUpdateSecretOk(resolver.clone()),
1115                Box::new(resolver),
1116            )),
1117        );
1118        promise_out.await?;
1119        promise.await
1120    }
1121    fn receive_connection_update_secret_ok(
1122        &self,
1123        method: protocol::connection::UpdateSecretOk,
1124    ) -> Result<()> {
1125        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1126        if !self.status.can_receive_messages() {
1127            return Err(self.status.state_error("connection.update-secret-ok"));
1128        }
1129
1130        match self.frames.find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::ConnectionUpdateSecretOk(..))){
1131      Some(Reply::ConnectionUpdateSecretOk(resolver)) => {
1132        let res =        Ok(())
1133;
1134        resolver.complete(res.clone());
1135        res
1136},
1137      unexpected => {
1138        self.handle_invalid_contents(format!("unexpected connection update-secret-ok received on channel {}, was awaiting for {:?}", self.id, unexpected), method.get_amqp_class_id(), method.get_amqp_method_id())
1139      },
1140    }
1141    }
1142    pub(crate) async fn channel_open(&self, channel: Channel) -> Result<Channel> {
1143        if !self.status.initializing() {
1144            return Err(self.status.state_error("channel.open"));
1145        }
1146
1147        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Open(
1148            protocol::channel::Open {},
1149        ));
1150
1151        let (promise, send_resolver) = Promise::new();
1152        if level_enabled!(Level::TRACE) {
1153            promise.set_marker("channel.open".into());
1154        }
1155        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1156        if level_enabled!(Level::TRACE) {
1157            promise.set_marker("channel.open.Ok".into());
1158        }
1159        self.send_method_frame(
1160            method,
1161            send_resolver,
1162            Some(ExpectedReply(
1163                Reply::ChannelOpenOk(resolver.clone(), channel),
1164                Box::new(resolver),
1165            )),
1166        );
1167        promise_out.await?;
1168        promise.await
1169    }
1170    fn receive_channel_open_ok(&self, method: protocol::channel::OpenOk) -> Result<()> {
1171        if !self.status.initializing() {
1172            return Err(self.status.state_error("channel.open-ok"));
1173        }
1174
1175        match self.frames.find_expected_reply(self.id, |reply| {
1176            matches!(&reply.0, Reply::ChannelOpenOk(..))
1177        }) {
1178            Some(Reply::ChannelOpenOk(resolver, channel)) => {
1179                self.on_channel_open_ok_received(method, resolver, channel)
1180            }
1181            unexpected => self.handle_invalid_contents(
1182                format!(
1183                    "unexpected channel open-ok received on channel {}, was awaiting for {:?}",
1184                    self.id, unexpected
1185                ),
1186                method.get_amqp_class_id(),
1187                method.get_amqp_method_id(),
1188            ),
1189        }
1190    }
1191    pub async fn channel_flow(&self, options: ChannelFlowOptions) -> Result<Boolean> {
1192        if !self.status.connected() {
1193            return Err(self.status.state_error("channel.flow"));
1194        }
1195
1196        let ChannelFlowOptions { active } = options;
1197        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(
1198            protocol::channel::Flow { active },
1199        ));
1200
1201        let (promise, send_resolver) = Promise::new();
1202        if level_enabled!(Level::TRACE) {
1203            promise.set_marker("channel.flow".into());
1204        }
1205        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1206        if level_enabled!(Level::TRACE) {
1207            promise.set_marker("channel.flow.Ok".into());
1208        }
1209        self.send_method_frame(
1210            method,
1211            send_resolver,
1212            Some(ExpectedReply(
1213                Reply::ChannelFlowOk(resolver.clone()),
1214                Box::new(resolver),
1215            )),
1216        );
1217        promise_out.await?;
1218        promise.await
1219    }
1220    fn receive_channel_flow(&self, method: protocol::channel::Flow) -> Result<()> {
1221        if !self.status.can_receive_messages() {
1222            return Err(self.status.state_error("channel.flow"));
1223        }
1224        self.on_channel_flow_received(method)
1225    }
1226    async fn channel_flow_ok(&self, options: ChannelFlowOkOptions) -> Result<()> {
1227        if !self.status.connected() {
1228            return Err(self.status.state_error("channel.flow-ok"));
1229        }
1230
1231        let ChannelFlowOkOptions { active } = options;
1232        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(
1233            protocol::channel::FlowOk { active },
1234        ));
1235
1236        let (promise, send_resolver) = Promise::new();
1237        if level_enabled!(Level::TRACE) {
1238            promise.set_marker("channel.flow-ok".into());
1239        }
1240        self.send_method_frame(method, send_resolver, None);
1241        promise.await
1242    }
1243    fn receive_channel_flow_ok(&self, method: protocol::channel::FlowOk) -> Result<()> {
1244        if !self.status.can_receive_messages() {
1245            return Err(self.status.state_error("channel.flow-ok"));
1246        }
1247
1248        match self.frames.find_expected_reply(self.id, |reply| {
1249            matches!(&reply.0, Reply::ChannelFlowOk(..))
1250        }) {
1251            Some(Reply::ChannelFlowOk(resolver)) => {
1252                self.on_channel_flow_ok_received(method, resolver)
1253            }
1254            unexpected => self.handle_invalid_contents(
1255                format!(
1256                    "unexpected channel flow-ok received on channel {}, was awaiting for {:?}",
1257                    self.id, unexpected
1258                ),
1259                method.get_amqp_class_id(),
1260                method.get_amqp_method_id(),
1261            ),
1262        }
1263    }
1264    async fn do_channel_close(
1265        &self,
1266        reply_code: ShortUInt,
1267        reply_text: &str,
1268        class_id: ShortUInt,
1269        method_id: ShortUInt,
1270    ) -> Result<()> {
1271        if !self.status.connected() {
1272            return Err(self.status.state_error("channel.close"));
1273        }
1274
1275        self.before_channel_close();
1276        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Close(
1277            protocol::channel::Close {
1278                reply_code,
1279                reply_text: reply_text.into(),
1280                class_id,
1281                method_id,
1282            },
1283        ));
1284
1285        let (promise, send_resolver) = Promise::new();
1286        if level_enabled!(Level::TRACE) {
1287            promise.set_marker("channel.close".into());
1288        }
1289        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1290        if level_enabled!(Level::TRACE) {
1291            promise.set_marker("channel.close.Ok".into());
1292        }
1293        self.send_method_frame(
1294            method,
1295            send_resolver,
1296            Some(ExpectedReply(
1297                Reply::ChannelCloseOk(resolver.clone()),
1298                Box::new(resolver),
1299            )),
1300        );
1301        promise_out.await?;
1302        promise.await
1303    }
1304    fn receive_channel_close(&self, method: protocol::channel::Close) -> Result<()> {
1305        if !self.status.can_receive_messages() {
1306            return Err(self.status.state_error("channel.close"));
1307        }
1308        self.on_channel_close_received(method)
1309    }
1310    async fn channel_close_ok(&self, error: Option<Error>) -> Result<()> {
1311        if !self.status.closing() {
1312            return Err(self.status.state_error("channel.close-ok"));
1313        }
1314
1315        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(
1316            protocol::channel::CloseOk {},
1317        ));
1318
1319        let (promise, send_resolver) = Promise::new();
1320        if level_enabled!(Level::TRACE) {
1321            promise.set_marker("channel.close-ok".into());
1322        }
1323        self.send_method_frame(method, send_resolver, None);
1324        self.on_channel_close_ok_sent(error);
1325        promise.await
1326    }
1327    fn receive_channel_close_ok(&self, method: protocol::channel::CloseOk) -> Result<()> {
1328        if !self.status.can_receive_messages() {
1329            return Err(self.status.state_error("channel.close-ok"));
1330        }
1331
1332        match self.next_expected_close_ok_reply() {
1333            Some(Reply::ChannelCloseOk(resolver)) => {
1334                let res = self.on_channel_close_ok_received();
1335                resolver.complete(res.clone());
1336                res
1337            }
1338            unexpected => self.handle_invalid_contents(
1339                format!(
1340                    "unexpected channel close-ok received on channel {}, was awaiting for {:?}",
1341                    self.id, unexpected
1342                ),
1343                method.get_amqp_class_id(),
1344                method.get_amqp_method_id(),
1345            ),
1346        }
1347    }
1348    pub async fn access_request(&self, realm: &str, options: AccessRequestOptions) -> Result<()> {
1349        if !self.status.connected() {
1350            return Err(self.status.state_error("access.request"));
1351        }
1352
1353        let AccessRequestOptions {
1354            exclusive,
1355            passive,
1356            active,
1357            write,
1358            read,
1359        } = options;
1360        let method = AMQPClass::Access(protocol::access::AMQPMethod::Request(
1361            protocol::access::Request {
1362                realm: realm.into(),
1363                exclusive,
1364                passive,
1365                active,
1366                write,
1367                read,
1368            },
1369        ));
1370
1371        let (promise, send_resolver) = Promise::new();
1372        if level_enabled!(Level::TRACE) {
1373            promise.set_marker("access.request".into());
1374        }
1375        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1376        if level_enabled!(Level::TRACE) {
1377            promise.set_marker("access.request.Ok".into());
1378        }
1379        self.send_method_frame(
1380            method,
1381            send_resolver,
1382            Some(ExpectedReply(
1383                Reply::AccessRequestOk(resolver.clone()),
1384                Box::new(resolver),
1385            )),
1386        );
1387        promise_out.await?;
1388        promise.await
1389    }
1390    fn receive_access_request_ok(&self, method: protocol::access::RequestOk) -> Result<()> {
1391        if !self.status.can_receive_messages() {
1392            return Err(self.status.state_error("access.request-ok"));
1393        }
1394
1395        match self.frames.find_expected_reply(self.id, |reply| {
1396            matches!(&reply.0, Reply::AccessRequestOk(..))
1397        }) {
1398            Some(Reply::AccessRequestOk(resolver)) => {
1399                let res = self.on_access_request_ok_received(method);
1400                resolver.complete(res.clone());
1401                res
1402            }
1403            unexpected => self.handle_invalid_contents(
1404                format!(
1405                    "unexpected access request-ok received on channel {}, was awaiting for {:?}",
1406                    self.id, unexpected
1407                ),
1408                method.get_amqp_class_id(),
1409                method.get_amqp_method_id(),
1410            ),
1411        }
1412    }
1413    async fn do_exchange_declare(
1414        &self,
1415        exchange: &str,
1416        kind: &str,
1417        options: ExchangeDeclareOptions,
1418        arguments: FieldTable,
1419        exchange_kind: ExchangeKind,
1420    ) -> Result<()> {
1421        if !self.status.connected() {
1422            return Err(self.status.state_error("exchange.declare"));
1423        }
1424
1425        let creation_arguments = arguments.clone();
1426        let ExchangeDeclareOptions {
1427            passive,
1428            durable,
1429            auto_delete,
1430            internal,
1431            nowait,
1432        } = options;
1433        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Declare(
1434            protocol::exchange::Declare {
1435                exchange: exchange.into(),
1436                kind: kind.into(),
1437                passive,
1438                durable,
1439                auto_delete,
1440                internal,
1441                nowait,
1442                arguments,
1443            },
1444        ));
1445
1446        let (promise, send_resolver) = Promise::new();
1447        if level_enabled!(Level::TRACE) {
1448            promise.set_marker("exchange.declare".into());
1449        }
1450        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1451        if level_enabled!(Level::TRACE) {
1452            promise.set_marker("exchange.declare.Ok".into());
1453        }
1454        self.send_method_frame(
1455            method,
1456            send_resolver,
1457            Some(ExpectedReply(
1458                Reply::ExchangeDeclareOk(
1459                    resolver.clone(),
1460                    exchange.into(),
1461                    exchange_kind,
1462                    options,
1463                    creation_arguments,
1464                ),
1465                Box::new(resolver),
1466            )),
1467        );
1468        if nowait {
1469            self.receive_exchange_declare_ok(protocol::exchange::DeclareOk {})?;
1470        }
1471        promise_out.await?;
1472        promise.await
1473    }
1474    fn receive_exchange_declare_ok(&self, method: protocol::exchange::DeclareOk) -> Result<()> {
1475        if !self.status.can_receive_messages() {
1476            return Err(self.status.state_error("exchange.declare-ok"));
1477        }
1478
1479        match self.frames.find_expected_reply(self.id, |reply| {
1480            matches!(&reply.0, Reply::ExchangeDeclareOk(..))
1481        }) {
1482            Some(Reply::ExchangeDeclareOk(
1483                resolver,
1484                exchange,
1485                exchange_kind,
1486                options,
1487                creation_arguments,
1488            )) => self.on_exchange_declare_ok_received(
1489                resolver,
1490                exchange,
1491                exchange_kind,
1492                options,
1493                creation_arguments,
1494            ),
1495            unexpected => self.handle_invalid_contents(
1496                format!(
1497                    "unexpected exchange declare-ok received on channel {}, was awaiting for {:?}",
1498                    self.id, unexpected
1499                ),
1500                method.get_amqp_class_id(),
1501                method.get_amqp_method_id(),
1502            ),
1503        }
1504    }
1505    /// Delete an exchange
1506    pub async fn exchange_delete(
1507        &self,
1508        exchange: &str,
1509        options: ExchangeDeleteOptions,
1510    ) -> Result<()> {
1511        if !self.status.connected() {
1512            return Err(self.status.state_error("exchange.delete"));
1513        }
1514
1515        let ExchangeDeleteOptions { if_unused, nowait } = options;
1516        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Delete(
1517            protocol::exchange::Delete {
1518                exchange: exchange.into(),
1519                if_unused,
1520                nowait,
1521            },
1522        ));
1523
1524        let (promise, send_resolver) = Promise::new();
1525        if level_enabled!(Level::TRACE) {
1526            promise.set_marker("exchange.delete".into());
1527        }
1528        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1529        if level_enabled!(Level::TRACE) {
1530            promise.set_marker("exchange.delete.Ok".into());
1531        }
1532        self.send_method_frame(
1533            method,
1534            send_resolver,
1535            Some(ExpectedReply(
1536                Reply::ExchangeDeleteOk(resolver.clone(), exchange.into()),
1537                Box::new(resolver),
1538            )),
1539        );
1540        if nowait {
1541            self.receive_exchange_delete_ok(protocol::exchange::DeleteOk {})?;
1542        }
1543        promise_out.await?;
1544        promise.await
1545    }
1546    fn receive_exchange_delete_ok(&self, method: protocol::exchange::DeleteOk) -> Result<()> {
1547        if !self.status.can_receive_messages() {
1548            return Err(self.status.state_error("exchange.delete-ok"));
1549        }
1550
1551        match self.frames.find_expected_reply(self.id, |reply| {
1552            matches!(&reply.0, Reply::ExchangeDeleteOk(..))
1553        }) {
1554            Some(Reply::ExchangeDeleteOk(resolver, exchange)) => {
1555                let res = self.on_exchange_delete_ok_received(exchange);
1556                resolver.complete(res.clone());
1557                res
1558            }
1559            unexpected => self.handle_invalid_contents(
1560                format!(
1561                    "unexpected exchange delete-ok received on channel {}, was awaiting for {:?}",
1562                    self.id, unexpected
1563                ),
1564                method.get_amqp_class_id(),
1565                method.get_amqp_method_id(),
1566            ),
1567        }
1568    }
1569    pub async fn exchange_bind(
1570        &self,
1571        destination: &str,
1572        source: &str,
1573        routing_key: &str,
1574        options: ExchangeBindOptions,
1575        arguments: FieldTable,
1576    ) -> Result<()> {
1577        if !self.status.connected() {
1578            return Err(self.status.state_error("exchange.bind"));
1579        }
1580
1581        let creation_arguments = arguments.clone();
1582        let ExchangeBindOptions { nowait } = options;
1583        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Bind(
1584            protocol::exchange::Bind {
1585                destination: destination.into(),
1586                source: source.into(),
1587                routing_key: routing_key.into(),
1588                nowait,
1589                arguments,
1590            },
1591        ));
1592
1593        let (promise, send_resolver) = Promise::new();
1594        if level_enabled!(Level::TRACE) {
1595            promise.set_marker("exchange.bind".into());
1596        }
1597        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1598        if level_enabled!(Level::TRACE) {
1599            promise.set_marker("exchange.bind.Ok".into());
1600        }
1601        self.send_method_frame(
1602            method,
1603            send_resolver,
1604            Some(ExpectedReply(
1605                Reply::ExchangeBindOk(
1606                    resolver.clone(),
1607                    destination.into(),
1608                    source.into(),
1609                    routing_key.into(),
1610                    creation_arguments,
1611                ),
1612                Box::new(resolver),
1613            )),
1614        );
1615        if nowait {
1616            self.receive_exchange_bind_ok(protocol::exchange::BindOk {})?;
1617        }
1618        promise_out.await?;
1619        promise.await
1620    }
1621    fn receive_exchange_bind_ok(&self, method: protocol::exchange::BindOk) -> Result<()> {
1622        if !self.status.can_receive_messages() {
1623            return Err(self.status.state_error("exchange.bind-ok"));
1624        }
1625
1626        match self.frames.find_expected_reply(self.id, |reply| {
1627            matches!(&reply.0, Reply::ExchangeBindOk(..))
1628        }) {
1629            Some(Reply::ExchangeBindOk(
1630                resolver,
1631                destination,
1632                source,
1633                routing_key,
1634                creation_arguments,
1635            )) => {
1636                let res = self.on_exchange_bind_ok_received(
1637                    destination,
1638                    source,
1639                    routing_key,
1640                    creation_arguments,
1641                );
1642                resolver.complete(res.clone());
1643                res
1644            }
1645            unexpected => self.handle_invalid_contents(
1646                format!(
1647                    "unexpected exchange bind-ok received on channel {}, was awaiting for {:?}",
1648                    self.id, unexpected
1649                ),
1650                method.get_amqp_class_id(),
1651                method.get_amqp_method_id(),
1652            ),
1653        }
1654    }
1655    pub async fn exchange_unbind(
1656        &self,
1657        destination: &str,
1658        source: &str,
1659        routing_key: &str,
1660        options: ExchangeUnbindOptions,
1661        arguments: FieldTable,
1662    ) -> Result<()> {
1663        if !self.status.connected() {
1664            return Err(self.status.state_error("exchange.unbind"));
1665        }
1666
1667        let creation_arguments = arguments.clone();
1668        let ExchangeUnbindOptions { nowait } = options;
1669        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Unbind(
1670            protocol::exchange::Unbind {
1671                destination: destination.into(),
1672                source: source.into(),
1673                routing_key: routing_key.into(),
1674                nowait,
1675                arguments,
1676            },
1677        ));
1678
1679        let (promise, send_resolver) = Promise::new();
1680        if level_enabled!(Level::TRACE) {
1681            promise.set_marker("exchange.unbind".into());
1682        }
1683        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1684        if level_enabled!(Level::TRACE) {
1685            promise.set_marker("exchange.unbind.Ok".into());
1686        }
1687        self.send_method_frame(
1688            method,
1689            send_resolver,
1690            Some(ExpectedReply(
1691                Reply::ExchangeUnbindOk(
1692                    resolver.clone(),
1693                    destination.into(),
1694                    source.into(),
1695                    routing_key.into(),
1696                    creation_arguments,
1697                ),
1698                Box::new(resolver),
1699            )),
1700        );
1701        if nowait {
1702            self.receive_exchange_unbind_ok(protocol::exchange::UnbindOk {})?;
1703        }
1704        promise_out.await?;
1705        promise.await
1706    }
1707    fn receive_exchange_unbind_ok(&self, method: protocol::exchange::UnbindOk) -> Result<()> {
1708        if !self.status.can_receive_messages() {
1709            return Err(self.status.state_error("exchange.unbind-ok"));
1710        }
1711
1712        match self.frames.find_expected_reply(self.id, |reply| {
1713            matches!(&reply.0, Reply::ExchangeUnbindOk(..))
1714        }) {
1715            Some(Reply::ExchangeUnbindOk(
1716                resolver,
1717                destination,
1718                source,
1719                routing_key,
1720                creation_arguments,
1721            )) => {
1722                let res = self.on_exchange_unbind_ok_received(
1723                    destination,
1724                    source,
1725                    routing_key,
1726                    creation_arguments,
1727                );
1728                resolver.complete(res.clone());
1729                res
1730            }
1731            unexpected => self.handle_invalid_contents(
1732                format!(
1733                    "unexpected exchange unbind-ok received on channel {}, was awaiting for {:?}",
1734                    self.id, unexpected
1735                ),
1736                method.get_amqp_class_id(),
1737                method.get_amqp_method_id(),
1738            ),
1739        }
1740    }
1741    pub async fn queue_declare(
1742        &self,
1743        queue: &str,
1744        options: QueueDeclareOptions,
1745        arguments: FieldTable,
1746    ) -> Result<Queue> {
1747        if !self.status.connected() {
1748            return Err(self.status.state_error("queue.declare"));
1749        }
1750
1751        let creation_arguments = arguments.clone();
1752        let QueueDeclareOptions {
1753            passive,
1754            durable,
1755            exclusive,
1756            auto_delete,
1757            nowait,
1758        } = options;
1759        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Declare(
1760            protocol::queue::Declare {
1761                queue: queue.into(),
1762                passive,
1763                durable,
1764                exclusive,
1765                auto_delete,
1766                nowait,
1767                arguments,
1768            },
1769        ));
1770
1771        let (promise, send_resolver) = Promise::new();
1772        if level_enabled!(Level::TRACE) {
1773            promise.set_marker("queue.declare".into());
1774        }
1775        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1776        if level_enabled!(Level::TRACE) {
1777            promise.set_marker("queue.declare.Ok".into());
1778        }
1779        self.send_method_frame(
1780            method,
1781            send_resolver,
1782            Some(ExpectedReply(
1783                Reply::QueueDeclareOk(resolver.clone(), options, creation_arguments),
1784                Box::new(resolver),
1785            )),
1786        );
1787        if nowait {
1788            self.receive_queue_declare_ok(protocol::queue::DeclareOk {
1789                queue: queue.into(),
1790                ..Default::default()
1791            })?;
1792        }
1793        promise_out.await?;
1794        promise.await
1795    }
1796    fn receive_queue_declare_ok(&self, method: protocol::queue::DeclareOk) -> Result<()> {
1797        if !self.status.can_receive_messages() {
1798            return Err(self.status.state_error("queue.declare-ok"));
1799        }
1800
1801        match self.frames.find_expected_reply(self.id, |reply| {
1802            matches!(&reply.0, Reply::QueueDeclareOk(..))
1803        }) {
1804            Some(Reply::QueueDeclareOk(resolver, options, creation_arguments)) => {
1805                self.on_queue_declare_ok_received(method, resolver, options, creation_arguments)
1806            }
1807            unexpected => self.handle_invalid_contents(
1808                format!(
1809                    "unexpected queue declare-ok received on channel {}, was awaiting for {:?}",
1810                    self.id, unexpected
1811                ),
1812                method.get_amqp_class_id(),
1813                method.get_amqp_method_id(),
1814            ),
1815        }
1816    }
1817    pub async fn queue_bind(
1818        &self,
1819        queue: &str,
1820        exchange: &str,
1821        routing_key: &str,
1822        options: QueueBindOptions,
1823        arguments: FieldTable,
1824    ) -> Result<()> {
1825        if !self.status.connected() {
1826            return Err(self.status.state_error("queue.bind"));
1827        }
1828
1829        let creation_arguments = arguments.clone();
1830        let QueueBindOptions { nowait } = options;
1831        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Bind(protocol::queue::Bind {
1832            queue: queue.into(),
1833            exchange: exchange.into(),
1834            routing_key: routing_key.into(),
1835            nowait,
1836            arguments,
1837        }));
1838
1839        let (promise, send_resolver) = Promise::new();
1840        if level_enabled!(Level::TRACE) {
1841            promise.set_marker("queue.bind".into());
1842        }
1843        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1844        if level_enabled!(Level::TRACE) {
1845            promise.set_marker("queue.bind.Ok".into());
1846        }
1847        self.send_method_frame(
1848            method,
1849            send_resolver,
1850            Some(ExpectedReply(
1851                Reply::QueueBindOk(
1852                    resolver.clone(),
1853                    queue.into(),
1854                    exchange.into(),
1855                    routing_key.into(),
1856                    creation_arguments,
1857                ),
1858                Box::new(resolver),
1859            )),
1860        );
1861        if nowait {
1862            self.receive_queue_bind_ok(protocol::queue::BindOk {})?;
1863        }
1864        promise_out.await?;
1865        promise.await
1866    }
1867    fn receive_queue_bind_ok(&self, method: protocol::queue::BindOk) -> Result<()> {
1868        if !self.status.can_receive_messages() {
1869            return Err(self.status.state_error("queue.bind-ok"));
1870        }
1871
1872        match self
1873            .frames
1874            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueueBindOk(..)))
1875        {
1876            Some(Reply::QueueBindOk(
1877                resolver,
1878                queue,
1879                exchange,
1880                routing_key,
1881                creation_arguments,
1882            )) => {
1883                let res = self.on_queue_bind_ok_received(
1884                    queue,
1885                    exchange,
1886                    routing_key,
1887                    creation_arguments,
1888                );
1889                resolver.complete(res.clone());
1890                res
1891            }
1892            unexpected => self.handle_invalid_contents(
1893                format!(
1894                    "unexpected queue bind-ok received on channel {}, was awaiting for {:?}",
1895                    self.id, unexpected
1896                ),
1897                method.get_amqp_class_id(),
1898                method.get_amqp_method_id(),
1899            ),
1900        }
1901    }
1902    pub async fn queue_purge(
1903        &self,
1904        queue: &str,
1905        options: QueuePurgeOptions,
1906    ) -> Result<MessageCount> {
1907        if !self.status.connected() {
1908            return Err(self.status.state_error("queue.purge"));
1909        }
1910
1911        let QueuePurgeOptions { nowait } = options;
1912        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Purge(protocol::queue::Purge {
1913            queue: queue.into(),
1914            nowait,
1915        }));
1916
1917        let (promise, send_resolver) = Promise::new();
1918        if level_enabled!(Level::TRACE) {
1919            promise.set_marker("queue.purge".into());
1920        }
1921        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1922        if level_enabled!(Level::TRACE) {
1923            promise.set_marker("queue.purge.Ok".into());
1924        }
1925        self.send_method_frame(
1926            method,
1927            send_resolver,
1928            Some(ExpectedReply(
1929                Reply::QueuePurgeOk(resolver.clone()),
1930                Box::new(resolver),
1931            )),
1932        );
1933        promise_out.await?;
1934        promise.await
1935    }
1936    fn receive_queue_purge_ok(&self, method: protocol::queue::PurgeOk) -> Result<()> {
1937        if !self.status.can_receive_messages() {
1938            return Err(self.status.state_error("queue.purge-ok"));
1939        }
1940
1941        match self
1942            .frames
1943            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueuePurgeOk(..)))
1944        {
1945            Some(Reply::QueuePurgeOk(resolver)) => {
1946                self.on_queue_purge_ok_received(method, resolver)
1947            }
1948            unexpected => self.handle_invalid_contents(
1949                format!(
1950                    "unexpected queue purge-ok received on channel {}, was awaiting for {:?}",
1951                    self.id, unexpected
1952                ),
1953                method.get_amqp_class_id(),
1954                method.get_amqp_method_id(),
1955            ),
1956        }
1957    }
1958    pub async fn queue_delete(
1959        &self,
1960        queue: &str,
1961        options: QueueDeleteOptions,
1962    ) -> Result<MessageCount> {
1963        if !self.status.connected() {
1964            return Err(self.status.state_error("queue.delete"));
1965        }
1966
1967        let QueueDeleteOptions {
1968            if_unused,
1969            if_empty,
1970            nowait,
1971        } = options;
1972        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Delete(
1973            protocol::queue::Delete {
1974                queue: queue.into(),
1975                if_unused,
1976                if_empty,
1977                nowait,
1978            },
1979        ));
1980
1981        let (promise, send_resolver) = Promise::new();
1982        if level_enabled!(Level::TRACE) {
1983            promise.set_marker("queue.delete".into());
1984        }
1985        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1986        if level_enabled!(Level::TRACE) {
1987            promise.set_marker("queue.delete.Ok".into());
1988        }
1989        self.send_method_frame(
1990            method,
1991            send_resolver,
1992            Some(ExpectedReply(
1993                Reply::QueueDeleteOk(resolver.clone(), queue.into()),
1994                Box::new(resolver),
1995            )),
1996        );
1997        if nowait {
1998            self.receive_queue_delete_ok(protocol::queue::DeleteOk {
1999                ..Default::default()
2000            })?;
2001        }
2002        promise_out.await?;
2003        promise.await
2004    }
2005    fn receive_queue_delete_ok(&self, method: protocol::queue::DeleteOk) -> Result<()> {
2006        if !self.status.can_receive_messages() {
2007            return Err(self.status.state_error("queue.delete-ok"));
2008        }
2009
2010        match self.frames.find_expected_reply(self.id, |reply| {
2011            matches!(&reply.0, Reply::QueueDeleteOk(..))
2012        }) {
2013            Some(Reply::QueueDeleteOk(resolver, queue)) => {
2014                self.on_queue_delete_ok_received(method, resolver, queue)
2015            }
2016            unexpected => self.handle_invalid_contents(
2017                format!(
2018                    "unexpected queue delete-ok received on channel {}, was awaiting for {:?}",
2019                    self.id, unexpected
2020                ),
2021                method.get_amqp_class_id(),
2022                method.get_amqp_method_id(),
2023            ),
2024        }
2025    }
2026    pub async fn queue_unbind(
2027        &self,
2028        queue: &str,
2029        exchange: &str,
2030        routing_key: &str,
2031        arguments: FieldTable,
2032    ) -> Result<()> {
2033        if !self.status.connected() {
2034            return Err(self.status.state_error("queue.unbind"));
2035        }
2036
2037        let creation_arguments = arguments.clone();
2038        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Unbind(
2039            protocol::queue::Unbind {
2040                queue: queue.into(),
2041                exchange: exchange.into(),
2042                routing_key: routing_key.into(),
2043                arguments,
2044            },
2045        ));
2046
2047        let (promise, send_resolver) = Promise::new();
2048        if level_enabled!(Level::TRACE) {
2049            promise.set_marker("queue.unbind".into());
2050        }
2051        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2052        if level_enabled!(Level::TRACE) {
2053            promise.set_marker("queue.unbind.Ok".into());
2054        }
2055        self.send_method_frame(
2056            method,
2057            send_resolver,
2058            Some(ExpectedReply(
2059                Reply::QueueUnbindOk(
2060                    resolver.clone(),
2061                    queue.into(),
2062                    exchange.into(),
2063                    routing_key.into(),
2064                    creation_arguments,
2065                ),
2066                Box::new(resolver),
2067            )),
2068        );
2069        promise_out.await?;
2070        promise.await
2071    }
2072    fn receive_queue_unbind_ok(&self, method: protocol::queue::UnbindOk) -> Result<()> {
2073        if !self.status.can_receive_messages() {
2074            return Err(self.status.state_error("queue.unbind-ok"));
2075        }
2076
2077        match self.frames.find_expected_reply(self.id, |reply| {
2078            matches!(&reply.0, Reply::QueueUnbindOk(..))
2079        }) {
2080            Some(Reply::QueueUnbindOk(
2081                resolver,
2082                queue,
2083                exchange,
2084                routing_key,
2085                creation_arguments,
2086            )) => {
2087                let res = self.on_queue_unbind_ok_received(
2088                    queue,
2089                    exchange,
2090                    routing_key,
2091                    creation_arguments,
2092                );
2093                resolver.complete(res.clone());
2094                res
2095            }
2096            unexpected => self.handle_invalid_contents(
2097                format!(
2098                    "unexpected queue unbind-ok received on channel {}, was awaiting for {:?}",
2099                    self.id, unexpected
2100                ),
2101                method.get_amqp_class_id(),
2102                method.get_amqp_method_id(),
2103            ),
2104        }
2105    }
2106    pub async fn tx_select(&self) -> Result<()> {
2107        if !self.status.connected() {
2108            return Err(self.status.state_error("tx.select"));
2109        }
2110
2111        let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Select(protocol::tx::Select {}));
2112
2113        let (promise, send_resolver) = Promise::new();
2114        if level_enabled!(Level::TRACE) {
2115            promise.set_marker("tx.select".into());
2116        }
2117        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2118        if level_enabled!(Level::TRACE) {
2119            promise.set_marker("tx.select.Ok".into());
2120        }
2121        self.send_method_frame(
2122            method,
2123            send_resolver,
2124            Some(ExpectedReply(
2125                Reply::TxSelectOk(resolver.clone()),
2126                Box::new(resolver),
2127            )),
2128        );
2129        promise_out.await?;
2130        promise.await
2131    }
2132    fn receive_tx_select_ok(&self, method: protocol::tx::SelectOk) -> Result<()> {
2133        if !self.status.can_receive_messages() {
2134            return Err(self.status.state_error("tx.select-ok"));
2135        }
2136
2137        match self
2138            .frames
2139            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxSelectOk(..)))
2140        {
2141            Some(Reply::TxSelectOk(resolver)) => {
2142                let res = Ok(());
2143                resolver.complete(res.clone());
2144                res
2145            }
2146            unexpected => self.handle_invalid_contents(
2147                format!(
2148                    "unexpected tx select-ok received on channel {}, was awaiting for {:?}",
2149                    self.id, unexpected
2150                ),
2151                method.get_amqp_class_id(),
2152                method.get_amqp_method_id(),
2153            ),
2154        }
2155    }
2156    pub async fn tx_commit(&self) -> Result<()> {
2157        if !self.status.connected() {
2158            return Err(self.status.state_error("tx.commit"));
2159        }
2160
2161        let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Commit(protocol::tx::Commit {}));
2162
2163        let (promise, send_resolver) = Promise::new();
2164        if level_enabled!(Level::TRACE) {
2165            promise.set_marker("tx.commit".into());
2166        }
2167        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2168        if level_enabled!(Level::TRACE) {
2169            promise.set_marker("tx.commit.Ok".into());
2170        }
2171        self.send_method_frame(
2172            method,
2173            send_resolver,
2174            Some(ExpectedReply(
2175                Reply::TxCommitOk(resolver.clone()),
2176                Box::new(resolver),
2177            )),
2178        );
2179        promise_out.await?;
2180        promise.await
2181    }
2182    fn receive_tx_commit_ok(&self, method: protocol::tx::CommitOk) -> Result<()> {
2183        if !self.status.can_receive_messages() {
2184            return Err(self.status.state_error("tx.commit-ok"));
2185        }
2186
2187        match self
2188            .frames
2189            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxCommitOk(..)))
2190        {
2191            Some(Reply::TxCommitOk(resolver)) => {
2192                let res = Ok(());
2193                resolver.complete(res.clone());
2194                res
2195            }
2196            unexpected => self.handle_invalid_contents(
2197                format!(
2198                    "unexpected tx commit-ok received on channel {}, was awaiting for {:?}",
2199                    self.id, unexpected
2200                ),
2201                method.get_amqp_class_id(),
2202                method.get_amqp_method_id(),
2203            ),
2204        }
2205    }
2206    pub async fn tx_rollback(&self) -> Result<()> {
2207        if !self.status.connected() {
2208            return Err(self.status.state_error("tx.rollback"));
2209        }
2210
2211        let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Rollback(
2212            protocol::tx::Rollback {},
2213        ));
2214
2215        let (promise, send_resolver) = Promise::new();
2216        if level_enabled!(Level::TRACE) {
2217            promise.set_marker("tx.rollback".into());
2218        }
2219        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2220        if level_enabled!(Level::TRACE) {
2221            promise.set_marker("tx.rollback.Ok".into());
2222        }
2223        self.send_method_frame(
2224            method,
2225            send_resolver,
2226            Some(ExpectedReply(
2227                Reply::TxRollbackOk(resolver.clone()),
2228                Box::new(resolver),
2229            )),
2230        );
2231        promise_out.await?;
2232        promise.await
2233    }
2234    fn receive_tx_rollback_ok(&self, method: protocol::tx::RollbackOk) -> Result<()> {
2235        if !self.status.can_receive_messages() {
2236            return Err(self.status.state_error("tx.rollback-ok"));
2237        }
2238
2239        match self
2240            .frames
2241            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxRollbackOk(..)))
2242        {
2243            Some(Reply::TxRollbackOk(resolver)) => {
2244                let res = Ok(());
2245                resolver.complete(res.clone());
2246                res
2247            }
2248            unexpected => self.handle_invalid_contents(
2249                format!(
2250                    "unexpected tx rollback-ok received on channel {}, was awaiting for {:?}",
2251                    self.id, unexpected
2252                ),
2253                method.get_amqp_class_id(),
2254                method.get_amqp_method_id(),
2255            ),
2256        }
2257    }
2258    pub async fn confirm_select(&self, options: ConfirmSelectOptions) -> Result<()> {
2259        if !self.status.connected_or_recovering() {
2260            return Err(self.status.state_error("confirm.select"));
2261        }
2262
2263        let ConfirmSelectOptions { nowait } = options;
2264        let method = AMQPClass::Confirm(protocol::confirm::AMQPMethod::Select(
2265            protocol::confirm::Select { nowait },
2266        ));
2267
2268        let (promise, send_resolver) = Promise::new();
2269        if level_enabled!(Level::TRACE) {
2270            promise.set_marker("confirm.select".into());
2271        }
2272        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2273        if level_enabled!(Level::TRACE) {
2274            promise.set_marker("confirm.select.Ok".into());
2275        }
2276        self.send_method_frame(
2277            method,
2278            send_resolver,
2279            Some(ExpectedReply(
2280                Reply::ConfirmSelectOk(resolver.clone()),
2281                Box::new(resolver),
2282            )),
2283        );
2284        promise_out.await?;
2285        promise.await
2286    }
2287    fn receive_confirm_select_ok(&self, method: protocol::confirm::SelectOk) -> Result<()> {
2288        if !self.status.can_receive_messages() {
2289            return Err(self.status.state_error("confirm.select-ok"));
2290        }
2291
2292        match self.frames.find_expected_reply(self.id, |reply| {
2293            matches!(&reply.0, Reply::ConfirmSelectOk(..))
2294        }) {
2295            Some(Reply::ConfirmSelectOk(resolver)) => {
2296                let res = self.on_confirm_select_ok_received();
2297                resolver.complete(res.clone());
2298                res
2299            }
2300            unexpected => self.handle_invalid_contents(
2301                format!(
2302                    "unexpected confirm select-ok received on channel {}, was awaiting for {:?}",
2303                    self.id, unexpected
2304                ),
2305                method.get_amqp_class_id(),
2306                method.get_amqp_method_id(),
2307            ),
2308        }
2309    }
2310}