lapin/
generated.rs

1pub mod options {
2    use super::*;
3
4    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
5    pub struct BasicQosOptions {
6        #[serde(default)]
7        pub global: Boolean,
8    }
9
10    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
11    pub struct BasicConsumeOptions {
12        #[serde(default)]
13        pub no_local: Boolean,
14        #[serde(default)]
15        pub no_ack: Boolean,
16        #[serde(default)]
17        pub exclusive: Boolean,
18        #[serde(default)]
19        pub nowait: Boolean,
20    }
21
22    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
23    pub struct BasicCancelOptions {
24        #[serde(default)]
25        pub nowait: Boolean,
26    }
27
28    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
29    pub struct BasicPublishOptions {
30        #[serde(default)]
31        pub mandatory: Boolean,
32        #[serde(default)]
33        pub immediate: Boolean,
34    }
35
36    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
37    pub struct BasicDeliverOptions {
38        #[serde(default)]
39        pub redelivered: Boolean,
40    }
41
42    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
43    pub struct BasicGetOptions {
44        #[serde(default)]
45        pub no_ack: Boolean,
46    }
47
48    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
49    pub struct BasicGetOkOptions {
50        #[serde(default)]
51        pub redelivered: Boolean,
52    }
53
54    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
55    pub struct BasicAckOptions {
56        #[serde(default)]
57        pub multiple: Boolean,
58    }
59
60    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
61    pub struct BasicRejectOptions {
62        #[serde(default)]
63        pub requeue: Boolean,
64    }
65
66    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
67    pub struct BasicRecoverAsyncOptions {
68        #[serde(default)]
69        pub requeue: Boolean,
70    }
71
72    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
73    pub struct BasicRecoverOptions {
74        #[serde(default)]
75        pub requeue: Boolean,
76    }
77
78    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
79    pub struct BasicNackOptions {
80        #[serde(default)]
81        pub multiple: Boolean,
82        #[serde(default)]
83        pub requeue: Boolean,
84    }
85
86    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
87    pub struct ChannelFlowOptions {
88        #[serde(default)]
89        pub active: Boolean,
90    }
91
92    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
93    pub struct ChannelFlowOkOptions {
94        #[serde(default)]
95        pub active: Boolean,
96    }
97
98    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
99    pub struct AccessRequestOptions {
100        #[serde(default)]
101        pub exclusive: Boolean,
102        #[serde(default)]
103        pub passive: Boolean,
104        #[serde(default)]
105        pub active: Boolean,
106        #[serde(default)]
107        pub write: Boolean,
108        #[serde(default)]
109        pub read: Boolean,
110    }
111
112    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
113    pub struct ExchangeDeclareOptions {
114        #[serde(default)]
115        pub passive: Boolean,
116        #[serde(default)]
117        pub durable: Boolean,
118        #[serde(default)]
119        pub auto_delete: Boolean,
120        #[serde(default)]
121        pub internal: Boolean,
122        #[serde(default)]
123        pub nowait: Boolean,
124    }
125
126    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
127    pub struct ExchangeDeleteOptions {
128        #[serde(default)]
129        pub if_unused: Boolean,
130        #[serde(default)]
131        pub nowait: Boolean,
132    }
133
134    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
135    pub struct ExchangeBindOptions {
136        #[serde(default)]
137        pub nowait: Boolean,
138    }
139
140    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
141    pub struct ExchangeUnbindOptions {
142        #[serde(default)]
143        pub nowait: Boolean,
144    }
145
146    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
147    pub struct QueueDeclareOptions {
148        #[serde(default)]
149        pub passive: Boolean,
150        #[serde(default)]
151        pub durable: Boolean,
152        #[serde(default)]
153        pub exclusive: Boolean,
154        #[serde(default)]
155        pub auto_delete: Boolean,
156        #[serde(default)]
157        pub nowait: Boolean,
158    }
159
160    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
161    pub struct QueueBindOptions {
162        #[serde(default)]
163        pub nowait: Boolean,
164    }
165
166    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
167    pub struct QueuePurgeOptions {
168        #[serde(default)]
169        pub nowait: Boolean,
170    }
171
172    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
173    pub struct QueueDeleteOptions {
174        #[serde(default)]
175        pub if_unused: Boolean,
176        #[serde(default)]
177        pub if_empty: Boolean,
178        #[serde(default)]
179        pub nowait: Boolean,
180    }
181
182    #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
183    pub struct ConfirmSelectOptions {
184        #[serde(default)]
185        pub nowait: Boolean,
186    }
187}
188
189use options::*;
190
191#[derive(Debug)]
192#[allow(clippy::enum_variant_names)]
193pub(crate) enum Reply {
194    BasicQosOk(PromiseResolver<()>),
195    BasicConsumeOk(
196        PromiseResolver<Consumer>,
197        Option<Arc<ChannelCloser>>,
198        ShortString,
199        BasicConsumeOptions,
200        FieldTable,
201        Option<Consumer>,
202    ),
203    BasicCancelOk(PromiseResolver<()>),
204    BasicGetOk(
205        PromiseResolver<Option<BasicGetMessage>>,
206        ShortString,
207        BasicGetOptions,
208    ),
209    BasicRecoverOk(PromiseResolver<()>),
210    ConnectionOpenOk(PromiseResolver<()>, Connection),
211    ConnectionCloseOk(PromiseResolver<()>),
212    ConnectionUpdateSecretOk(PromiseResolver<()>),
213    ChannelOpenOk(PromiseResolver<Channel>, Channel),
214    ChannelFlowOk(PromiseResolver<Boolean>),
215    ChannelCloseOk(PromiseResolver<()>),
216    AccessRequestOk(PromiseResolver<()>),
217    ExchangeDeclareOk(
218        PromiseResolver<()>,
219        ShortString,
220        ExchangeKind,
221        ExchangeDeclareOptions,
222        FieldTable,
223    ),
224    ExchangeDeleteOk(PromiseResolver<()>, ShortString),
225    ExchangeBindOk(
226        PromiseResolver<()>,
227        ShortString,
228        ShortString,
229        ShortString,
230        FieldTable,
231    ),
232    ExchangeUnbindOk(
233        PromiseResolver<()>,
234        ShortString,
235        ShortString,
236        ShortString,
237        FieldTable,
238    ),
239    QueueDeclareOk(PromiseResolver<Queue>, QueueDeclareOptions, FieldTable),
240    QueueBindOk(
241        PromiseResolver<()>,
242        ShortString,
243        ShortString,
244        ShortString,
245        FieldTable,
246    ),
247    QueuePurgeOk(PromiseResolver<MessageCount>),
248    QueueDeleteOk(PromiseResolver<MessageCount>, ShortString),
249    QueueUnbindOk(
250        PromiseResolver<()>,
251        ShortString,
252        ShortString,
253        ShortString,
254        FieldTable,
255    ),
256    TxSelectOk(PromiseResolver<()>),
257    TxCommitOk(PromiseResolver<()>),
258    TxRollbackOk(PromiseResolver<()>),
259    ConfirmSelectOk(PromiseResolver<()>),
260}
261
262impl Channel {
263    pub(crate) fn receive_method(&self, method: AMQPClass) -> Result<()> {
264        match method {
265            AMQPClass::Basic(protocol::basic::AMQPMethod::QosOk(m)) => self.receive_basic_qos_ok(m),
266            AMQPClass::Basic(protocol::basic::AMQPMethod::ConsumeOk(m)) => {
267                self.receive_basic_consume_ok(m)
268            }
269            AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(m)) => {
270                self.receive_basic_cancel(m)
271            }
272            AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(m)) => {
273                self.receive_basic_cancel_ok(m)
274            }
275            AMQPClass::Basic(protocol::basic::AMQPMethod::Return(m)) => {
276                self.receive_basic_return(m)
277            }
278            AMQPClass::Basic(protocol::basic::AMQPMethod::Deliver(m)) => {
279                self.receive_basic_deliver(m)
280            }
281            AMQPClass::Basic(protocol::basic::AMQPMethod::GetOk(m)) => self.receive_basic_get_ok(m),
282            AMQPClass::Basic(protocol::basic::AMQPMethod::GetEmpty(m)) => {
283                self.receive_basic_get_empty(m)
284            }
285            AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(m)) => self.receive_basic_ack(m),
286            AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverOk(m)) => {
287                self.receive_basic_recover_ok(m)
288            }
289            AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(m)) => self.receive_basic_nack(m),
290            AMQPClass::Connection(protocol::connection::AMQPMethod::Start(m)) => {
291                self.receive_connection_start(m)
292            }
293            AMQPClass::Connection(protocol::connection::AMQPMethod::Secure(m)) => {
294                self.receive_connection_secure(m)
295            }
296            AMQPClass::Connection(protocol::connection::AMQPMethod::Tune(m)) => {
297                self.receive_connection_tune(m)
298            }
299            AMQPClass::Connection(protocol::connection::AMQPMethod::OpenOk(m)) => {
300                self.receive_connection_open_ok(m)
301            }
302            AMQPClass::Connection(protocol::connection::AMQPMethod::Close(m)) => {
303                self.receive_connection_close(m)
304            }
305            AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(m)) => {
306                self.receive_connection_close_ok(m)
307            }
308            AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(m)) => {
309                self.receive_connection_blocked(m)
310            }
311            AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(m)) => {
312                self.receive_connection_unblocked(m)
313            }
314            AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecretOk(m)) => {
315                self.receive_connection_update_secret_ok(m)
316            }
317            AMQPClass::Channel(protocol::channel::AMQPMethod::OpenOk(m)) => {
318                self.receive_channel_open_ok(m)
319            }
320            AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(m)) => {
321                self.receive_channel_flow(m)
322            }
323            AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(m)) => {
324                self.receive_channel_flow_ok(m)
325            }
326            AMQPClass::Channel(protocol::channel::AMQPMethod::Close(m)) => {
327                self.receive_channel_close(m)
328            }
329            AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(m)) => {
330                self.receive_channel_close_ok(m)
331            }
332            AMQPClass::Access(protocol::access::AMQPMethod::RequestOk(m)) => {
333                self.receive_access_request_ok(m)
334            }
335            AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeclareOk(m)) => {
336                self.receive_exchange_declare_ok(m)
337            }
338            AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeleteOk(m)) => {
339                self.receive_exchange_delete_ok(m)
340            }
341            AMQPClass::Exchange(protocol::exchange::AMQPMethod::BindOk(m)) => {
342                self.receive_exchange_bind_ok(m)
343            }
344            AMQPClass::Exchange(protocol::exchange::AMQPMethod::UnbindOk(m)) => {
345                self.receive_exchange_unbind_ok(m)
346            }
347            AMQPClass::Queue(protocol::queue::AMQPMethod::DeclareOk(m)) => {
348                self.receive_queue_declare_ok(m)
349            }
350            AMQPClass::Queue(protocol::queue::AMQPMethod::BindOk(m)) => {
351                self.receive_queue_bind_ok(m)
352            }
353            AMQPClass::Queue(protocol::queue::AMQPMethod::PurgeOk(m)) => {
354                self.receive_queue_purge_ok(m)
355            }
356            AMQPClass::Queue(protocol::queue::AMQPMethod::DeleteOk(m)) => {
357                self.receive_queue_delete_ok(m)
358            }
359            AMQPClass::Queue(protocol::queue::AMQPMethod::UnbindOk(m)) => {
360                self.receive_queue_unbind_ok(m)
361            }
362            AMQPClass::Tx(protocol::tx::AMQPMethod::SelectOk(m)) => self.receive_tx_select_ok(m),
363            AMQPClass::Tx(protocol::tx::AMQPMethod::CommitOk(m)) => self.receive_tx_commit_ok(m),
364            AMQPClass::Tx(protocol::tx::AMQPMethod::RollbackOk(m)) => {
365                self.receive_tx_rollback_ok(m)
366            }
367            AMQPClass::Confirm(protocol::confirm::AMQPMethod::SelectOk(m)) => {
368                self.receive_confirm_select_ok(m)
369            }
370            m => {
371                error!(method=?m, "The client should not receive this method");
372                self.handle_invalid_contents(
373                    format!("unexpected method received on channel {}", self.id),
374                    m.get_amqp_class_id(),
375                    m.get_amqp_method_id(),
376                )
377            }
378        }
379    }
380
381    pub async fn basic_qos(
382        &self,
383        prefetch_count: ShortUInt,
384        options: BasicQosOptions,
385    ) -> Result<()> {
386        if !self.status.connected() {
387            return Err(Error::InvalidChannelState(self.status.state()));
388        }
389
390        let BasicQosOptions { global } = options;
391        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Qos(protocol::basic::Qos {
392            prefetch_count,
393            global,
394        }));
395
396        let (promise, send_resolver) = Promise::new();
397        if level_enabled!(Level::TRACE) {
398            promise.set_marker("basic.qos".into());
399        }
400        let ((promise, resolver), promise_out) = (Promise::new(), promise);
401        if level_enabled!(Level::TRACE) {
402            promise.set_marker("basic.qos.Ok".into());
403        }
404        self.send_method_frame(
405            method,
406            send_resolver,
407            Some(ExpectedReply(
408                Reply::BasicQosOk(resolver.clone()),
409                Box::new(resolver),
410            )),
411        );
412        promise_out.await?;
413        promise.await
414    }
415    fn receive_basic_qos_ok(&self, method: protocol::basic::QosOk) -> Result<()> {
416        if !self.status.can_receive_messages() {
417            return Err(Error::InvalidChannelState(self.status.state()));
418        }
419
420        match self
421            .frames
422            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicQosOk(..)))
423        {
424            Some(Reply::BasicQosOk(resolver)) => {
425                let res = Ok(());
426                resolver.swear(res.clone());
427                res
428            }
429            unexpected => self.handle_invalid_contents(
430                format!(
431                    "unexpected basic qos-ok received on channel {}, was awaiting for {:?}",
432                    self.id, unexpected
433                ),
434                method.get_amqp_class_id(),
435                method.get_amqp_method_id(),
436            ),
437        }
438    }
439    async fn do_basic_consume(
440        &self,
441        queue: &str,
442        consumer_tag: &str,
443        options: BasicConsumeOptions,
444        arguments: FieldTable,
445        original: Option<Consumer>,
446    ) -> Result<Consumer> {
447        if !self.status.connected() {
448            return Err(Error::InvalidChannelState(self.status.state()));
449        }
450
451        let creation_arguments = arguments.clone();
452        let BasicConsumeOptions {
453            no_local,
454            no_ack,
455            exclusive,
456            nowait,
457        } = options;
458        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Consume(
459            protocol::basic::Consume {
460                queue: queue.into(),
461                consumer_tag: consumer_tag.into(),
462                no_local,
463                no_ack,
464                exclusive,
465                nowait,
466                arguments,
467            },
468        ));
469
470        let (promise, send_resolver) = Promise::new();
471        if level_enabled!(Level::TRACE) {
472            promise.set_marker("basic.consume".into());
473        }
474        let ((promise, resolver), promise_out) = (Promise::new(), promise);
475        if level_enabled!(Level::TRACE) {
476            promise.set_marker("basic.consume.Ok".into());
477        }
478        self.send_method_frame(
479            method,
480            send_resolver,
481            Some(ExpectedReply(
482                Reply::BasicConsumeOk(
483                    resolver.clone(),
484                    self.channel_closer.clone(),
485                    queue.into(),
486                    options,
487                    creation_arguments,
488                    original,
489                ),
490                Box::new(resolver),
491            )),
492        );
493        if nowait {
494            self.receive_basic_consume_ok(protocol::basic::ConsumeOk {
495                consumer_tag: consumer_tag.into(),
496            })?;
497        }
498        promise_out.await?;
499        promise.await
500    }
501    fn receive_basic_consume_ok(&self, method: protocol::basic::ConsumeOk) -> Result<()> {
502        if !self.status.can_receive_messages() {
503            return Err(Error::InvalidChannelState(self.status.state()));
504        }
505
506        match self.frames.find_expected_reply(self.id, |reply| {
507            matches!(&reply.0, Reply::BasicConsumeOk(..))
508        }) {
509            Some(Reply::BasicConsumeOk(
510                resolver,
511                channel_closer,
512                queue,
513                options,
514                creation_arguments,
515                original,
516            )) => self.on_basic_consume_ok_received(
517                method,
518                resolver,
519                channel_closer,
520                queue,
521                options,
522                creation_arguments,
523                original,
524            ),
525            unexpected => self.handle_invalid_contents(
526                format!(
527                    "unexpected basic consume-ok received on channel {}, was awaiting for {:?}",
528                    self.id, unexpected
529                ),
530                method.get_amqp_class_id(),
531                method.get_amqp_method_id(),
532            ),
533        }
534    }
535    pub async fn basic_cancel(
536        &self,
537        consumer_tag: &str,
538        options: BasicCancelOptions,
539    ) -> Result<()> {
540        if !self.status.connected() {
541            return Err(Error::InvalidChannelState(self.status.state()));
542        }
543
544        self.before_basic_cancel(consumer_tag);
545        let BasicCancelOptions { nowait } = options;
546        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(
547            protocol::basic::Cancel {
548                consumer_tag: consumer_tag.into(),
549                nowait,
550            },
551        ));
552
553        let (promise, send_resolver) = Promise::new();
554        if level_enabled!(Level::TRACE) {
555            promise.set_marker("basic.cancel".into());
556        }
557        let ((promise, resolver), promise_out) = (Promise::new(), promise);
558        if level_enabled!(Level::TRACE) {
559            promise.set_marker("basic.cancel.Ok".into());
560        }
561        self.send_method_frame(
562            method,
563            send_resolver,
564            Some(ExpectedReply(
565                Reply::BasicCancelOk(resolver.clone()),
566                Box::new(resolver),
567            )),
568        );
569        if nowait {
570            self.receive_basic_cancel_ok(protocol::basic::CancelOk {
571                consumer_tag: consumer_tag.into(),
572            })?;
573        }
574        promise_out.await?;
575        promise.await
576    }
577    fn receive_basic_cancel(&self, method: protocol::basic::Cancel) -> Result<()> {
578        if !self.status.can_receive_messages() {
579            return Err(Error::InvalidChannelState(self.status.state()));
580        }
581        self.on_basic_cancel_received(method)
582    }
583    async fn basic_cancel_ok(&self, consumer_tag: &str) -> Result<()> {
584        if !self.status.connected() {
585            return Err(Error::InvalidChannelState(self.status.state()));
586        }
587
588        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(
589            protocol::basic::CancelOk {
590                consumer_tag: consumer_tag.into(),
591            },
592        ));
593
594        let (promise, send_resolver) = Promise::new();
595        if level_enabled!(Level::TRACE) {
596            promise.set_marker("basic.cancel-ok".into());
597        }
598        self.send_method_frame(method, send_resolver, None);
599        promise.await
600    }
601    fn receive_basic_cancel_ok(&self, method: protocol::basic::CancelOk) -> Result<()> {
602        if !self.status.can_receive_messages() {
603            return Err(Error::InvalidChannelState(self.status.state()));
604        }
605
606        match self.frames.find_expected_reply(self.id, |reply| {
607            matches!(&reply.0, Reply::BasicCancelOk(..))
608        }) {
609            Some(Reply::BasicCancelOk(resolver)) => {
610                let res = self.on_basic_cancel_ok_received(method);
611                resolver.swear(res.clone());
612                res
613            }
614            unexpected => self.handle_invalid_contents(
615                format!(
616                    "unexpected basic cancel-ok received on channel {}, was awaiting for {:?}",
617                    self.id, unexpected
618                ),
619                method.get_amqp_class_id(),
620                method.get_amqp_method_id(),
621            ),
622        }
623    }
624    pub async fn basic_publish(
625        &self,
626        exchange: &str,
627        routing_key: &str,
628        options: BasicPublishOptions,
629        payload: &[u8],
630        properties: BasicProperties,
631    ) -> Result<PublisherConfirm> {
632        if !self.status.connected() {
633            return Err(Error::InvalidChannelState(self.status.state()));
634        }
635
636        let start_hook_res = self.before_basic_publish();
637        let BasicPublishOptions {
638            mandatory,
639            immediate,
640        } = options;
641        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Publish(
642            protocol::basic::Publish {
643                exchange: exchange.into(),
644                routing_key: routing_key.into(),
645                mandatory,
646                immediate,
647            },
648        ));
649
650        self.send_method_frame_with_body(method, payload, properties, start_hook_res)
651            .await
652    }
653    fn receive_basic_return(&self, method: protocol::basic::Return) -> Result<()> {
654        if !self.status.can_receive_messages() {
655            return Err(Error::InvalidChannelState(self.status.state()));
656        }
657        self.on_basic_return_received(method)
658    }
659    fn receive_basic_deliver(&self, method: protocol::basic::Deliver) -> Result<()> {
660        if !self.status.can_receive_messages() {
661            return Err(Error::InvalidChannelState(self.status.state()));
662        }
663        self.on_basic_deliver_received(method)
664    }
665    async fn do_basic_get(
666        &self,
667        queue: &str,
668        options: BasicGetOptions,
669        original: Option<PromiseResolver<Option<BasicGetMessage>>>,
670    ) -> Result<Option<BasicGetMessage>> {
671        if !self.status.connected() {
672            return Err(Error::InvalidChannelState(self.status.state()));
673        }
674
675        let BasicGetOptions { no_ack } = options;
676        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Get(protocol::basic::Get {
677            queue: queue.into(),
678            no_ack,
679        }));
680
681        let (promise, send_resolver) = Promise::new();
682        if level_enabled!(Level::TRACE) {
683            promise.set_marker("basic.get".into());
684        }
685        let ((promise, resolver), promise_out) = (Promise::new(), promise);
686        if level_enabled!(Level::TRACE) {
687            promise.set_marker("basic.get.Ok".into());
688        }
689        let resolver = original.unwrap_or(resolver);
690        self.send_method_frame(
691            method,
692            send_resolver,
693            Some(ExpectedReply(
694                Reply::BasicGetOk(resolver.clone(), queue.into(), options),
695                Box::new(resolver),
696            )),
697        );
698        promise_out.await?;
699        promise.await
700    }
701    fn receive_basic_get_ok(&self, method: protocol::basic::GetOk) -> Result<()> {
702        if !self.status.can_receive_messages() {
703            return Err(Error::InvalidChannelState(self.status.state()));
704        }
705
706        match self
707            .frames
708            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
709        {
710            Some(Reply::BasicGetOk(resolver, queue, options)) => {
711                self.on_basic_get_ok_received(method, resolver, queue, options)
712            }
713            unexpected => self.handle_invalid_contents(
714                format!(
715                    "unexpected basic get-ok received on channel {}, was awaiting for {:?}",
716                    self.id, unexpected
717                ),
718                method.get_amqp_class_id(),
719                method.get_amqp_method_id(),
720            ),
721        }
722    }
723    fn receive_basic_get_empty(&self, method: protocol::basic::GetEmpty) -> Result<()> {
724        if !self.status.can_receive_messages() {
725            return Err(Error::InvalidChannelState(self.status.state()));
726        }
727        self.on_basic_get_empty_received(method)
728    }
729    pub async fn basic_ack(
730        &self,
731        delivery_tag: LongLongUInt,
732        options: BasicAckOptions,
733    ) -> Result<()> {
734        if !self.status.connected() {
735            return Err(Error::InvalidChannelState(self.status.state()));
736        }
737
738        let BasicAckOptions { multiple } = options;
739        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(protocol::basic::Ack {
740            delivery_tag,
741            multiple,
742        }));
743
744        let (promise, send_resolver) = Promise::new();
745        if level_enabled!(Level::TRACE) {
746            promise.set_marker("basic.ack".into());
747        }
748        self.send_method_frame(method, send_resolver, None);
749        self.on_basic_ack_sent(multiple, delivery_tag);
750        promise.await
751    }
752    fn receive_basic_ack(&self, method: protocol::basic::Ack) -> Result<()> {
753        if !self.status.can_receive_messages() {
754            return Err(Error::InvalidChannelState(self.status.state()));
755        }
756        self.on_basic_ack_received(method)
757    }
758    pub async fn basic_reject(
759        &self,
760        delivery_tag: LongLongUInt,
761        options: BasicRejectOptions,
762    ) -> Result<()> {
763        if !self.status.connected() {
764            return Err(Error::InvalidChannelState(self.status.state()));
765        }
766
767        let BasicRejectOptions { requeue } = options;
768        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Reject(
769            protocol::basic::Reject {
770                delivery_tag,
771                requeue,
772            },
773        ));
774
775        let (promise, send_resolver) = Promise::new();
776        if level_enabled!(Level::TRACE) {
777            promise.set_marker("basic.reject".into());
778        }
779        self.send_method_frame(method, send_resolver, None);
780        promise.await
781    }
782    pub async fn basic_recover_async(&self, options: BasicRecoverAsyncOptions) -> Result<()> {
783        if !self.status.connected() {
784            return Err(Error::InvalidChannelState(self.status.state()));
785        }
786
787        let BasicRecoverAsyncOptions { requeue } = options;
788        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverAsync(
789            protocol::basic::RecoverAsync { requeue },
790        ));
791
792        let (promise, send_resolver) = Promise::new();
793        if level_enabled!(Level::TRACE) {
794            promise.set_marker("basic.recover-async".into());
795        }
796        self.send_method_frame(method, send_resolver, None);
797        self.on_basic_recover_async_sent();
798        promise.await
799    }
800    pub async fn basic_recover(&self, options: BasicRecoverOptions) -> Result<()> {
801        if !self.status.connected() {
802            return Err(Error::InvalidChannelState(self.status.state()));
803        }
804
805        let BasicRecoverOptions { requeue } = options;
806        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Recover(
807            protocol::basic::Recover { requeue },
808        ));
809
810        let (promise, send_resolver) = Promise::new();
811        if level_enabled!(Level::TRACE) {
812            promise.set_marker("basic.recover".into());
813        }
814        let ((promise, resolver), promise_out) = (Promise::new(), promise);
815        if level_enabled!(Level::TRACE) {
816            promise.set_marker("basic.recover.Ok".into());
817        }
818        self.send_method_frame(
819            method,
820            send_resolver,
821            Some(ExpectedReply(
822                Reply::BasicRecoverOk(resolver.clone()),
823                Box::new(resolver),
824            )),
825        );
826        promise_out.await?;
827        promise.await
828    }
829    fn receive_basic_recover_ok(&self, method: protocol::basic::RecoverOk) -> Result<()> {
830        if !self.status.can_receive_messages() {
831            return Err(Error::InvalidChannelState(self.status.state()));
832        }
833
834        match self.frames.find_expected_reply(self.id, |reply| {
835            matches!(&reply.0, Reply::BasicRecoverOk(..))
836        }) {
837            Some(Reply::BasicRecoverOk(resolver)) => {
838                let res = self.on_basic_recover_ok_received();
839                resolver.swear(res.clone());
840                res
841            }
842            unexpected => self.handle_invalid_contents(
843                format!(
844                    "unexpected basic recover-ok received on channel {}, was awaiting for {:?}",
845                    self.id, unexpected
846                ),
847                method.get_amqp_class_id(),
848                method.get_amqp_method_id(),
849            ),
850        }
851    }
852    pub async fn basic_nack(
853        &self,
854        delivery_tag: LongLongUInt,
855        options: BasicNackOptions,
856    ) -> Result<()> {
857        if !self.status.connected() {
858            return Err(Error::InvalidChannelState(self.status.state()));
859        }
860
861        let BasicNackOptions { multiple, requeue } = options;
862        let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(protocol::basic::Nack {
863            delivery_tag,
864            multiple,
865            requeue,
866        }));
867
868        let (promise, send_resolver) = Promise::new();
869        if level_enabled!(Level::TRACE) {
870            promise.set_marker("basic.nack".into());
871        }
872        self.send_method_frame(method, send_resolver, None);
873        self.on_basic_nack_sent(multiple, delivery_tag);
874        promise.await
875    }
876    fn receive_basic_nack(&self, method: protocol::basic::Nack) -> Result<()> {
877        if !self.status.can_receive_messages() {
878            return Err(Error::InvalidChannelState(self.status.state()));
879        }
880        self.on_basic_nack_received(method)
881    }
882    fn receive_connection_start(&self, method: protocol::connection::Start) -> Result<()> {
883        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
884        if !self.status.can_receive_messages() {
885            return Err(Error::InvalidChannelState(self.status.state()));
886        }
887        self.on_connection_start_received(method)
888    }
889    async fn connection_start_ok(
890        &self,
891        client_properties: FieldTable,
892        mechanism: &str,
893        response: &str,
894        locale: &str,
895        resolver: PromiseResolver<Connection>,
896        connection: Connection,
897        credentials: Credentials,
898    ) -> Result<()> {
899        if !self.status.connected() {
900            return Err(Error::InvalidChannelState(self.status.state()));
901        }
902
903        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::StartOk(
904            protocol::connection::StartOk {
905                client_properties,
906                mechanism: mechanism.into(),
907                response: response.into(),
908                locale: locale.into(),
909            },
910        ));
911
912        let (promise, send_resolver) = Promise::new();
913        if level_enabled!(Level::TRACE) {
914            promise.set_marker("connection.start-ok".into());
915        }
916        self.before_connection_start_ok(resolver, connection, credentials);
917        self.send_method_frame(method, send_resolver, None);
918        promise.await
919    }
920    fn receive_connection_secure(&self, method: protocol::connection::Secure) -> Result<()> {
921        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
922        if !self.status.can_receive_messages() {
923            return Err(Error::InvalidChannelState(self.status.state()));
924        }
925        self.on_connection_secure_received(method)
926    }
927    async fn connection_secure_ok(&self, response: &str) -> Result<()> {
928        if !self.status.connected() {
929            return Err(Error::InvalidChannelState(self.status.state()));
930        }
931
932        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::SecureOk(
933            protocol::connection::SecureOk {
934                response: response.into(),
935            },
936        ));
937
938        let (promise, send_resolver) = Promise::new();
939        if level_enabled!(Level::TRACE) {
940            promise.set_marker("connection.secure-ok".into());
941        }
942        self.send_method_frame(method, send_resolver, None);
943        promise.await
944    }
945    fn receive_connection_tune(&self, method: protocol::connection::Tune) -> Result<()> {
946        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
947        if !self.status.can_receive_messages() {
948            return Err(Error::InvalidChannelState(self.status.state()));
949        }
950        self.on_connection_tune_received(method)
951    }
952    async fn connection_tune_ok(
953        &self,
954        channel_max: ShortUInt,
955        frame_max: LongUInt,
956        heartbeat: ShortUInt,
957    ) -> Result<()> {
958        if !self.status.connected() {
959            return Err(Error::InvalidChannelState(self.status.state()));
960        }
961
962        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::TuneOk(
963            protocol::connection::TuneOk {
964                channel_max,
965                frame_max,
966                heartbeat,
967            },
968        ));
969
970        let (promise, send_resolver) = Promise::new();
971        if level_enabled!(Level::TRACE) {
972            promise.set_marker("connection.tune-ok".into());
973        }
974        self.send_method_frame(method, send_resolver, None);
975        promise.await
976    }
977    pub(crate) async fn connection_open(
978        &self,
979        virtual_host: &str,
980        connection: Connection,
981        conn_resolver: PromiseResolver<Connection>,
982    ) -> Result<()> {
983        if !self.status.connected() {
984            return Err(Error::InvalidChannelState(self.status.state()));
985        }
986
987        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Open(
988            protocol::connection::Open {
989                virtual_host: virtual_host.into(),
990            },
991        ));
992
993        let (promise, send_resolver) = Promise::new();
994        if level_enabled!(Level::TRACE) {
995            promise.set_marker("connection.open".into());
996        }
997        let ((promise, resolver), promise_out) = (Promise::new(), promise);
998        if level_enabled!(Level::TRACE) {
999            promise.set_marker("connection.open.Ok".into());
1000        }
1001        self.before_connection_open(conn_resolver);
1002        self.send_method_frame(
1003            method,
1004            send_resolver,
1005            Some(ExpectedReply(
1006                Reply::ConnectionOpenOk(resolver.clone(), connection),
1007                Box::new(resolver),
1008            )),
1009        );
1010        promise_out.await?;
1011        promise.await
1012    }
1013    fn receive_connection_open_ok(&self, method: protocol::connection::OpenOk) -> Result<()> {
1014        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1015        if !self.status.can_receive_messages() {
1016            return Err(Error::InvalidChannelState(self.status.state()));
1017        }
1018
1019        match self.frames.find_expected_reply(self.id, |reply| {
1020            matches!(&reply.0, Reply::ConnectionOpenOk(..))
1021        }) {
1022            Some(Reply::ConnectionOpenOk(resolver, connection)) => {
1023                let res = self.on_connection_open_ok_received(method, connection);
1024                resolver.swear(res.clone());
1025                res
1026            }
1027            unexpected => self.handle_invalid_contents(
1028                format!(
1029                    "unexpected connection open-ok received on channel {}, was awaiting for {:?}",
1030                    self.id, unexpected
1031                ),
1032                method.get_amqp_class_id(),
1033                method.get_amqp_method_id(),
1034            ),
1035        }
1036    }
1037    pub(crate) async fn connection_close(
1038        &self,
1039        reply_code: ShortUInt,
1040        reply_text: &str,
1041        class_id: ShortUInt,
1042        method_id: ShortUInt,
1043    ) -> Result<()> {
1044        if !self.status.closing() {
1045            return Err(Error::InvalidChannelState(self.status.state()));
1046        }
1047
1048        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Close(
1049            protocol::connection::Close {
1050                reply_code,
1051                reply_text: reply_text.into(),
1052                class_id,
1053                method_id,
1054            },
1055        ));
1056
1057        let (promise, send_resolver) = Promise::new();
1058        if level_enabled!(Level::TRACE) {
1059            promise.set_marker("connection.close".into());
1060        }
1061        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1062        if level_enabled!(Level::TRACE) {
1063            promise.set_marker("connection.close.Ok".into());
1064        }
1065        self.send_method_frame(
1066            method,
1067            send_resolver,
1068            Some(ExpectedReply(
1069                Reply::ConnectionCloseOk(resolver.clone()),
1070                Box::new(resolver),
1071            )),
1072        );
1073        promise_out.await?;
1074        promise.await
1075    }
1076    fn receive_connection_close(&self, method: protocol::connection::Close) -> Result<()> {
1077        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1078        if !self.status.can_receive_messages() {
1079            return Err(Error::InvalidChannelState(self.status.state()));
1080        }
1081        self.on_connection_close_received(method)
1082    }
1083    pub(crate) async fn connection_close_ok(&self, error: Error) -> Result<()> {
1084        if !self.status.closing() {
1085            return Err(Error::InvalidChannelState(self.status.state()));
1086        }
1087
1088        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(
1089            protocol::connection::CloseOk {},
1090        ));
1091
1092        let (promise, send_resolver) = Promise::new();
1093        if level_enabled!(Level::TRACE) {
1094            promise.set_marker("connection.close-ok".into());
1095        }
1096        self.send_method_frame(method, send_resolver, None);
1097        self.on_connection_close_ok_sent(error);
1098        promise.await
1099    }
1100    fn receive_connection_close_ok(&self, method: protocol::connection::CloseOk) -> Result<()> {
1101        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1102        if !self.status.can_receive_messages() {
1103            return Err(Error::InvalidChannelState(self.status.state()));
1104        }
1105
1106        match self.frames.find_expected_reply(self.id, |reply| {
1107            matches!(&reply.0, Reply::ConnectionCloseOk(..))
1108        }) {
1109            Some(Reply::ConnectionCloseOk(resolver)) => {
1110                let res = self.on_connection_close_ok_received();
1111                resolver.swear(res.clone());
1112                res
1113            }
1114            unexpected => self.handle_invalid_contents(
1115                format!(
1116                    "unexpected connection close-ok received on channel {}, was awaiting for {:?}",
1117                    self.id, unexpected
1118                ),
1119                method.get_amqp_class_id(),
1120                method.get_amqp_method_id(),
1121            ),
1122        }
1123    }
1124    pub(crate) async fn connection_blocked(&self, reason: &str) -> Result<()> {
1125        if !self.status.connected() {
1126            return Err(Error::InvalidChannelState(self.status.state()));
1127        }
1128
1129        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(
1130            protocol::connection::Blocked {
1131                reason: reason.into(),
1132            },
1133        ));
1134
1135        let (promise, send_resolver) = Promise::new();
1136        if level_enabled!(Level::TRACE) {
1137            promise.set_marker("connection.blocked".into());
1138        }
1139        self.send_method_frame(method, send_resolver, None);
1140        promise.await
1141    }
1142    fn receive_connection_blocked(&self, method: protocol::connection::Blocked) -> Result<()> {
1143        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1144        if !self.status.can_receive_messages() {
1145            return Err(Error::InvalidChannelState(self.status.state()));
1146        }
1147        self.on_connection_blocked_received(method)
1148    }
1149    pub(crate) async fn connection_unblocked(&self) -> Result<()> {
1150        if !self.status.connected() {
1151            return Err(Error::InvalidChannelState(self.status.state()));
1152        }
1153
1154        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(
1155            protocol::connection::Unblocked {},
1156        ));
1157
1158        let (promise, send_resolver) = Promise::new();
1159        if level_enabled!(Level::TRACE) {
1160            promise.set_marker("connection.unblocked".into());
1161        }
1162        self.send_method_frame(method, send_resolver, None);
1163        promise.await
1164    }
1165    fn receive_connection_unblocked(&self, method: protocol::connection::Unblocked) -> Result<()> {
1166        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1167        if !self.status.can_receive_messages() {
1168            return Err(Error::InvalidChannelState(self.status.state()));
1169        }
1170        self.on_connection_unblocked_received(method)
1171    }
1172    pub(crate) async fn connection_update_secret(
1173        &self,
1174        new_secret: &str,
1175        reason: &str,
1176    ) -> Result<()> {
1177        if !self.status.connected() {
1178            return Err(Error::InvalidChannelState(self.status.state()));
1179        }
1180
1181        let method = AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecret(
1182            protocol::connection::UpdateSecret {
1183                new_secret: new_secret.into(),
1184                reason: reason.into(),
1185            },
1186        ));
1187
1188        let (promise, send_resolver) = Promise::new();
1189        if level_enabled!(Level::TRACE) {
1190            promise.set_marker("connection.update-secret".into());
1191        }
1192        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1193        if level_enabled!(Level::TRACE) {
1194            promise.set_marker("connection.update-secret.Ok".into());
1195        }
1196        self.send_method_frame(
1197            method,
1198            send_resolver,
1199            Some(ExpectedReply(
1200                Reply::ConnectionUpdateSecretOk(resolver.clone()),
1201                Box::new(resolver),
1202            )),
1203        );
1204        promise_out.await?;
1205        promise.await
1206    }
1207    fn receive_connection_update_secret_ok(
1208        &self,
1209        method: protocol::connection::UpdateSecretOk,
1210    ) -> Result<()> {
1211        self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1212        if !self.status.can_receive_messages() {
1213            return Err(Error::InvalidChannelState(self.status.state()));
1214        }
1215
1216        match self.frames.find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::ConnectionUpdateSecretOk(..))){
1217      Some(Reply::ConnectionUpdateSecretOk(resolver)) => {
1218        let res =        Ok(())
1219;
1220        resolver.swear(res.clone());
1221        res
1222},
1223      unexpected => {
1224        self.handle_invalid_contents(format!("unexpected connection update-secret-ok received on channel {}, was awaiting for {:?}", self.id, unexpected), method.get_amqp_class_id(), method.get_amqp_method_id())
1225      },
1226    }
1227    }
1228    pub(crate) async fn channel_open(&self, channel: Channel) -> Result<Channel> {
1229        if !self.status.initializing() {
1230            return Err(Error::InvalidChannelState(self.status.state()));
1231        }
1232
1233        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Open(
1234            protocol::channel::Open {},
1235        ));
1236
1237        let (promise, send_resolver) = Promise::new();
1238        if level_enabled!(Level::TRACE) {
1239            promise.set_marker("channel.open".into());
1240        }
1241        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1242        if level_enabled!(Level::TRACE) {
1243            promise.set_marker("channel.open.Ok".into());
1244        }
1245        self.send_method_frame(
1246            method,
1247            send_resolver,
1248            Some(ExpectedReply(
1249                Reply::ChannelOpenOk(resolver.clone(), channel),
1250                Box::new(resolver),
1251            )),
1252        );
1253        promise_out.await?;
1254        promise.await
1255    }
1256    fn receive_channel_open_ok(&self, method: protocol::channel::OpenOk) -> Result<()> {
1257        if !self.status.initializing() {
1258            return Err(Error::InvalidChannelState(self.status.state()));
1259        }
1260
1261        match self.frames.find_expected_reply(self.id, |reply| {
1262            matches!(&reply.0, Reply::ChannelOpenOk(..))
1263        }) {
1264            Some(Reply::ChannelOpenOk(resolver, channel)) => {
1265                self.on_channel_open_ok_received(method, resolver, channel)
1266            }
1267            unexpected => self.handle_invalid_contents(
1268                format!(
1269                    "unexpected channel open-ok received on channel {}, was awaiting for {:?}",
1270                    self.id, unexpected
1271                ),
1272                method.get_amqp_class_id(),
1273                method.get_amqp_method_id(),
1274            ),
1275        }
1276    }
1277    pub async fn channel_flow(&self, options: ChannelFlowOptions) -> Result<Boolean> {
1278        if !self.status.connected() {
1279            return Err(Error::InvalidChannelState(self.status.state()));
1280        }
1281
1282        let ChannelFlowOptions { active } = options;
1283        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(
1284            protocol::channel::Flow { active },
1285        ));
1286
1287        let (promise, send_resolver) = Promise::new();
1288        if level_enabled!(Level::TRACE) {
1289            promise.set_marker("channel.flow".into());
1290        }
1291        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1292        if level_enabled!(Level::TRACE) {
1293            promise.set_marker("channel.flow.Ok".into());
1294        }
1295        self.send_method_frame(
1296            method,
1297            send_resolver,
1298            Some(ExpectedReply(
1299                Reply::ChannelFlowOk(resolver.clone()),
1300                Box::new(resolver),
1301            )),
1302        );
1303        promise_out.await?;
1304        promise.await
1305    }
1306    fn receive_channel_flow(&self, method: protocol::channel::Flow) -> Result<()> {
1307        if !self.status.can_receive_messages() {
1308            return Err(Error::InvalidChannelState(self.status.state()));
1309        }
1310        self.on_channel_flow_received(method)
1311    }
1312    async fn channel_flow_ok(&self, options: ChannelFlowOkOptions) -> Result<()> {
1313        if !self.status.connected() {
1314            return Err(Error::InvalidChannelState(self.status.state()));
1315        }
1316
1317        let ChannelFlowOkOptions { active } = options;
1318        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(
1319            protocol::channel::FlowOk { active },
1320        ));
1321
1322        let (promise, send_resolver) = Promise::new();
1323        if level_enabled!(Level::TRACE) {
1324            promise.set_marker("channel.flow-ok".into());
1325        }
1326        self.send_method_frame(method, send_resolver, None);
1327        promise.await
1328    }
1329    fn receive_channel_flow_ok(&self, method: protocol::channel::FlowOk) -> Result<()> {
1330        if !self.status.can_receive_messages() {
1331            return Err(Error::InvalidChannelState(self.status.state()));
1332        }
1333
1334        match self.frames.find_expected_reply(self.id, |reply| {
1335            matches!(&reply.0, Reply::ChannelFlowOk(..))
1336        }) {
1337            Some(Reply::ChannelFlowOk(resolver)) => {
1338                self.on_channel_flow_ok_received(method, resolver)
1339            }
1340            unexpected => self.handle_invalid_contents(
1341                format!(
1342                    "unexpected channel flow-ok received on channel {}, was awaiting for {:?}",
1343                    self.id, unexpected
1344                ),
1345                method.get_amqp_class_id(),
1346                method.get_amqp_method_id(),
1347            ),
1348        }
1349    }
1350    async fn do_channel_close(
1351        &self,
1352        reply_code: ShortUInt,
1353        reply_text: &str,
1354        class_id: ShortUInt,
1355        method_id: ShortUInt,
1356    ) -> Result<()> {
1357        if !self.status.connected() {
1358            return Err(Error::InvalidChannelState(self.status.state()));
1359        }
1360
1361        self.before_channel_close();
1362        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Close(
1363            protocol::channel::Close {
1364                reply_code,
1365                reply_text: reply_text.into(),
1366                class_id,
1367                method_id,
1368            },
1369        ));
1370
1371        let (promise, send_resolver) = Promise::new();
1372        if level_enabled!(Level::TRACE) {
1373            promise.set_marker("channel.close".into());
1374        }
1375        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1376        if level_enabled!(Level::TRACE) {
1377            promise.set_marker("channel.close.Ok".into());
1378        }
1379        self.send_method_frame(
1380            method,
1381            send_resolver,
1382            Some(ExpectedReply(
1383                Reply::ChannelCloseOk(resolver.clone()),
1384                Box::new(resolver),
1385            )),
1386        );
1387        promise_out.await?;
1388        promise.await
1389    }
1390    fn receive_channel_close(&self, method: protocol::channel::Close) -> Result<()> {
1391        if !self.status.can_receive_messages() {
1392            return Err(Error::InvalidChannelState(self.status.state()));
1393        }
1394        self.on_channel_close_received(method)
1395    }
1396    async fn channel_close_ok(&self, error: Option<Error>) -> Result<()> {
1397        if !self.status.closing() {
1398            return Err(Error::InvalidChannelState(self.status.state()));
1399        }
1400
1401        let method = AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(
1402            protocol::channel::CloseOk {},
1403        ));
1404
1405        let (promise, send_resolver) = Promise::new();
1406        if level_enabled!(Level::TRACE) {
1407            promise.set_marker("channel.close-ok".into());
1408        }
1409        self.send_method_frame(method, send_resolver, None);
1410        self.on_channel_close_ok_sent(error);
1411        promise.await
1412    }
1413    fn receive_channel_close_ok(&self, method: protocol::channel::CloseOk) -> Result<()> {
1414        if !self.status.can_receive_messages() {
1415            return Err(Error::InvalidChannelState(self.status.state()));
1416        }
1417
1418        match self.next_expected_close_ok_reply() {
1419            Some(Reply::ChannelCloseOk(resolver)) => {
1420                let res = self.on_channel_close_ok_received();
1421                resolver.swear(res.clone());
1422                res
1423            }
1424            unexpected => self.handle_invalid_contents(
1425                format!(
1426                    "unexpected channel close-ok received on channel {}, was awaiting for {:?}",
1427                    self.id, unexpected
1428                ),
1429                method.get_amqp_class_id(),
1430                method.get_amqp_method_id(),
1431            ),
1432        }
1433    }
1434    pub async fn access_request(&self, realm: &str, options: AccessRequestOptions) -> Result<()> {
1435        if !self.status.connected() {
1436            return Err(Error::InvalidChannelState(self.status.state()));
1437        }
1438
1439        let AccessRequestOptions {
1440            exclusive,
1441            passive,
1442            active,
1443            write,
1444            read,
1445        } = options;
1446        let method = AMQPClass::Access(protocol::access::AMQPMethod::Request(
1447            protocol::access::Request {
1448                realm: realm.into(),
1449                exclusive,
1450                passive,
1451                active,
1452                write,
1453                read,
1454            },
1455        ));
1456
1457        let (promise, send_resolver) = Promise::new();
1458        if level_enabled!(Level::TRACE) {
1459            promise.set_marker("access.request".into());
1460        }
1461        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1462        if level_enabled!(Level::TRACE) {
1463            promise.set_marker("access.request.Ok".into());
1464        }
1465        self.send_method_frame(
1466            method,
1467            send_resolver,
1468            Some(ExpectedReply(
1469                Reply::AccessRequestOk(resolver.clone()),
1470                Box::new(resolver),
1471            )),
1472        );
1473        promise_out.await?;
1474        promise.await
1475    }
1476    fn receive_access_request_ok(&self, method: protocol::access::RequestOk) -> Result<()> {
1477        if !self.status.can_receive_messages() {
1478            return Err(Error::InvalidChannelState(self.status.state()));
1479        }
1480
1481        match self.frames.find_expected_reply(self.id, |reply| {
1482            matches!(&reply.0, Reply::AccessRequestOk(..))
1483        }) {
1484            Some(Reply::AccessRequestOk(resolver)) => {
1485                let res = self.on_access_request_ok_received(method);
1486                resolver.swear(res.clone());
1487                res
1488            }
1489            unexpected => self.handle_invalid_contents(
1490                format!(
1491                    "unexpected access request-ok received on channel {}, was awaiting for {:?}",
1492                    self.id, unexpected
1493                ),
1494                method.get_amqp_class_id(),
1495                method.get_amqp_method_id(),
1496            ),
1497        }
1498    }
1499    async fn do_exchange_declare(
1500        &self,
1501        exchange: &str,
1502        kind: &str,
1503        options: ExchangeDeclareOptions,
1504        arguments: FieldTable,
1505        exchange_kind: ExchangeKind,
1506    ) -> Result<()> {
1507        if !self.status.connected() {
1508            return Err(Error::InvalidChannelState(self.status.state()));
1509        }
1510
1511        let creation_arguments = arguments.clone();
1512        let ExchangeDeclareOptions {
1513            passive,
1514            durable,
1515            auto_delete,
1516            internal,
1517            nowait,
1518        } = options;
1519        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Declare(
1520            protocol::exchange::Declare {
1521                exchange: exchange.into(),
1522                kind: kind.into(),
1523                passive,
1524                durable,
1525                auto_delete,
1526                internal,
1527                nowait,
1528                arguments,
1529            },
1530        ));
1531
1532        let (promise, send_resolver) = Promise::new();
1533        if level_enabled!(Level::TRACE) {
1534            promise.set_marker("exchange.declare".into());
1535        }
1536        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1537        if level_enabled!(Level::TRACE) {
1538            promise.set_marker("exchange.declare.Ok".into());
1539        }
1540        self.send_method_frame(
1541            method,
1542            send_resolver,
1543            Some(ExpectedReply(
1544                Reply::ExchangeDeclareOk(
1545                    resolver.clone(),
1546                    exchange.into(),
1547                    exchange_kind,
1548                    options,
1549                    creation_arguments,
1550                ),
1551                Box::new(resolver),
1552            )),
1553        );
1554        if nowait {
1555            self.receive_exchange_declare_ok(protocol::exchange::DeclareOk {})?;
1556        }
1557        promise_out.await?;
1558        promise.await
1559    }
1560    fn receive_exchange_declare_ok(&self, method: protocol::exchange::DeclareOk) -> Result<()> {
1561        if !self.status.can_receive_messages() {
1562            return Err(Error::InvalidChannelState(self.status.state()));
1563        }
1564
1565        match self.frames.find_expected_reply(self.id, |reply| {
1566            matches!(&reply.0, Reply::ExchangeDeclareOk(..))
1567        }) {
1568            Some(Reply::ExchangeDeclareOk(
1569                resolver,
1570                exchange,
1571                exchange_kind,
1572                options,
1573                creation_arguments,
1574            )) => self.on_exchange_declare_ok_received(
1575                resolver,
1576                exchange,
1577                exchange_kind,
1578                options,
1579                creation_arguments,
1580            ),
1581            unexpected => self.handle_invalid_contents(
1582                format!(
1583                    "unexpected exchange declare-ok received on channel {}, was awaiting for {:?}",
1584                    self.id, unexpected
1585                ),
1586                method.get_amqp_class_id(),
1587                method.get_amqp_method_id(),
1588            ),
1589        }
1590    }
1591    /// Delete an exchange
1592    pub async fn exchange_delete(
1593        &self,
1594        exchange: &str,
1595        options: ExchangeDeleteOptions,
1596    ) -> Result<()> {
1597        if !self.status.connected() {
1598            return Err(Error::InvalidChannelState(self.status.state()));
1599        }
1600
1601        let ExchangeDeleteOptions { if_unused, nowait } = options;
1602        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Delete(
1603            protocol::exchange::Delete {
1604                exchange: exchange.into(),
1605                if_unused,
1606                nowait,
1607            },
1608        ));
1609
1610        let (promise, send_resolver) = Promise::new();
1611        if level_enabled!(Level::TRACE) {
1612            promise.set_marker("exchange.delete".into());
1613        }
1614        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1615        if level_enabled!(Level::TRACE) {
1616            promise.set_marker("exchange.delete.Ok".into());
1617        }
1618        self.send_method_frame(
1619            method,
1620            send_resolver,
1621            Some(ExpectedReply(
1622                Reply::ExchangeDeleteOk(resolver.clone(), exchange.into()),
1623                Box::new(resolver),
1624            )),
1625        );
1626        if nowait {
1627            self.receive_exchange_delete_ok(protocol::exchange::DeleteOk {})?;
1628        }
1629        promise_out.await?;
1630        promise.await
1631    }
1632    fn receive_exchange_delete_ok(&self, method: protocol::exchange::DeleteOk) -> Result<()> {
1633        if !self.status.can_receive_messages() {
1634            return Err(Error::InvalidChannelState(self.status.state()));
1635        }
1636
1637        match self.frames.find_expected_reply(self.id, |reply| {
1638            matches!(&reply.0, Reply::ExchangeDeleteOk(..))
1639        }) {
1640            Some(Reply::ExchangeDeleteOk(resolver, exchange)) => {
1641                let res = self.on_exchange_delete_ok_received(exchange);
1642                resolver.swear(res.clone());
1643                res
1644            }
1645            unexpected => self.handle_invalid_contents(
1646                format!(
1647                    "unexpected exchange delete-ok received on channel {}, was awaiting for {:?}",
1648                    self.id, unexpected
1649                ),
1650                method.get_amqp_class_id(),
1651                method.get_amqp_method_id(),
1652            ),
1653        }
1654    }
1655    pub async fn exchange_bind(
1656        &self,
1657        destination: &str,
1658        source: &str,
1659        routing_key: &str,
1660        options: ExchangeBindOptions,
1661        arguments: FieldTable,
1662    ) -> Result<()> {
1663        if !self.status.connected() {
1664            return Err(Error::InvalidChannelState(self.status.state()));
1665        }
1666
1667        let creation_arguments = arguments.clone();
1668        let ExchangeBindOptions { nowait } = options;
1669        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Bind(
1670            protocol::exchange::Bind {
1671                destination: destination.into(),
1672                source: source.into(),
1673                routing_key: routing_key.into(),
1674                nowait,
1675                arguments,
1676            },
1677        ));
1678
1679        let (promise, send_resolver) = Promise::new();
1680        if level_enabled!(Level::TRACE) {
1681            promise.set_marker("exchange.bind".into());
1682        }
1683        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1684        if level_enabled!(Level::TRACE) {
1685            promise.set_marker("exchange.bind.Ok".into());
1686        }
1687        self.send_method_frame(
1688            method,
1689            send_resolver,
1690            Some(ExpectedReply(
1691                Reply::ExchangeBindOk(
1692                    resolver.clone(),
1693                    destination.into(),
1694                    source.into(),
1695                    routing_key.into(),
1696                    creation_arguments,
1697                ),
1698                Box::new(resolver),
1699            )),
1700        );
1701        if nowait {
1702            self.receive_exchange_bind_ok(protocol::exchange::BindOk {})?;
1703        }
1704        promise_out.await?;
1705        promise.await
1706    }
1707    fn receive_exchange_bind_ok(&self, method: protocol::exchange::BindOk) -> Result<()> {
1708        if !self.status.can_receive_messages() {
1709            return Err(Error::InvalidChannelState(self.status.state()));
1710        }
1711
1712        match self.frames.find_expected_reply(self.id, |reply| {
1713            matches!(&reply.0, Reply::ExchangeBindOk(..))
1714        }) {
1715            Some(Reply::ExchangeBindOk(
1716                resolver,
1717                destination,
1718                source,
1719                routing_key,
1720                creation_arguments,
1721            )) => {
1722                let res = self.on_exchange_bind_ok_received(
1723                    destination,
1724                    source,
1725                    routing_key,
1726                    creation_arguments,
1727                );
1728                resolver.swear(res.clone());
1729                res
1730            }
1731            unexpected => self.handle_invalid_contents(
1732                format!(
1733                    "unexpected exchange bind-ok received on channel {}, was awaiting for {:?}",
1734                    self.id, unexpected
1735                ),
1736                method.get_amqp_class_id(),
1737                method.get_amqp_method_id(),
1738            ),
1739        }
1740    }
1741    pub async fn exchange_unbind(
1742        &self,
1743        destination: &str,
1744        source: &str,
1745        routing_key: &str,
1746        options: ExchangeUnbindOptions,
1747        arguments: FieldTable,
1748    ) -> Result<()> {
1749        if !self.status.connected() {
1750            return Err(Error::InvalidChannelState(self.status.state()));
1751        }
1752
1753        let creation_arguments = arguments.clone();
1754        let ExchangeUnbindOptions { nowait } = options;
1755        let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Unbind(
1756            protocol::exchange::Unbind {
1757                destination: destination.into(),
1758                source: source.into(),
1759                routing_key: routing_key.into(),
1760                nowait,
1761                arguments,
1762            },
1763        ));
1764
1765        let (promise, send_resolver) = Promise::new();
1766        if level_enabled!(Level::TRACE) {
1767            promise.set_marker("exchange.unbind".into());
1768        }
1769        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1770        if level_enabled!(Level::TRACE) {
1771            promise.set_marker("exchange.unbind.Ok".into());
1772        }
1773        self.send_method_frame(
1774            method,
1775            send_resolver,
1776            Some(ExpectedReply(
1777                Reply::ExchangeUnbindOk(
1778                    resolver.clone(),
1779                    destination.into(),
1780                    source.into(),
1781                    routing_key.into(),
1782                    creation_arguments,
1783                ),
1784                Box::new(resolver),
1785            )),
1786        );
1787        if nowait {
1788            self.receive_exchange_unbind_ok(protocol::exchange::UnbindOk {})?;
1789        }
1790        promise_out.await?;
1791        promise.await
1792    }
1793    fn receive_exchange_unbind_ok(&self, method: protocol::exchange::UnbindOk) -> Result<()> {
1794        if !self.status.can_receive_messages() {
1795            return Err(Error::InvalidChannelState(self.status.state()));
1796        }
1797
1798        match self.frames.find_expected_reply(self.id, |reply| {
1799            matches!(&reply.0, Reply::ExchangeUnbindOk(..))
1800        }) {
1801            Some(Reply::ExchangeUnbindOk(
1802                resolver,
1803                destination,
1804                source,
1805                routing_key,
1806                creation_arguments,
1807            )) => {
1808                let res = self.on_exchange_unbind_ok_received(
1809                    destination,
1810                    source,
1811                    routing_key,
1812                    creation_arguments,
1813                );
1814                resolver.swear(res.clone());
1815                res
1816            }
1817            unexpected => self.handle_invalid_contents(
1818                format!(
1819                    "unexpected exchange unbind-ok received on channel {}, was awaiting for {:?}",
1820                    self.id, unexpected
1821                ),
1822                method.get_amqp_class_id(),
1823                method.get_amqp_method_id(),
1824            ),
1825        }
1826    }
1827    pub async fn queue_declare(
1828        &self,
1829        queue: &str,
1830        options: QueueDeclareOptions,
1831        arguments: FieldTable,
1832    ) -> Result<Queue> {
1833        if !self.status.connected() {
1834            return Err(Error::InvalidChannelState(self.status.state()));
1835        }
1836
1837        let creation_arguments = arguments.clone();
1838        let QueueDeclareOptions {
1839            passive,
1840            durable,
1841            exclusive,
1842            auto_delete,
1843            nowait,
1844        } = options;
1845        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Declare(
1846            protocol::queue::Declare {
1847                queue: queue.into(),
1848                passive,
1849                durable,
1850                exclusive,
1851                auto_delete,
1852                nowait,
1853                arguments,
1854            },
1855        ));
1856
1857        let (promise, send_resolver) = Promise::new();
1858        if level_enabled!(Level::TRACE) {
1859            promise.set_marker("queue.declare".into());
1860        }
1861        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1862        if level_enabled!(Level::TRACE) {
1863            promise.set_marker("queue.declare.Ok".into());
1864        }
1865        self.send_method_frame(
1866            method,
1867            send_resolver,
1868            Some(ExpectedReply(
1869                Reply::QueueDeclareOk(resolver.clone(), options, creation_arguments),
1870                Box::new(resolver),
1871            )),
1872        );
1873        if nowait {
1874            self.receive_queue_declare_ok(protocol::queue::DeclareOk {
1875                queue: queue.into(),
1876                ..Default::default()
1877            })?;
1878        }
1879        promise_out.await?;
1880        promise.await
1881    }
1882    fn receive_queue_declare_ok(&self, method: protocol::queue::DeclareOk) -> Result<()> {
1883        if !self.status.can_receive_messages() {
1884            return Err(Error::InvalidChannelState(self.status.state()));
1885        }
1886
1887        match self.frames.find_expected_reply(self.id, |reply| {
1888            matches!(&reply.0, Reply::QueueDeclareOk(..))
1889        }) {
1890            Some(Reply::QueueDeclareOk(resolver, options, creation_arguments)) => {
1891                self.on_queue_declare_ok_received(method, resolver, options, creation_arguments)
1892            }
1893            unexpected => self.handle_invalid_contents(
1894                format!(
1895                    "unexpected queue declare-ok received on channel {}, was awaiting for {:?}",
1896                    self.id, unexpected
1897                ),
1898                method.get_amqp_class_id(),
1899                method.get_amqp_method_id(),
1900            ),
1901        }
1902    }
1903    pub async fn queue_bind(
1904        &self,
1905        queue: &str,
1906        exchange: &str,
1907        routing_key: &str,
1908        options: QueueBindOptions,
1909        arguments: FieldTable,
1910    ) -> Result<()> {
1911        if !self.status.connected() {
1912            return Err(Error::InvalidChannelState(self.status.state()));
1913        }
1914
1915        let creation_arguments = arguments.clone();
1916        let QueueBindOptions { nowait } = options;
1917        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Bind(protocol::queue::Bind {
1918            queue: queue.into(),
1919            exchange: exchange.into(),
1920            routing_key: routing_key.into(),
1921            nowait,
1922            arguments,
1923        }));
1924
1925        let (promise, send_resolver) = Promise::new();
1926        if level_enabled!(Level::TRACE) {
1927            promise.set_marker("queue.bind".into());
1928        }
1929        let ((promise, resolver), promise_out) = (Promise::new(), promise);
1930        if level_enabled!(Level::TRACE) {
1931            promise.set_marker("queue.bind.Ok".into());
1932        }
1933        self.send_method_frame(
1934            method,
1935            send_resolver,
1936            Some(ExpectedReply(
1937                Reply::QueueBindOk(
1938                    resolver.clone(),
1939                    queue.into(),
1940                    exchange.into(),
1941                    routing_key.into(),
1942                    creation_arguments,
1943                ),
1944                Box::new(resolver),
1945            )),
1946        );
1947        if nowait {
1948            self.receive_queue_bind_ok(protocol::queue::BindOk {})?;
1949        }
1950        promise_out.await?;
1951        promise.await
1952    }
1953    fn receive_queue_bind_ok(&self, method: protocol::queue::BindOk) -> Result<()> {
1954        if !self.status.can_receive_messages() {
1955            return Err(Error::InvalidChannelState(self.status.state()));
1956        }
1957
1958        match self
1959            .frames
1960            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueueBindOk(..)))
1961        {
1962            Some(Reply::QueueBindOk(
1963                resolver,
1964                queue,
1965                exchange,
1966                routing_key,
1967                creation_arguments,
1968            )) => {
1969                let res = self.on_queue_bind_ok_received(
1970                    queue,
1971                    exchange,
1972                    routing_key,
1973                    creation_arguments,
1974                );
1975                resolver.swear(res.clone());
1976                res
1977            }
1978            unexpected => self.handle_invalid_contents(
1979                format!(
1980                    "unexpected queue bind-ok received on channel {}, was awaiting for {:?}",
1981                    self.id, unexpected
1982                ),
1983                method.get_amqp_class_id(),
1984                method.get_amqp_method_id(),
1985            ),
1986        }
1987    }
1988    pub async fn queue_purge(
1989        &self,
1990        queue: &str,
1991        options: QueuePurgeOptions,
1992    ) -> Result<MessageCount> {
1993        if !self.status.connected() {
1994            return Err(Error::InvalidChannelState(self.status.state()));
1995        }
1996
1997        let QueuePurgeOptions { nowait } = options;
1998        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Purge(protocol::queue::Purge {
1999            queue: queue.into(),
2000            nowait,
2001        }));
2002
2003        let (promise, send_resolver) = Promise::new();
2004        if level_enabled!(Level::TRACE) {
2005            promise.set_marker("queue.purge".into());
2006        }
2007        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2008        if level_enabled!(Level::TRACE) {
2009            promise.set_marker("queue.purge.Ok".into());
2010        }
2011        self.send_method_frame(
2012            method,
2013            send_resolver,
2014            Some(ExpectedReply(
2015                Reply::QueuePurgeOk(resolver.clone()),
2016                Box::new(resolver),
2017            )),
2018        );
2019        promise_out.await?;
2020        promise.await
2021    }
2022    fn receive_queue_purge_ok(&self, method: protocol::queue::PurgeOk) -> Result<()> {
2023        if !self.status.can_receive_messages() {
2024            return Err(Error::InvalidChannelState(self.status.state()));
2025        }
2026
2027        match self
2028            .frames
2029            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueuePurgeOk(..)))
2030        {
2031            Some(Reply::QueuePurgeOk(resolver)) => {
2032                self.on_queue_purge_ok_received(method, resolver)
2033            }
2034            unexpected => self.handle_invalid_contents(
2035                format!(
2036                    "unexpected queue purge-ok received on channel {}, was awaiting for {:?}",
2037                    self.id, unexpected
2038                ),
2039                method.get_amqp_class_id(),
2040                method.get_amqp_method_id(),
2041            ),
2042        }
2043    }
2044    pub async fn queue_delete(
2045        &self,
2046        queue: &str,
2047        options: QueueDeleteOptions,
2048    ) -> Result<MessageCount> {
2049        if !self.status.connected() {
2050            return Err(Error::InvalidChannelState(self.status.state()));
2051        }
2052
2053        let QueueDeleteOptions {
2054            if_unused,
2055            if_empty,
2056            nowait,
2057        } = options;
2058        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Delete(
2059            protocol::queue::Delete {
2060                queue: queue.into(),
2061                if_unused,
2062                if_empty,
2063                nowait,
2064            },
2065        ));
2066
2067        let (promise, send_resolver) = Promise::new();
2068        if level_enabled!(Level::TRACE) {
2069            promise.set_marker("queue.delete".into());
2070        }
2071        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2072        if level_enabled!(Level::TRACE) {
2073            promise.set_marker("queue.delete.Ok".into());
2074        }
2075        self.send_method_frame(
2076            method,
2077            send_resolver,
2078            Some(ExpectedReply(
2079                Reply::QueueDeleteOk(resolver.clone(), queue.into()),
2080                Box::new(resolver),
2081            )),
2082        );
2083        if nowait {
2084            self.receive_queue_delete_ok(protocol::queue::DeleteOk {
2085                ..Default::default()
2086            })?;
2087        }
2088        promise_out.await?;
2089        promise.await
2090    }
2091    fn receive_queue_delete_ok(&self, method: protocol::queue::DeleteOk) -> Result<()> {
2092        if !self.status.can_receive_messages() {
2093            return Err(Error::InvalidChannelState(self.status.state()));
2094        }
2095
2096        match self.frames.find_expected_reply(self.id, |reply| {
2097            matches!(&reply.0, Reply::QueueDeleteOk(..))
2098        }) {
2099            Some(Reply::QueueDeleteOk(resolver, queue)) => {
2100                self.on_queue_delete_ok_received(method, resolver, queue)
2101            }
2102            unexpected => self.handle_invalid_contents(
2103                format!(
2104                    "unexpected queue delete-ok received on channel {}, was awaiting for {:?}",
2105                    self.id, unexpected
2106                ),
2107                method.get_amqp_class_id(),
2108                method.get_amqp_method_id(),
2109            ),
2110        }
2111    }
2112    pub async fn queue_unbind(
2113        &self,
2114        queue: &str,
2115        exchange: &str,
2116        routing_key: &str,
2117        arguments: FieldTable,
2118    ) -> Result<()> {
2119        if !self.status.connected() {
2120            return Err(Error::InvalidChannelState(self.status.state()));
2121        }
2122
2123        let creation_arguments = arguments.clone();
2124        let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Unbind(
2125            protocol::queue::Unbind {
2126                queue: queue.into(),
2127                exchange: exchange.into(),
2128                routing_key: routing_key.into(),
2129                arguments,
2130            },
2131        ));
2132
2133        let (promise, send_resolver) = Promise::new();
2134        if level_enabled!(Level::TRACE) {
2135            promise.set_marker("queue.unbind".into());
2136        }
2137        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2138        if level_enabled!(Level::TRACE) {
2139            promise.set_marker("queue.unbind.Ok".into());
2140        }
2141        self.send_method_frame(
2142            method,
2143            send_resolver,
2144            Some(ExpectedReply(
2145                Reply::QueueUnbindOk(
2146                    resolver.clone(),
2147                    queue.into(),
2148                    exchange.into(),
2149                    routing_key.into(),
2150                    creation_arguments,
2151                ),
2152                Box::new(resolver),
2153            )),
2154        );
2155        promise_out.await?;
2156        promise.await
2157    }
2158    fn receive_queue_unbind_ok(&self, method: protocol::queue::UnbindOk) -> Result<()> {
2159        if !self.status.can_receive_messages() {
2160            return Err(Error::InvalidChannelState(self.status.state()));
2161        }
2162
2163        match self.frames.find_expected_reply(self.id, |reply| {
2164            matches!(&reply.0, Reply::QueueUnbindOk(..))
2165        }) {
2166            Some(Reply::QueueUnbindOk(
2167                resolver,
2168                queue,
2169                exchange,
2170                routing_key,
2171                creation_arguments,
2172            )) => {
2173                let res = self.on_queue_unbind_ok_received(
2174                    queue,
2175                    exchange,
2176                    routing_key,
2177                    creation_arguments,
2178                );
2179                resolver.swear(res.clone());
2180                res
2181            }
2182            unexpected => self.handle_invalid_contents(
2183                format!(
2184                    "unexpected queue unbind-ok received on channel {}, was awaiting for {:?}",
2185                    self.id, unexpected
2186                ),
2187                method.get_amqp_class_id(),
2188                method.get_amqp_method_id(),
2189            ),
2190        }
2191    }
2192    pub async fn tx_select(&self) -> Result<()> {
2193        if !self.status.connected() {
2194            return Err(Error::InvalidChannelState(self.status.state()));
2195        }
2196
2197        let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Select(protocol::tx::Select {}));
2198
2199        let (promise, send_resolver) = Promise::new();
2200        if level_enabled!(Level::TRACE) {
2201            promise.set_marker("tx.select".into());
2202        }
2203        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2204        if level_enabled!(Level::TRACE) {
2205            promise.set_marker("tx.select.Ok".into());
2206        }
2207        self.send_method_frame(
2208            method,
2209            send_resolver,
2210            Some(ExpectedReply(
2211                Reply::TxSelectOk(resolver.clone()),
2212                Box::new(resolver),
2213            )),
2214        );
2215        promise_out.await?;
2216        promise.await
2217    }
2218    fn receive_tx_select_ok(&self, method: protocol::tx::SelectOk) -> Result<()> {
2219        if !self.status.can_receive_messages() {
2220            return Err(Error::InvalidChannelState(self.status.state()));
2221        }
2222
2223        match self
2224            .frames
2225            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxSelectOk(..)))
2226        {
2227            Some(Reply::TxSelectOk(resolver)) => {
2228                let res = Ok(());
2229                resolver.swear(res.clone());
2230                res
2231            }
2232            unexpected => self.handle_invalid_contents(
2233                format!(
2234                    "unexpected tx select-ok received on channel {}, was awaiting for {:?}",
2235                    self.id, unexpected
2236                ),
2237                method.get_amqp_class_id(),
2238                method.get_amqp_method_id(),
2239            ),
2240        }
2241    }
2242    pub async fn tx_commit(&self) -> Result<()> {
2243        if !self.status.connected() {
2244            return Err(Error::InvalidChannelState(self.status.state()));
2245        }
2246
2247        let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Commit(protocol::tx::Commit {}));
2248
2249        let (promise, send_resolver) = Promise::new();
2250        if level_enabled!(Level::TRACE) {
2251            promise.set_marker("tx.commit".into());
2252        }
2253        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2254        if level_enabled!(Level::TRACE) {
2255            promise.set_marker("tx.commit.Ok".into());
2256        }
2257        self.send_method_frame(
2258            method,
2259            send_resolver,
2260            Some(ExpectedReply(
2261                Reply::TxCommitOk(resolver.clone()),
2262                Box::new(resolver),
2263            )),
2264        );
2265        promise_out.await?;
2266        promise.await
2267    }
2268    fn receive_tx_commit_ok(&self, method: protocol::tx::CommitOk) -> Result<()> {
2269        if !self.status.can_receive_messages() {
2270            return Err(Error::InvalidChannelState(self.status.state()));
2271        }
2272
2273        match self
2274            .frames
2275            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxCommitOk(..)))
2276        {
2277            Some(Reply::TxCommitOk(resolver)) => {
2278                let res = Ok(());
2279                resolver.swear(res.clone());
2280                res
2281            }
2282            unexpected => self.handle_invalid_contents(
2283                format!(
2284                    "unexpected tx commit-ok received on channel {}, was awaiting for {:?}",
2285                    self.id, unexpected
2286                ),
2287                method.get_amqp_class_id(),
2288                method.get_amqp_method_id(),
2289            ),
2290        }
2291    }
2292    pub async fn tx_rollback(&self) -> Result<()> {
2293        if !self.status.connected() {
2294            return Err(Error::InvalidChannelState(self.status.state()));
2295        }
2296
2297        let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Rollback(
2298            protocol::tx::Rollback {},
2299        ));
2300
2301        let (promise, send_resolver) = Promise::new();
2302        if level_enabled!(Level::TRACE) {
2303            promise.set_marker("tx.rollback".into());
2304        }
2305        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2306        if level_enabled!(Level::TRACE) {
2307            promise.set_marker("tx.rollback.Ok".into());
2308        }
2309        self.send_method_frame(
2310            method,
2311            send_resolver,
2312            Some(ExpectedReply(
2313                Reply::TxRollbackOk(resolver.clone()),
2314                Box::new(resolver),
2315            )),
2316        );
2317        promise_out.await?;
2318        promise.await
2319    }
2320    fn receive_tx_rollback_ok(&self, method: protocol::tx::RollbackOk) -> Result<()> {
2321        if !self.status.can_receive_messages() {
2322            return Err(Error::InvalidChannelState(self.status.state()));
2323        }
2324
2325        match self
2326            .frames
2327            .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxRollbackOk(..)))
2328        {
2329            Some(Reply::TxRollbackOk(resolver)) => {
2330                let res = Ok(());
2331                resolver.swear(res.clone());
2332                res
2333            }
2334            unexpected => self.handle_invalid_contents(
2335                format!(
2336                    "unexpected tx rollback-ok received on channel {}, was awaiting for {:?}",
2337                    self.id, unexpected
2338                ),
2339                method.get_amqp_class_id(),
2340                method.get_amqp_method_id(),
2341            ),
2342        }
2343    }
2344    pub async fn confirm_select(&self, options: ConfirmSelectOptions) -> Result<()> {
2345        if !self.status.connected() {
2346            return Err(Error::InvalidChannelState(self.status.state()));
2347        }
2348
2349        let ConfirmSelectOptions { nowait } = options;
2350        let method = AMQPClass::Confirm(protocol::confirm::AMQPMethod::Select(
2351            protocol::confirm::Select { nowait },
2352        ));
2353
2354        let (promise, send_resolver) = Promise::new();
2355        if level_enabled!(Level::TRACE) {
2356            promise.set_marker("confirm.select".into());
2357        }
2358        let ((promise, resolver), promise_out) = (Promise::new(), promise);
2359        if level_enabled!(Level::TRACE) {
2360            promise.set_marker("confirm.select.Ok".into());
2361        }
2362        self.send_method_frame(
2363            method,
2364            send_resolver,
2365            Some(ExpectedReply(
2366                Reply::ConfirmSelectOk(resolver.clone()),
2367                Box::new(resolver),
2368            )),
2369        );
2370        promise_out.await?;
2371        promise.await
2372    }
2373    fn receive_confirm_select_ok(&self, method: protocol::confirm::SelectOk) -> Result<()> {
2374        if !self.status.can_receive_messages() {
2375            return Err(Error::InvalidChannelState(self.status.state()));
2376        }
2377
2378        match self.frames.find_expected_reply(self.id, |reply| {
2379            matches!(&reply.0, Reply::ConfirmSelectOk(..))
2380        }) {
2381            Some(Reply::ConfirmSelectOk(resolver)) => {
2382                let res = self.on_confirm_select_ok_received();
2383                resolver.swear(res.clone());
2384                res
2385            }
2386            unexpected => self.handle_invalid_contents(
2387                format!(
2388                    "unexpected confirm select-ok received on channel {}, was awaiting for {:?}",
2389                    self.id, unexpected
2390                ),
2391                method.get_amqp_class_id(),
2392                method.get_amqp_method_id(),
2393            ),
2394        }
2395    }
2396}