1pub mod options {
2 use super::*;
3
4 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
5 pub struct BasicQosOptions {
6 #[serde(default)]
7 pub global: Boolean,
8 }
9
10 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
11 pub struct BasicConsumeOptions {
12 #[serde(default)]
13 pub no_local: Boolean,
14 #[serde(default)]
15 pub no_ack: Boolean,
16 #[serde(default)]
17 pub exclusive: Boolean,
18 #[serde(default)]
19 pub nowait: Boolean,
20 }
21
22 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
23 pub struct BasicCancelOptions {
24 #[serde(default)]
25 pub nowait: Boolean,
26 }
27
28 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
29 pub struct BasicPublishOptions {
30 #[serde(default)]
31 pub mandatory: Boolean,
32 #[serde(default)]
33 pub immediate: Boolean,
34 }
35
36 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
37 pub struct BasicDeliverOptions {
38 #[serde(default)]
39 pub redelivered: Boolean,
40 }
41
42 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
43 pub struct BasicGetOptions {
44 #[serde(default)]
45 pub no_ack: Boolean,
46 }
47
48 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
49 pub struct BasicGetOkOptions {
50 #[serde(default)]
51 pub redelivered: Boolean,
52 }
53
54 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
55 pub struct BasicAckOptions {
56 #[serde(default)]
57 pub multiple: Boolean,
58 }
59
60 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
61 pub struct BasicRejectOptions {
62 #[serde(default)]
63 pub requeue: Boolean,
64 }
65
66 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
67 pub struct BasicRecoverAsyncOptions {
68 #[serde(default)]
69 pub requeue: Boolean,
70 }
71
72 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
73 pub struct BasicRecoverOptions {
74 #[serde(default)]
75 pub requeue: Boolean,
76 }
77
78 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
79 pub struct BasicNackOptions {
80 #[serde(default)]
81 pub multiple: Boolean,
82 #[serde(default)]
83 pub requeue: Boolean,
84 }
85
86 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
87 pub struct ChannelFlowOptions {
88 #[serde(default)]
89 pub active: Boolean,
90 }
91
92 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
93 pub struct ChannelFlowOkOptions {
94 #[serde(default)]
95 pub active: Boolean,
96 }
97
98 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
99 pub struct AccessRequestOptions {
100 #[serde(default)]
101 pub exclusive: Boolean,
102 #[serde(default)]
103 pub passive: Boolean,
104 #[serde(default)]
105 pub active: Boolean,
106 #[serde(default)]
107 pub write: Boolean,
108 #[serde(default)]
109 pub read: Boolean,
110 }
111
112 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
113 pub struct ExchangeDeclareOptions {
114 #[serde(default)]
115 pub passive: Boolean,
116 #[serde(default)]
117 pub durable: Boolean,
118 #[serde(default)]
119 pub auto_delete: Boolean,
120 #[serde(default)]
121 pub internal: Boolean,
122 #[serde(default)]
123 pub nowait: Boolean,
124 }
125
126 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
127 pub struct ExchangeDeleteOptions {
128 #[serde(default)]
129 pub if_unused: Boolean,
130 #[serde(default)]
131 pub nowait: Boolean,
132 }
133
134 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
135 pub struct ExchangeBindOptions {
136 #[serde(default)]
137 pub nowait: Boolean,
138 }
139
140 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
141 pub struct ExchangeUnbindOptions {
142 #[serde(default)]
143 pub nowait: Boolean,
144 }
145
146 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
147 pub struct QueueDeclareOptions {
148 #[serde(default)]
149 pub passive: Boolean,
150 #[serde(default)]
151 pub durable: Boolean,
152 #[serde(default)]
153 pub exclusive: Boolean,
154 #[serde(default)]
155 pub auto_delete: Boolean,
156 #[serde(default)]
157 pub nowait: Boolean,
158 }
159
160 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
161 pub struct QueueBindOptions {
162 #[serde(default)]
163 pub nowait: Boolean,
164 }
165
166 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
167 pub struct QueuePurgeOptions {
168 #[serde(default)]
169 pub nowait: Boolean,
170 }
171
172 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
173 pub struct QueueDeleteOptions {
174 #[serde(default)]
175 pub if_unused: Boolean,
176 #[serde(default)]
177 pub if_empty: Boolean,
178 #[serde(default)]
179 pub nowait: Boolean,
180 }
181
182 #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
183 pub struct ConfirmSelectOptions {
184 #[serde(default)]
185 pub nowait: Boolean,
186 }
187}
188
189use options::*;
190
191#[derive(Debug)]
192#[allow(clippy::enum_variant_names)]
193pub(crate) enum Reply {
194 BasicQosOk(PromiseResolver<()>),
195 BasicConsumeOk(
196 PromiseResolver<Consumer>,
197 Option<Arc<ChannelCloser>>,
198 ShortString,
199 BasicConsumeOptions,
200 FieldTable,
201 Option<Consumer>,
202 ),
203 BasicCancelOk(PromiseResolver<()>),
204 BasicGetOk(
205 PromiseResolver<Option<BasicGetMessage>>,
206 ShortString,
207 BasicGetOptions,
208 ),
209 BasicRecoverOk(PromiseResolver<()>),
210 ConnectionOpenOk(PromiseResolver<()>, Connection),
211 ConnectionCloseOk(PromiseResolver<()>),
212 ConnectionUpdateSecretOk(PromiseResolver<()>),
213 ChannelOpenOk(PromiseResolver<Channel>, Channel),
214 ChannelFlowOk(PromiseResolver<Boolean>),
215 ChannelCloseOk(PromiseResolver<()>),
216 AccessRequestOk(PromiseResolver<()>),
217 ExchangeDeclareOk(
218 PromiseResolver<()>,
219 ShortString,
220 ExchangeKind,
221 ExchangeDeclareOptions,
222 FieldTable,
223 ),
224 ExchangeDeleteOk(PromiseResolver<()>, ShortString),
225 ExchangeBindOk(
226 PromiseResolver<()>,
227 ShortString,
228 ShortString,
229 ShortString,
230 FieldTable,
231 ),
232 ExchangeUnbindOk(
233 PromiseResolver<()>,
234 ShortString,
235 ShortString,
236 ShortString,
237 FieldTable,
238 ),
239 QueueDeclareOk(PromiseResolver<Queue>, QueueDeclareOptions, FieldTable),
240 QueueBindOk(
241 PromiseResolver<()>,
242 ShortString,
243 ShortString,
244 ShortString,
245 FieldTable,
246 ),
247 QueuePurgeOk(PromiseResolver<MessageCount>),
248 QueueDeleteOk(PromiseResolver<MessageCount>, ShortString),
249 QueueUnbindOk(
250 PromiseResolver<()>,
251 ShortString,
252 ShortString,
253 ShortString,
254 FieldTable,
255 ),
256 TxSelectOk(PromiseResolver<()>),
257 TxCommitOk(PromiseResolver<()>),
258 TxRollbackOk(PromiseResolver<()>),
259 ConfirmSelectOk(PromiseResolver<()>),
260}
261
262impl Channel {
263 pub(crate) fn receive_method(&self, method: AMQPClass) -> Result<()> {
264 match method {
265 AMQPClass::Basic(protocol::basic::AMQPMethod::QosOk(m)) => self.receive_basic_qos_ok(m),
266 AMQPClass::Basic(protocol::basic::AMQPMethod::ConsumeOk(m)) => {
267 self.receive_basic_consume_ok(m)
268 }
269 AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(m)) => {
270 self.receive_basic_cancel(m)
271 }
272 AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(m)) => {
273 self.receive_basic_cancel_ok(m)
274 }
275 AMQPClass::Basic(protocol::basic::AMQPMethod::Return(m)) => {
276 self.receive_basic_return(m)
277 }
278 AMQPClass::Basic(protocol::basic::AMQPMethod::Deliver(m)) => {
279 self.receive_basic_deliver(m)
280 }
281 AMQPClass::Basic(protocol::basic::AMQPMethod::GetOk(m)) => self.receive_basic_get_ok(m),
282 AMQPClass::Basic(protocol::basic::AMQPMethod::GetEmpty(m)) => {
283 self.receive_basic_get_empty(m)
284 }
285 AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(m)) => self.receive_basic_ack(m),
286 AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverOk(m)) => {
287 self.receive_basic_recover_ok(m)
288 }
289 AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(m)) => self.receive_basic_nack(m),
290 AMQPClass::Connection(protocol::connection::AMQPMethod::Start(m)) => {
291 self.receive_connection_start(m)
292 }
293 AMQPClass::Connection(protocol::connection::AMQPMethod::Secure(m)) => {
294 self.receive_connection_secure(m)
295 }
296 AMQPClass::Connection(protocol::connection::AMQPMethod::Tune(m)) => {
297 self.receive_connection_tune(m)
298 }
299 AMQPClass::Connection(protocol::connection::AMQPMethod::OpenOk(m)) => {
300 self.receive_connection_open_ok(m)
301 }
302 AMQPClass::Connection(protocol::connection::AMQPMethod::Close(m)) => {
303 self.receive_connection_close(m)
304 }
305 AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(m)) => {
306 self.receive_connection_close_ok(m)
307 }
308 AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(m)) => {
309 self.receive_connection_blocked(m)
310 }
311 AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(m)) => {
312 self.receive_connection_unblocked(m)
313 }
314 AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecretOk(m)) => {
315 self.receive_connection_update_secret_ok(m)
316 }
317 AMQPClass::Channel(protocol::channel::AMQPMethod::OpenOk(m)) => {
318 self.receive_channel_open_ok(m)
319 }
320 AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(m)) => {
321 self.receive_channel_flow(m)
322 }
323 AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(m)) => {
324 self.receive_channel_flow_ok(m)
325 }
326 AMQPClass::Channel(protocol::channel::AMQPMethod::Close(m)) => {
327 self.receive_channel_close(m)
328 }
329 AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(m)) => {
330 self.receive_channel_close_ok(m)
331 }
332 AMQPClass::Access(protocol::access::AMQPMethod::RequestOk(m)) => {
333 self.receive_access_request_ok(m)
334 }
335 AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeclareOk(m)) => {
336 self.receive_exchange_declare_ok(m)
337 }
338 AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeleteOk(m)) => {
339 self.receive_exchange_delete_ok(m)
340 }
341 AMQPClass::Exchange(protocol::exchange::AMQPMethod::BindOk(m)) => {
342 self.receive_exchange_bind_ok(m)
343 }
344 AMQPClass::Exchange(protocol::exchange::AMQPMethod::UnbindOk(m)) => {
345 self.receive_exchange_unbind_ok(m)
346 }
347 AMQPClass::Queue(protocol::queue::AMQPMethod::DeclareOk(m)) => {
348 self.receive_queue_declare_ok(m)
349 }
350 AMQPClass::Queue(protocol::queue::AMQPMethod::BindOk(m)) => {
351 self.receive_queue_bind_ok(m)
352 }
353 AMQPClass::Queue(protocol::queue::AMQPMethod::PurgeOk(m)) => {
354 self.receive_queue_purge_ok(m)
355 }
356 AMQPClass::Queue(protocol::queue::AMQPMethod::DeleteOk(m)) => {
357 self.receive_queue_delete_ok(m)
358 }
359 AMQPClass::Queue(protocol::queue::AMQPMethod::UnbindOk(m)) => {
360 self.receive_queue_unbind_ok(m)
361 }
362 AMQPClass::Tx(protocol::tx::AMQPMethod::SelectOk(m)) => self.receive_tx_select_ok(m),
363 AMQPClass::Tx(protocol::tx::AMQPMethod::CommitOk(m)) => self.receive_tx_commit_ok(m),
364 AMQPClass::Tx(protocol::tx::AMQPMethod::RollbackOk(m)) => {
365 self.receive_tx_rollback_ok(m)
366 }
367 AMQPClass::Confirm(protocol::confirm::AMQPMethod::SelectOk(m)) => {
368 self.receive_confirm_select_ok(m)
369 }
370 m => {
371 error!(method=?m, "The client should not receive this method");
372 self.handle_invalid_contents(
373 format!("unexpected method received on channel {}", self.id),
374 m.get_amqp_class_id(),
375 m.get_amqp_method_id(),
376 )
377 }
378 }
379 }
380
381 pub async fn basic_qos(
382 &self,
383 prefetch_count: ShortUInt,
384 options: BasicQosOptions,
385 ) -> Result<()> {
386 if !self.status.connected() {
387 return Err(Error::InvalidChannelState(self.status.state()));
388 }
389
390 let BasicQosOptions { global } = options;
391 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Qos(protocol::basic::Qos {
392 prefetch_count,
393 global,
394 }));
395
396 let (promise, send_resolver) = Promise::new();
397 if level_enabled!(Level::TRACE) {
398 promise.set_marker("basic.qos".into());
399 }
400 let ((promise, resolver), promise_out) = (Promise::new(), promise);
401 if level_enabled!(Level::TRACE) {
402 promise.set_marker("basic.qos.Ok".into());
403 }
404 self.send_method_frame(
405 method,
406 send_resolver,
407 Some(ExpectedReply(
408 Reply::BasicQosOk(resolver.clone()),
409 Box::new(resolver),
410 )),
411 );
412 promise_out.await?;
413 promise.await
414 }
415 fn receive_basic_qos_ok(&self, method: protocol::basic::QosOk) -> Result<()> {
416 if !self.status.can_receive_messages() {
417 return Err(Error::InvalidChannelState(self.status.state()));
418 }
419
420 match self
421 .frames
422 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicQosOk(..)))
423 {
424 Some(Reply::BasicQosOk(resolver)) => {
425 let res = Ok(());
426 resolver.swear(res.clone());
427 res
428 }
429 unexpected => self.handle_invalid_contents(
430 format!(
431 "unexpected basic qos-ok received on channel {}, was awaiting for {:?}",
432 self.id, unexpected
433 ),
434 method.get_amqp_class_id(),
435 method.get_amqp_method_id(),
436 ),
437 }
438 }
439 async fn do_basic_consume(
440 &self,
441 queue: &str,
442 consumer_tag: &str,
443 options: BasicConsumeOptions,
444 arguments: FieldTable,
445 original: Option<Consumer>,
446 ) -> Result<Consumer> {
447 if !self.status.connected() {
448 return Err(Error::InvalidChannelState(self.status.state()));
449 }
450
451 let creation_arguments = arguments.clone();
452 let BasicConsumeOptions {
453 no_local,
454 no_ack,
455 exclusive,
456 nowait,
457 } = options;
458 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Consume(
459 protocol::basic::Consume {
460 queue: queue.into(),
461 consumer_tag: consumer_tag.into(),
462 no_local,
463 no_ack,
464 exclusive,
465 nowait,
466 arguments,
467 },
468 ));
469
470 let (promise, send_resolver) = Promise::new();
471 if level_enabled!(Level::TRACE) {
472 promise.set_marker("basic.consume".into());
473 }
474 let ((promise, resolver), promise_out) = (Promise::new(), promise);
475 if level_enabled!(Level::TRACE) {
476 promise.set_marker("basic.consume.Ok".into());
477 }
478 self.send_method_frame(
479 method,
480 send_resolver,
481 Some(ExpectedReply(
482 Reply::BasicConsumeOk(
483 resolver.clone(),
484 self.channel_closer.clone(),
485 queue.into(),
486 options,
487 creation_arguments,
488 original,
489 ),
490 Box::new(resolver),
491 )),
492 );
493 if nowait {
494 self.receive_basic_consume_ok(protocol::basic::ConsumeOk {
495 consumer_tag: consumer_tag.into(),
496 })?;
497 }
498 promise_out.await?;
499 promise.await
500 }
501 fn receive_basic_consume_ok(&self, method: protocol::basic::ConsumeOk) -> Result<()> {
502 if !self.status.can_receive_messages() {
503 return Err(Error::InvalidChannelState(self.status.state()));
504 }
505
506 match self.frames.find_expected_reply(self.id, |reply| {
507 matches!(&reply.0, Reply::BasicConsumeOk(..))
508 }) {
509 Some(Reply::BasicConsumeOk(
510 resolver,
511 channel_closer,
512 queue,
513 options,
514 creation_arguments,
515 original,
516 )) => self.on_basic_consume_ok_received(
517 method,
518 resolver,
519 channel_closer,
520 queue,
521 options,
522 creation_arguments,
523 original,
524 ),
525 unexpected => self.handle_invalid_contents(
526 format!(
527 "unexpected basic consume-ok received on channel {}, was awaiting for {:?}",
528 self.id, unexpected
529 ),
530 method.get_amqp_class_id(),
531 method.get_amqp_method_id(),
532 ),
533 }
534 }
535 pub async fn basic_cancel(
536 &self,
537 consumer_tag: &str,
538 options: BasicCancelOptions,
539 ) -> Result<()> {
540 if !self.status.connected() {
541 return Err(Error::InvalidChannelState(self.status.state()));
542 }
543
544 self.before_basic_cancel(consumer_tag);
545 let BasicCancelOptions { nowait } = options;
546 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(
547 protocol::basic::Cancel {
548 consumer_tag: consumer_tag.into(),
549 nowait,
550 },
551 ));
552
553 let (promise, send_resolver) = Promise::new();
554 if level_enabled!(Level::TRACE) {
555 promise.set_marker("basic.cancel".into());
556 }
557 let ((promise, resolver), promise_out) = (Promise::new(), promise);
558 if level_enabled!(Level::TRACE) {
559 promise.set_marker("basic.cancel.Ok".into());
560 }
561 self.send_method_frame(
562 method,
563 send_resolver,
564 Some(ExpectedReply(
565 Reply::BasicCancelOk(resolver.clone()),
566 Box::new(resolver),
567 )),
568 );
569 if nowait {
570 self.receive_basic_cancel_ok(protocol::basic::CancelOk {
571 consumer_tag: consumer_tag.into(),
572 })?;
573 }
574 promise_out.await?;
575 promise.await
576 }
577 fn receive_basic_cancel(&self, method: protocol::basic::Cancel) -> Result<()> {
578 if !self.status.can_receive_messages() {
579 return Err(Error::InvalidChannelState(self.status.state()));
580 }
581 self.on_basic_cancel_received(method)
582 }
583 async fn basic_cancel_ok(&self, consumer_tag: &str) -> Result<()> {
584 if !self.status.connected() {
585 return Err(Error::InvalidChannelState(self.status.state()));
586 }
587
588 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(
589 protocol::basic::CancelOk {
590 consumer_tag: consumer_tag.into(),
591 },
592 ));
593
594 let (promise, send_resolver) = Promise::new();
595 if level_enabled!(Level::TRACE) {
596 promise.set_marker("basic.cancel-ok".into());
597 }
598 self.send_method_frame(method, send_resolver, None);
599 promise.await
600 }
601 fn receive_basic_cancel_ok(&self, method: protocol::basic::CancelOk) -> Result<()> {
602 if !self.status.can_receive_messages() {
603 return Err(Error::InvalidChannelState(self.status.state()));
604 }
605
606 match self.frames.find_expected_reply(self.id, |reply| {
607 matches!(&reply.0, Reply::BasicCancelOk(..))
608 }) {
609 Some(Reply::BasicCancelOk(resolver)) => {
610 let res = self.on_basic_cancel_ok_received(method);
611 resolver.swear(res.clone());
612 res
613 }
614 unexpected => self.handle_invalid_contents(
615 format!(
616 "unexpected basic cancel-ok received on channel {}, was awaiting for {:?}",
617 self.id, unexpected
618 ),
619 method.get_amqp_class_id(),
620 method.get_amqp_method_id(),
621 ),
622 }
623 }
624 pub async fn basic_publish(
625 &self,
626 exchange: &str,
627 routing_key: &str,
628 options: BasicPublishOptions,
629 payload: &[u8],
630 properties: BasicProperties,
631 ) -> Result<PublisherConfirm> {
632 if !self.status.connected() {
633 return Err(Error::InvalidChannelState(self.status.state()));
634 }
635
636 let start_hook_res = self.before_basic_publish();
637 let BasicPublishOptions {
638 mandatory,
639 immediate,
640 } = options;
641 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Publish(
642 protocol::basic::Publish {
643 exchange: exchange.into(),
644 routing_key: routing_key.into(),
645 mandatory,
646 immediate,
647 },
648 ));
649
650 self.send_method_frame_with_body(method, payload, properties, start_hook_res)
651 .await
652 }
653 fn receive_basic_return(&self, method: protocol::basic::Return) -> Result<()> {
654 if !self.status.can_receive_messages() {
655 return Err(Error::InvalidChannelState(self.status.state()));
656 }
657 self.on_basic_return_received(method)
658 }
659 fn receive_basic_deliver(&self, method: protocol::basic::Deliver) -> Result<()> {
660 if !self.status.can_receive_messages() {
661 return Err(Error::InvalidChannelState(self.status.state()));
662 }
663 self.on_basic_deliver_received(method)
664 }
665 async fn do_basic_get(
666 &self,
667 queue: &str,
668 options: BasicGetOptions,
669 original: Option<PromiseResolver<Option<BasicGetMessage>>>,
670 ) -> Result<Option<BasicGetMessage>> {
671 if !self.status.connected() {
672 return Err(Error::InvalidChannelState(self.status.state()));
673 }
674
675 let BasicGetOptions { no_ack } = options;
676 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Get(protocol::basic::Get {
677 queue: queue.into(),
678 no_ack,
679 }));
680
681 let (promise, send_resolver) = Promise::new();
682 if level_enabled!(Level::TRACE) {
683 promise.set_marker("basic.get".into());
684 }
685 let ((promise, resolver), promise_out) = (Promise::new(), promise);
686 if level_enabled!(Level::TRACE) {
687 promise.set_marker("basic.get.Ok".into());
688 }
689 let resolver = original.unwrap_or(resolver);
690 self.send_method_frame(
691 method,
692 send_resolver,
693 Some(ExpectedReply(
694 Reply::BasicGetOk(resolver.clone(), queue.into(), options),
695 Box::new(resolver),
696 )),
697 );
698 promise_out.await?;
699 promise.await
700 }
701 fn receive_basic_get_ok(&self, method: protocol::basic::GetOk) -> Result<()> {
702 if !self.status.can_receive_messages() {
703 return Err(Error::InvalidChannelState(self.status.state()));
704 }
705
706 match self
707 .frames
708 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
709 {
710 Some(Reply::BasicGetOk(resolver, queue, options)) => {
711 self.on_basic_get_ok_received(method, resolver, queue, options)
712 }
713 unexpected => self.handle_invalid_contents(
714 format!(
715 "unexpected basic get-ok received on channel {}, was awaiting for {:?}",
716 self.id, unexpected
717 ),
718 method.get_amqp_class_id(),
719 method.get_amqp_method_id(),
720 ),
721 }
722 }
723 fn receive_basic_get_empty(&self, method: protocol::basic::GetEmpty) -> Result<()> {
724 if !self.status.can_receive_messages() {
725 return Err(Error::InvalidChannelState(self.status.state()));
726 }
727 self.on_basic_get_empty_received(method)
728 }
729 pub async fn basic_ack(
730 &self,
731 delivery_tag: LongLongUInt,
732 options: BasicAckOptions,
733 ) -> Result<()> {
734 if !self.status.connected() {
735 return Err(Error::InvalidChannelState(self.status.state()));
736 }
737
738 let BasicAckOptions { multiple } = options;
739 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(protocol::basic::Ack {
740 delivery_tag,
741 multiple,
742 }));
743
744 let (promise, send_resolver) = Promise::new();
745 if level_enabled!(Level::TRACE) {
746 promise.set_marker("basic.ack".into());
747 }
748 self.send_method_frame(method, send_resolver, None);
749 self.on_basic_ack_sent(multiple, delivery_tag);
750 promise.await
751 }
752 fn receive_basic_ack(&self, method: protocol::basic::Ack) -> Result<()> {
753 if !self.status.can_receive_messages() {
754 return Err(Error::InvalidChannelState(self.status.state()));
755 }
756 self.on_basic_ack_received(method)
757 }
758 pub async fn basic_reject(
759 &self,
760 delivery_tag: LongLongUInt,
761 options: BasicRejectOptions,
762 ) -> Result<()> {
763 if !self.status.connected() {
764 return Err(Error::InvalidChannelState(self.status.state()));
765 }
766
767 let BasicRejectOptions { requeue } = options;
768 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Reject(
769 protocol::basic::Reject {
770 delivery_tag,
771 requeue,
772 },
773 ));
774
775 let (promise, send_resolver) = Promise::new();
776 if level_enabled!(Level::TRACE) {
777 promise.set_marker("basic.reject".into());
778 }
779 self.send_method_frame(method, send_resolver, None);
780 promise.await
781 }
782 pub async fn basic_recover_async(&self, options: BasicRecoverAsyncOptions) -> Result<()> {
783 if !self.status.connected() {
784 return Err(Error::InvalidChannelState(self.status.state()));
785 }
786
787 let BasicRecoverAsyncOptions { requeue } = options;
788 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverAsync(
789 protocol::basic::RecoverAsync { requeue },
790 ));
791
792 let (promise, send_resolver) = Promise::new();
793 if level_enabled!(Level::TRACE) {
794 promise.set_marker("basic.recover-async".into());
795 }
796 self.send_method_frame(method, send_resolver, None);
797 self.on_basic_recover_async_sent();
798 promise.await
799 }
800 pub async fn basic_recover(&self, options: BasicRecoverOptions) -> Result<()> {
801 if !self.status.connected() {
802 return Err(Error::InvalidChannelState(self.status.state()));
803 }
804
805 let BasicRecoverOptions { requeue } = options;
806 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Recover(
807 protocol::basic::Recover { requeue },
808 ));
809
810 let (promise, send_resolver) = Promise::new();
811 if level_enabled!(Level::TRACE) {
812 promise.set_marker("basic.recover".into());
813 }
814 let ((promise, resolver), promise_out) = (Promise::new(), promise);
815 if level_enabled!(Level::TRACE) {
816 promise.set_marker("basic.recover.Ok".into());
817 }
818 self.send_method_frame(
819 method,
820 send_resolver,
821 Some(ExpectedReply(
822 Reply::BasicRecoverOk(resolver.clone()),
823 Box::new(resolver),
824 )),
825 );
826 promise_out.await?;
827 promise.await
828 }
829 fn receive_basic_recover_ok(&self, method: protocol::basic::RecoverOk) -> Result<()> {
830 if !self.status.can_receive_messages() {
831 return Err(Error::InvalidChannelState(self.status.state()));
832 }
833
834 match self.frames.find_expected_reply(self.id, |reply| {
835 matches!(&reply.0, Reply::BasicRecoverOk(..))
836 }) {
837 Some(Reply::BasicRecoverOk(resolver)) => {
838 let res = self.on_basic_recover_ok_received();
839 resolver.swear(res.clone());
840 res
841 }
842 unexpected => self.handle_invalid_contents(
843 format!(
844 "unexpected basic recover-ok received on channel {}, was awaiting for {:?}",
845 self.id, unexpected
846 ),
847 method.get_amqp_class_id(),
848 method.get_amqp_method_id(),
849 ),
850 }
851 }
852 pub async fn basic_nack(
853 &self,
854 delivery_tag: LongLongUInt,
855 options: BasicNackOptions,
856 ) -> Result<()> {
857 if !self.status.connected() {
858 return Err(Error::InvalidChannelState(self.status.state()));
859 }
860
861 let BasicNackOptions { multiple, requeue } = options;
862 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(protocol::basic::Nack {
863 delivery_tag,
864 multiple,
865 requeue,
866 }));
867
868 let (promise, send_resolver) = Promise::new();
869 if level_enabled!(Level::TRACE) {
870 promise.set_marker("basic.nack".into());
871 }
872 self.send_method_frame(method, send_resolver, None);
873 self.on_basic_nack_sent(multiple, delivery_tag);
874 promise.await
875 }
876 fn receive_basic_nack(&self, method: protocol::basic::Nack) -> Result<()> {
877 if !self.status.can_receive_messages() {
878 return Err(Error::InvalidChannelState(self.status.state()));
879 }
880 self.on_basic_nack_received(method)
881 }
882 fn receive_connection_start(&self, method: protocol::connection::Start) -> Result<()> {
883 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
884 if !self.status.can_receive_messages() {
885 return Err(Error::InvalidChannelState(self.status.state()));
886 }
887 self.on_connection_start_received(method)
888 }
889 async fn connection_start_ok(
890 &self,
891 client_properties: FieldTable,
892 mechanism: &str,
893 response: &str,
894 locale: &str,
895 resolver: PromiseResolver<Connection>,
896 connection: Connection,
897 credentials: Credentials,
898 ) -> Result<()> {
899 if !self.status.connected() {
900 return Err(Error::InvalidChannelState(self.status.state()));
901 }
902
903 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::StartOk(
904 protocol::connection::StartOk {
905 client_properties,
906 mechanism: mechanism.into(),
907 response: response.into(),
908 locale: locale.into(),
909 },
910 ));
911
912 let (promise, send_resolver) = Promise::new();
913 if level_enabled!(Level::TRACE) {
914 promise.set_marker("connection.start-ok".into());
915 }
916 self.before_connection_start_ok(resolver, connection, credentials);
917 self.send_method_frame(method, send_resolver, None);
918 promise.await
919 }
920 fn receive_connection_secure(&self, method: protocol::connection::Secure) -> Result<()> {
921 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
922 if !self.status.can_receive_messages() {
923 return Err(Error::InvalidChannelState(self.status.state()));
924 }
925 self.on_connection_secure_received(method)
926 }
927 async fn connection_secure_ok(&self, response: &str) -> Result<()> {
928 if !self.status.connected() {
929 return Err(Error::InvalidChannelState(self.status.state()));
930 }
931
932 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::SecureOk(
933 protocol::connection::SecureOk {
934 response: response.into(),
935 },
936 ));
937
938 let (promise, send_resolver) = Promise::new();
939 if level_enabled!(Level::TRACE) {
940 promise.set_marker("connection.secure-ok".into());
941 }
942 self.send_method_frame(method, send_resolver, None);
943 promise.await
944 }
945 fn receive_connection_tune(&self, method: protocol::connection::Tune) -> Result<()> {
946 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
947 if !self.status.can_receive_messages() {
948 return Err(Error::InvalidChannelState(self.status.state()));
949 }
950 self.on_connection_tune_received(method)
951 }
952 async fn connection_tune_ok(
953 &self,
954 channel_max: ShortUInt,
955 frame_max: LongUInt,
956 heartbeat: ShortUInt,
957 ) -> Result<()> {
958 if !self.status.connected() {
959 return Err(Error::InvalidChannelState(self.status.state()));
960 }
961
962 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::TuneOk(
963 protocol::connection::TuneOk {
964 channel_max,
965 frame_max,
966 heartbeat,
967 },
968 ));
969
970 let (promise, send_resolver) = Promise::new();
971 if level_enabled!(Level::TRACE) {
972 promise.set_marker("connection.tune-ok".into());
973 }
974 self.send_method_frame(method, send_resolver, None);
975 promise.await
976 }
977 pub(crate) async fn connection_open(
978 &self,
979 virtual_host: &str,
980 connection: Connection,
981 conn_resolver: PromiseResolver<Connection>,
982 ) -> Result<()> {
983 if !self.status.connected() {
984 return Err(Error::InvalidChannelState(self.status.state()));
985 }
986
987 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Open(
988 protocol::connection::Open {
989 virtual_host: virtual_host.into(),
990 },
991 ));
992
993 let (promise, send_resolver) = Promise::new();
994 if level_enabled!(Level::TRACE) {
995 promise.set_marker("connection.open".into());
996 }
997 let ((promise, resolver), promise_out) = (Promise::new(), promise);
998 if level_enabled!(Level::TRACE) {
999 promise.set_marker("connection.open.Ok".into());
1000 }
1001 self.before_connection_open(conn_resolver);
1002 self.send_method_frame(
1003 method,
1004 send_resolver,
1005 Some(ExpectedReply(
1006 Reply::ConnectionOpenOk(resolver.clone(), connection),
1007 Box::new(resolver),
1008 )),
1009 );
1010 promise_out.await?;
1011 promise.await
1012 }
1013 fn receive_connection_open_ok(&self, method: protocol::connection::OpenOk) -> Result<()> {
1014 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1015 if !self.status.can_receive_messages() {
1016 return Err(Error::InvalidChannelState(self.status.state()));
1017 }
1018
1019 match self.frames.find_expected_reply(self.id, |reply| {
1020 matches!(&reply.0, Reply::ConnectionOpenOk(..))
1021 }) {
1022 Some(Reply::ConnectionOpenOk(resolver, connection)) => {
1023 let res = self.on_connection_open_ok_received(method, connection);
1024 resolver.swear(res.clone());
1025 res
1026 }
1027 unexpected => self.handle_invalid_contents(
1028 format!(
1029 "unexpected connection open-ok received on channel {}, was awaiting for {:?}",
1030 self.id, unexpected
1031 ),
1032 method.get_amqp_class_id(),
1033 method.get_amqp_method_id(),
1034 ),
1035 }
1036 }
1037 pub(crate) async fn connection_close(
1038 &self,
1039 reply_code: ShortUInt,
1040 reply_text: &str,
1041 class_id: ShortUInt,
1042 method_id: ShortUInt,
1043 ) -> Result<()> {
1044 if !self.status.closing() {
1045 return Err(Error::InvalidChannelState(self.status.state()));
1046 }
1047
1048 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Close(
1049 protocol::connection::Close {
1050 reply_code,
1051 reply_text: reply_text.into(),
1052 class_id,
1053 method_id,
1054 },
1055 ));
1056
1057 let (promise, send_resolver) = Promise::new();
1058 if level_enabled!(Level::TRACE) {
1059 promise.set_marker("connection.close".into());
1060 }
1061 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1062 if level_enabled!(Level::TRACE) {
1063 promise.set_marker("connection.close.Ok".into());
1064 }
1065 self.send_method_frame(
1066 method,
1067 send_resolver,
1068 Some(ExpectedReply(
1069 Reply::ConnectionCloseOk(resolver.clone()),
1070 Box::new(resolver),
1071 )),
1072 );
1073 promise_out.await?;
1074 promise.await
1075 }
1076 fn receive_connection_close(&self, method: protocol::connection::Close) -> Result<()> {
1077 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1078 if !self.status.can_receive_messages() {
1079 return Err(Error::InvalidChannelState(self.status.state()));
1080 }
1081 self.on_connection_close_received(method)
1082 }
1083 pub(crate) async fn connection_close_ok(&self, error: Error) -> Result<()> {
1084 if !self.status.closing() {
1085 return Err(Error::InvalidChannelState(self.status.state()));
1086 }
1087
1088 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(
1089 protocol::connection::CloseOk {},
1090 ));
1091
1092 let (promise, send_resolver) = Promise::new();
1093 if level_enabled!(Level::TRACE) {
1094 promise.set_marker("connection.close-ok".into());
1095 }
1096 self.send_method_frame(method, send_resolver, None);
1097 self.on_connection_close_ok_sent(error);
1098 promise.await
1099 }
1100 fn receive_connection_close_ok(&self, method: protocol::connection::CloseOk) -> Result<()> {
1101 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1102 if !self.status.can_receive_messages() {
1103 return Err(Error::InvalidChannelState(self.status.state()));
1104 }
1105
1106 match self.frames.find_expected_reply(self.id, |reply| {
1107 matches!(&reply.0, Reply::ConnectionCloseOk(..))
1108 }) {
1109 Some(Reply::ConnectionCloseOk(resolver)) => {
1110 let res = self.on_connection_close_ok_received();
1111 resolver.swear(res.clone());
1112 res
1113 }
1114 unexpected => self.handle_invalid_contents(
1115 format!(
1116 "unexpected connection close-ok received on channel {}, was awaiting for {:?}",
1117 self.id, unexpected
1118 ),
1119 method.get_amqp_class_id(),
1120 method.get_amqp_method_id(),
1121 ),
1122 }
1123 }
1124 pub(crate) async fn connection_blocked(&self, reason: &str) -> Result<()> {
1125 if !self.status.connected() {
1126 return Err(Error::InvalidChannelState(self.status.state()));
1127 }
1128
1129 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(
1130 protocol::connection::Blocked {
1131 reason: reason.into(),
1132 },
1133 ));
1134
1135 let (promise, send_resolver) = Promise::new();
1136 if level_enabled!(Level::TRACE) {
1137 promise.set_marker("connection.blocked".into());
1138 }
1139 self.send_method_frame(method, send_resolver, None);
1140 promise.await
1141 }
1142 fn receive_connection_blocked(&self, method: protocol::connection::Blocked) -> Result<()> {
1143 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1144 if !self.status.can_receive_messages() {
1145 return Err(Error::InvalidChannelState(self.status.state()));
1146 }
1147 self.on_connection_blocked_received(method)
1148 }
1149 pub(crate) async fn connection_unblocked(&self) -> Result<()> {
1150 if !self.status.connected() {
1151 return Err(Error::InvalidChannelState(self.status.state()));
1152 }
1153
1154 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(
1155 protocol::connection::Unblocked {},
1156 ));
1157
1158 let (promise, send_resolver) = Promise::new();
1159 if level_enabled!(Level::TRACE) {
1160 promise.set_marker("connection.unblocked".into());
1161 }
1162 self.send_method_frame(method, send_resolver, None);
1163 promise.await
1164 }
1165 fn receive_connection_unblocked(&self, method: protocol::connection::Unblocked) -> Result<()> {
1166 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1167 if !self.status.can_receive_messages() {
1168 return Err(Error::InvalidChannelState(self.status.state()));
1169 }
1170 self.on_connection_unblocked_received(method)
1171 }
1172 pub(crate) async fn connection_update_secret(
1173 &self,
1174 new_secret: &str,
1175 reason: &str,
1176 ) -> Result<()> {
1177 if !self.status.connected() {
1178 return Err(Error::InvalidChannelState(self.status.state()));
1179 }
1180
1181 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecret(
1182 protocol::connection::UpdateSecret {
1183 new_secret: new_secret.into(),
1184 reason: reason.into(),
1185 },
1186 ));
1187
1188 let (promise, send_resolver) = Promise::new();
1189 if level_enabled!(Level::TRACE) {
1190 promise.set_marker("connection.update-secret".into());
1191 }
1192 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1193 if level_enabled!(Level::TRACE) {
1194 promise.set_marker("connection.update-secret.Ok".into());
1195 }
1196 self.send_method_frame(
1197 method,
1198 send_resolver,
1199 Some(ExpectedReply(
1200 Reply::ConnectionUpdateSecretOk(resolver.clone()),
1201 Box::new(resolver),
1202 )),
1203 );
1204 promise_out.await?;
1205 promise.await
1206 }
1207 fn receive_connection_update_secret_ok(
1208 &self,
1209 method: protocol::connection::UpdateSecretOk,
1210 ) -> Result<()> {
1211 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1212 if !self.status.can_receive_messages() {
1213 return Err(Error::InvalidChannelState(self.status.state()));
1214 }
1215
1216 match self.frames.find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::ConnectionUpdateSecretOk(..))){
1217 Some(Reply::ConnectionUpdateSecretOk(resolver)) => {
1218 let res = Ok(())
1219;
1220 resolver.swear(res.clone());
1221 res
1222},
1223 unexpected => {
1224 self.handle_invalid_contents(format!("unexpected connection update-secret-ok received on channel {}, was awaiting for {:?}", self.id, unexpected), method.get_amqp_class_id(), method.get_amqp_method_id())
1225 },
1226 }
1227 }
1228 pub(crate) async fn channel_open(&self, channel: Channel) -> Result<Channel> {
1229 if !self.status.initializing() {
1230 return Err(Error::InvalidChannelState(self.status.state()));
1231 }
1232
1233 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Open(
1234 protocol::channel::Open {},
1235 ));
1236
1237 let (promise, send_resolver) = Promise::new();
1238 if level_enabled!(Level::TRACE) {
1239 promise.set_marker("channel.open".into());
1240 }
1241 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1242 if level_enabled!(Level::TRACE) {
1243 promise.set_marker("channel.open.Ok".into());
1244 }
1245 self.send_method_frame(
1246 method,
1247 send_resolver,
1248 Some(ExpectedReply(
1249 Reply::ChannelOpenOk(resolver.clone(), channel),
1250 Box::new(resolver),
1251 )),
1252 );
1253 promise_out.await?;
1254 promise.await
1255 }
1256 fn receive_channel_open_ok(&self, method: protocol::channel::OpenOk) -> Result<()> {
1257 if !self.status.initializing() {
1258 return Err(Error::InvalidChannelState(self.status.state()));
1259 }
1260
1261 match self.frames.find_expected_reply(self.id, |reply| {
1262 matches!(&reply.0, Reply::ChannelOpenOk(..))
1263 }) {
1264 Some(Reply::ChannelOpenOk(resolver, channel)) => {
1265 self.on_channel_open_ok_received(method, resolver, channel)
1266 }
1267 unexpected => self.handle_invalid_contents(
1268 format!(
1269 "unexpected channel open-ok received on channel {}, was awaiting for {:?}",
1270 self.id, unexpected
1271 ),
1272 method.get_amqp_class_id(),
1273 method.get_amqp_method_id(),
1274 ),
1275 }
1276 }
1277 pub async fn channel_flow(&self, options: ChannelFlowOptions) -> Result<Boolean> {
1278 if !self.status.connected() {
1279 return Err(Error::InvalidChannelState(self.status.state()));
1280 }
1281
1282 let ChannelFlowOptions { active } = options;
1283 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(
1284 protocol::channel::Flow { active },
1285 ));
1286
1287 let (promise, send_resolver) = Promise::new();
1288 if level_enabled!(Level::TRACE) {
1289 promise.set_marker("channel.flow".into());
1290 }
1291 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1292 if level_enabled!(Level::TRACE) {
1293 promise.set_marker("channel.flow.Ok".into());
1294 }
1295 self.send_method_frame(
1296 method,
1297 send_resolver,
1298 Some(ExpectedReply(
1299 Reply::ChannelFlowOk(resolver.clone()),
1300 Box::new(resolver),
1301 )),
1302 );
1303 promise_out.await?;
1304 promise.await
1305 }
1306 fn receive_channel_flow(&self, method: protocol::channel::Flow) -> Result<()> {
1307 if !self.status.can_receive_messages() {
1308 return Err(Error::InvalidChannelState(self.status.state()));
1309 }
1310 self.on_channel_flow_received(method)
1311 }
1312 async fn channel_flow_ok(&self, options: ChannelFlowOkOptions) -> Result<()> {
1313 if !self.status.connected() {
1314 return Err(Error::InvalidChannelState(self.status.state()));
1315 }
1316
1317 let ChannelFlowOkOptions { active } = options;
1318 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(
1319 protocol::channel::FlowOk { active },
1320 ));
1321
1322 let (promise, send_resolver) = Promise::new();
1323 if level_enabled!(Level::TRACE) {
1324 promise.set_marker("channel.flow-ok".into());
1325 }
1326 self.send_method_frame(method, send_resolver, None);
1327 promise.await
1328 }
1329 fn receive_channel_flow_ok(&self, method: protocol::channel::FlowOk) -> Result<()> {
1330 if !self.status.can_receive_messages() {
1331 return Err(Error::InvalidChannelState(self.status.state()));
1332 }
1333
1334 match self.frames.find_expected_reply(self.id, |reply| {
1335 matches!(&reply.0, Reply::ChannelFlowOk(..))
1336 }) {
1337 Some(Reply::ChannelFlowOk(resolver)) => {
1338 self.on_channel_flow_ok_received(method, resolver)
1339 }
1340 unexpected => self.handle_invalid_contents(
1341 format!(
1342 "unexpected channel flow-ok received on channel {}, was awaiting for {:?}",
1343 self.id, unexpected
1344 ),
1345 method.get_amqp_class_id(),
1346 method.get_amqp_method_id(),
1347 ),
1348 }
1349 }
1350 async fn do_channel_close(
1351 &self,
1352 reply_code: ShortUInt,
1353 reply_text: &str,
1354 class_id: ShortUInt,
1355 method_id: ShortUInt,
1356 ) -> Result<()> {
1357 if !self.status.connected() {
1358 return Err(Error::InvalidChannelState(self.status.state()));
1359 }
1360
1361 self.before_channel_close();
1362 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Close(
1363 protocol::channel::Close {
1364 reply_code,
1365 reply_text: reply_text.into(),
1366 class_id,
1367 method_id,
1368 },
1369 ));
1370
1371 let (promise, send_resolver) = Promise::new();
1372 if level_enabled!(Level::TRACE) {
1373 promise.set_marker("channel.close".into());
1374 }
1375 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1376 if level_enabled!(Level::TRACE) {
1377 promise.set_marker("channel.close.Ok".into());
1378 }
1379 self.send_method_frame(
1380 method,
1381 send_resolver,
1382 Some(ExpectedReply(
1383 Reply::ChannelCloseOk(resolver.clone()),
1384 Box::new(resolver),
1385 )),
1386 );
1387 promise_out.await?;
1388 promise.await
1389 }
1390 fn receive_channel_close(&self, method: protocol::channel::Close) -> Result<()> {
1391 if !self.status.can_receive_messages() {
1392 return Err(Error::InvalidChannelState(self.status.state()));
1393 }
1394 self.on_channel_close_received(method)
1395 }
1396 async fn channel_close_ok(&self, error: Option<Error>) -> Result<()> {
1397 if !self.status.closing() {
1398 return Err(Error::InvalidChannelState(self.status.state()));
1399 }
1400
1401 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(
1402 protocol::channel::CloseOk {},
1403 ));
1404
1405 let (promise, send_resolver) = Promise::new();
1406 if level_enabled!(Level::TRACE) {
1407 promise.set_marker("channel.close-ok".into());
1408 }
1409 self.send_method_frame(method, send_resolver, None);
1410 self.on_channel_close_ok_sent(error);
1411 promise.await
1412 }
1413 fn receive_channel_close_ok(&self, method: protocol::channel::CloseOk) -> Result<()> {
1414 if !self.status.can_receive_messages() {
1415 return Err(Error::InvalidChannelState(self.status.state()));
1416 }
1417
1418 match self.next_expected_close_ok_reply() {
1419 Some(Reply::ChannelCloseOk(resolver)) => {
1420 let res = self.on_channel_close_ok_received();
1421 resolver.swear(res.clone());
1422 res
1423 }
1424 unexpected => self.handle_invalid_contents(
1425 format!(
1426 "unexpected channel close-ok received on channel {}, was awaiting for {:?}",
1427 self.id, unexpected
1428 ),
1429 method.get_amqp_class_id(),
1430 method.get_amqp_method_id(),
1431 ),
1432 }
1433 }
1434 pub async fn access_request(&self, realm: &str, options: AccessRequestOptions) -> Result<()> {
1435 if !self.status.connected() {
1436 return Err(Error::InvalidChannelState(self.status.state()));
1437 }
1438
1439 let AccessRequestOptions {
1440 exclusive,
1441 passive,
1442 active,
1443 write,
1444 read,
1445 } = options;
1446 let method = AMQPClass::Access(protocol::access::AMQPMethod::Request(
1447 protocol::access::Request {
1448 realm: realm.into(),
1449 exclusive,
1450 passive,
1451 active,
1452 write,
1453 read,
1454 },
1455 ));
1456
1457 let (promise, send_resolver) = Promise::new();
1458 if level_enabled!(Level::TRACE) {
1459 promise.set_marker("access.request".into());
1460 }
1461 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1462 if level_enabled!(Level::TRACE) {
1463 promise.set_marker("access.request.Ok".into());
1464 }
1465 self.send_method_frame(
1466 method,
1467 send_resolver,
1468 Some(ExpectedReply(
1469 Reply::AccessRequestOk(resolver.clone()),
1470 Box::new(resolver),
1471 )),
1472 );
1473 promise_out.await?;
1474 promise.await
1475 }
1476 fn receive_access_request_ok(&self, method: protocol::access::RequestOk) -> Result<()> {
1477 if !self.status.can_receive_messages() {
1478 return Err(Error::InvalidChannelState(self.status.state()));
1479 }
1480
1481 match self.frames.find_expected_reply(self.id, |reply| {
1482 matches!(&reply.0, Reply::AccessRequestOk(..))
1483 }) {
1484 Some(Reply::AccessRequestOk(resolver)) => {
1485 let res = self.on_access_request_ok_received(method);
1486 resolver.swear(res.clone());
1487 res
1488 }
1489 unexpected => self.handle_invalid_contents(
1490 format!(
1491 "unexpected access request-ok received on channel {}, was awaiting for {:?}",
1492 self.id, unexpected
1493 ),
1494 method.get_amqp_class_id(),
1495 method.get_amqp_method_id(),
1496 ),
1497 }
1498 }
1499 async fn do_exchange_declare(
1500 &self,
1501 exchange: &str,
1502 kind: &str,
1503 options: ExchangeDeclareOptions,
1504 arguments: FieldTable,
1505 exchange_kind: ExchangeKind,
1506 ) -> Result<()> {
1507 if !self.status.connected() {
1508 return Err(Error::InvalidChannelState(self.status.state()));
1509 }
1510
1511 let creation_arguments = arguments.clone();
1512 let ExchangeDeclareOptions {
1513 passive,
1514 durable,
1515 auto_delete,
1516 internal,
1517 nowait,
1518 } = options;
1519 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Declare(
1520 protocol::exchange::Declare {
1521 exchange: exchange.into(),
1522 kind: kind.into(),
1523 passive,
1524 durable,
1525 auto_delete,
1526 internal,
1527 nowait,
1528 arguments,
1529 },
1530 ));
1531
1532 let (promise, send_resolver) = Promise::new();
1533 if level_enabled!(Level::TRACE) {
1534 promise.set_marker("exchange.declare".into());
1535 }
1536 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1537 if level_enabled!(Level::TRACE) {
1538 promise.set_marker("exchange.declare.Ok".into());
1539 }
1540 self.send_method_frame(
1541 method,
1542 send_resolver,
1543 Some(ExpectedReply(
1544 Reply::ExchangeDeclareOk(
1545 resolver.clone(),
1546 exchange.into(),
1547 exchange_kind,
1548 options,
1549 creation_arguments,
1550 ),
1551 Box::new(resolver),
1552 )),
1553 );
1554 if nowait {
1555 self.receive_exchange_declare_ok(protocol::exchange::DeclareOk {})?;
1556 }
1557 promise_out.await?;
1558 promise.await
1559 }
1560 fn receive_exchange_declare_ok(&self, method: protocol::exchange::DeclareOk) -> Result<()> {
1561 if !self.status.can_receive_messages() {
1562 return Err(Error::InvalidChannelState(self.status.state()));
1563 }
1564
1565 match self.frames.find_expected_reply(self.id, |reply| {
1566 matches!(&reply.0, Reply::ExchangeDeclareOk(..))
1567 }) {
1568 Some(Reply::ExchangeDeclareOk(
1569 resolver,
1570 exchange,
1571 exchange_kind,
1572 options,
1573 creation_arguments,
1574 )) => self.on_exchange_declare_ok_received(
1575 resolver,
1576 exchange,
1577 exchange_kind,
1578 options,
1579 creation_arguments,
1580 ),
1581 unexpected => self.handle_invalid_contents(
1582 format!(
1583 "unexpected exchange declare-ok received on channel {}, was awaiting for {:?}",
1584 self.id, unexpected
1585 ),
1586 method.get_amqp_class_id(),
1587 method.get_amqp_method_id(),
1588 ),
1589 }
1590 }
1591 pub async fn exchange_delete(
1593 &self,
1594 exchange: &str,
1595 options: ExchangeDeleteOptions,
1596 ) -> Result<()> {
1597 if !self.status.connected() {
1598 return Err(Error::InvalidChannelState(self.status.state()));
1599 }
1600
1601 let ExchangeDeleteOptions { if_unused, nowait } = options;
1602 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Delete(
1603 protocol::exchange::Delete {
1604 exchange: exchange.into(),
1605 if_unused,
1606 nowait,
1607 },
1608 ));
1609
1610 let (promise, send_resolver) = Promise::new();
1611 if level_enabled!(Level::TRACE) {
1612 promise.set_marker("exchange.delete".into());
1613 }
1614 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1615 if level_enabled!(Level::TRACE) {
1616 promise.set_marker("exchange.delete.Ok".into());
1617 }
1618 self.send_method_frame(
1619 method,
1620 send_resolver,
1621 Some(ExpectedReply(
1622 Reply::ExchangeDeleteOk(resolver.clone(), exchange.into()),
1623 Box::new(resolver),
1624 )),
1625 );
1626 if nowait {
1627 self.receive_exchange_delete_ok(protocol::exchange::DeleteOk {})?;
1628 }
1629 promise_out.await?;
1630 promise.await
1631 }
1632 fn receive_exchange_delete_ok(&self, method: protocol::exchange::DeleteOk) -> Result<()> {
1633 if !self.status.can_receive_messages() {
1634 return Err(Error::InvalidChannelState(self.status.state()));
1635 }
1636
1637 match self.frames.find_expected_reply(self.id, |reply| {
1638 matches!(&reply.0, Reply::ExchangeDeleteOk(..))
1639 }) {
1640 Some(Reply::ExchangeDeleteOk(resolver, exchange)) => {
1641 let res = self.on_exchange_delete_ok_received(exchange);
1642 resolver.swear(res.clone());
1643 res
1644 }
1645 unexpected => self.handle_invalid_contents(
1646 format!(
1647 "unexpected exchange delete-ok received on channel {}, was awaiting for {:?}",
1648 self.id, unexpected
1649 ),
1650 method.get_amqp_class_id(),
1651 method.get_amqp_method_id(),
1652 ),
1653 }
1654 }
1655 pub async fn exchange_bind(
1656 &self,
1657 destination: &str,
1658 source: &str,
1659 routing_key: &str,
1660 options: ExchangeBindOptions,
1661 arguments: FieldTable,
1662 ) -> Result<()> {
1663 if !self.status.connected() {
1664 return Err(Error::InvalidChannelState(self.status.state()));
1665 }
1666
1667 let creation_arguments = arguments.clone();
1668 let ExchangeBindOptions { nowait } = options;
1669 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Bind(
1670 protocol::exchange::Bind {
1671 destination: destination.into(),
1672 source: source.into(),
1673 routing_key: routing_key.into(),
1674 nowait,
1675 arguments,
1676 },
1677 ));
1678
1679 let (promise, send_resolver) = Promise::new();
1680 if level_enabled!(Level::TRACE) {
1681 promise.set_marker("exchange.bind".into());
1682 }
1683 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1684 if level_enabled!(Level::TRACE) {
1685 promise.set_marker("exchange.bind.Ok".into());
1686 }
1687 self.send_method_frame(
1688 method,
1689 send_resolver,
1690 Some(ExpectedReply(
1691 Reply::ExchangeBindOk(
1692 resolver.clone(),
1693 destination.into(),
1694 source.into(),
1695 routing_key.into(),
1696 creation_arguments,
1697 ),
1698 Box::new(resolver),
1699 )),
1700 );
1701 if nowait {
1702 self.receive_exchange_bind_ok(protocol::exchange::BindOk {})?;
1703 }
1704 promise_out.await?;
1705 promise.await
1706 }
1707 fn receive_exchange_bind_ok(&self, method: protocol::exchange::BindOk) -> Result<()> {
1708 if !self.status.can_receive_messages() {
1709 return Err(Error::InvalidChannelState(self.status.state()));
1710 }
1711
1712 match self.frames.find_expected_reply(self.id, |reply| {
1713 matches!(&reply.0, Reply::ExchangeBindOk(..))
1714 }) {
1715 Some(Reply::ExchangeBindOk(
1716 resolver,
1717 destination,
1718 source,
1719 routing_key,
1720 creation_arguments,
1721 )) => {
1722 let res = self.on_exchange_bind_ok_received(
1723 destination,
1724 source,
1725 routing_key,
1726 creation_arguments,
1727 );
1728 resolver.swear(res.clone());
1729 res
1730 }
1731 unexpected => self.handle_invalid_contents(
1732 format!(
1733 "unexpected exchange bind-ok received on channel {}, was awaiting for {:?}",
1734 self.id, unexpected
1735 ),
1736 method.get_amqp_class_id(),
1737 method.get_amqp_method_id(),
1738 ),
1739 }
1740 }
1741 pub async fn exchange_unbind(
1742 &self,
1743 destination: &str,
1744 source: &str,
1745 routing_key: &str,
1746 options: ExchangeUnbindOptions,
1747 arguments: FieldTable,
1748 ) -> Result<()> {
1749 if !self.status.connected() {
1750 return Err(Error::InvalidChannelState(self.status.state()));
1751 }
1752
1753 let creation_arguments = arguments.clone();
1754 let ExchangeUnbindOptions { nowait } = options;
1755 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Unbind(
1756 protocol::exchange::Unbind {
1757 destination: destination.into(),
1758 source: source.into(),
1759 routing_key: routing_key.into(),
1760 nowait,
1761 arguments,
1762 },
1763 ));
1764
1765 let (promise, send_resolver) = Promise::new();
1766 if level_enabled!(Level::TRACE) {
1767 promise.set_marker("exchange.unbind".into());
1768 }
1769 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1770 if level_enabled!(Level::TRACE) {
1771 promise.set_marker("exchange.unbind.Ok".into());
1772 }
1773 self.send_method_frame(
1774 method,
1775 send_resolver,
1776 Some(ExpectedReply(
1777 Reply::ExchangeUnbindOk(
1778 resolver.clone(),
1779 destination.into(),
1780 source.into(),
1781 routing_key.into(),
1782 creation_arguments,
1783 ),
1784 Box::new(resolver),
1785 )),
1786 );
1787 if nowait {
1788 self.receive_exchange_unbind_ok(protocol::exchange::UnbindOk {})?;
1789 }
1790 promise_out.await?;
1791 promise.await
1792 }
1793 fn receive_exchange_unbind_ok(&self, method: protocol::exchange::UnbindOk) -> Result<()> {
1794 if !self.status.can_receive_messages() {
1795 return Err(Error::InvalidChannelState(self.status.state()));
1796 }
1797
1798 match self.frames.find_expected_reply(self.id, |reply| {
1799 matches!(&reply.0, Reply::ExchangeUnbindOk(..))
1800 }) {
1801 Some(Reply::ExchangeUnbindOk(
1802 resolver,
1803 destination,
1804 source,
1805 routing_key,
1806 creation_arguments,
1807 )) => {
1808 let res = self.on_exchange_unbind_ok_received(
1809 destination,
1810 source,
1811 routing_key,
1812 creation_arguments,
1813 );
1814 resolver.swear(res.clone());
1815 res
1816 }
1817 unexpected => self.handle_invalid_contents(
1818 format!(
1819 "unexpected exchange unbind-ok received on channel {}, was awaiting for {:?}",
1820 self.id, unexpected
1821 ),
1822 method.get_amqp_class_id(),
1823 method.get_amqp_method_id(),
1824 ),
1825 }
1826 }
1827 pub async fn queue_declare(
1828 &self,
1829 queue: &str,
1830 options: QueueDeclareOptions,
1831 arguments: FieldTable,
1832 ) -> Result<Queue> {
1833 if !self.status.connected() {
1834 return Err(Error::InvalidChannelState(self.status.state()));
1835 }
1836
1837 let creation_arguments = arguments.clone();
1838 let QueueDeclareOptions {
1839 passive,
1840 durable,
1841 exclusive,
1842 auto_delete,
1843 nowait,
1844 } = options;
1845 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Declare(
1846 protocol::queue::Declare {
1847 queue: queue.into(),
1848 passive,
1849 durable,
1850 exclusive,
1851 auto_delete,
1852 nowait,
1853 arguments,
1854 },
1855 ));
1856
1857 let (promise, send_resolver) = Promise::new();
1858 if level_enabled!(Level::TRACE) {
1859 promise.set_marker("queue.declare".into());
1860 }
1861 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1862 if level_enabled!(Level::TRACE) {
1863 promise.set_marker("queue.declare.Ok".into());
1864 }
1865 self.send_method_frame(
1866 method,
1867 send_resolver,
1868 Some(ExpectedReply(
1869 Reply::QueueDeclareOk(resolver.clone(), options, creation_arguments),
1870 Box::new(resolver),
1871 )),
1872 );
1873 if nowait {
1874 self.receive_queue_declare_ok(protocol::queue::DeclareOk {
1875 queue: queue.into(),
1876 ..Default::default()
1877 })?;
1878 }
1879 promise_out.await?;
1880 promise.await
1881 }
1882 fn receive_queue_declare_ok(&self, method: protocol::queue::DeclareOk) -> Result<()> {
1883 if !self.status.can_receive_messages() {
1884 return Err(Error::InvalidChannelState(self.status.state()));
1885 }
1886
1887 match self.frames.find_expected_reply(self.id, |reply| {
1888 matches!(&reply.0, Reply::QueueDeclareOk(..))
1889 }) {
1890 Some(Reply::QueueDeclareOk(resolver, options, creation_arguments)) => {
1891 self.on_queue_declare_ok_received(method, resolver, options, creation_arguments)
1892 }
1893 unexpected => self.handle_invalid_contents(
1894 format!(
1895 "unexpected queue declare-ok received on channel {}, was awaiting for {:?}",
1896 self.id, unexpected
1897 ),
1898 method.get_amqp_class_id(),
1899 method.get_amqp_method_id(),
1900 ),
1901 }
1902 }
1903 pub async fn queue_bind(
1904 &self,
1905 queue: &str,
1906 exchange: &str,
1907 routing_key: &str,
1908 options: QueueBindOptions,
1909 arguments: FieldTable,
1910 ) -> Result<()> {
1911 if !self.status.connected() {
1912 return Err(Error::InvalidChannelState(self.status.state()));
1913 }
1914
1915 let creation_arguments = arguments.clone();
1916 let QueueBindOptions { nowait } = options;
1917 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Bind(protocol::queue::Bind {
1918 queue: queue.into(),
1919 exchange: exchange.into(),
1920 routing_key: routing_key.into(),
1921 nowait,
1922 arguments,
1923 }));
1924
1925 let (promise, send_resolver) = Promise::new();
1926 if level_enabled!(Level::TRACE) {
1927 promise.set_marker("queue.bind".into());
1928 }
1929 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1930 if level_enabled!(Level::TRACE) {
1931 promise.set_marker("queue.bind.Ok".into());
1932 }
1933 self.send_method_frame(
1934 method,
1935 send_resolver,
1936 Some(ExpectedReply(
1937 Reply::QueueBindOk(
1938 resolver.clone(),
1939 queue.into(),
1940 exchange.into(),
1941 routing_key.into(),
1942 creation_arguments,
1943 ),
1944 Box::new(resolver),
1945 )),
1946 );
1947 if nowait {
1948 self.receive_queue_bind_ok(protocol::queue::BindOk {})?;
1949 }
1950 promise_out.await?;
1951 promise.await
1952 }
1953 fn receive_queue_bind_ok(&self, method: protocol::queue::BindOk) -> Result<()> {
1954 if !self.status.can_receive_messages() {
1955 return Err(Error::InvalidChannelState(self.status.state()));
1956 }
1957
1958 match self
1959 .frames
1960 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueueBindOk(..)))
1961 {
1962 Some(Reply::QueueBindOk(
1963 resolver,
1964 queue,
1965 exchange,
1966 routing_key,
1967 creation_arguments,
1968 )) => {
1969 let res = self.on_queue_bind_ok_received(
1970 queue,
1971 exchange,
1972 routing_key,
1973 creation_arguments,
1974 );
1975 resolver.swear(res.clone());
1976 res
1977 }
1978 unexpected => self.handle_invalid_contents(
1979 format!(
1980 "unexpected queue bind-ok received on channel {}, was awaiting for {:?}",
1981 self.id, unexpected
1982 ),
1983 method.get_amqp_class_id(),
1984 method.get_amqp_method_id(),
1985 ),
1986 }
1987 }
1988 pub async fn queue_purge(
1989 &self,
1990 queue: &str,
1991 options: QueuePurgeOptions,
1992 ) -> Result<MessageCount> {
1993 if !self.status.connected() {
1994 return Err(Error::InvalidChannelState(self.status.state()));
1995 }
1996
1997 let QueuePurgeOptions { nowait } = options;
1998 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Purge(protocol::queue::Purge {
1999 queue: queue.into(),
2000 nowait,
2001 }));
2002
2003 let (promise, send_resolver) = Promise::new();
2004 if level_enabled!(Level::TRACE) {
2005 promise.set_marker("queue.purge".into());
2006 }
2007 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2008 if level_enabled!(Level::TRACE) {
2009 promise.set_marker("queue.purge.Ok".into());
2010 }
2011 self.send_method_frame(
2012 method,
2013 send_resolver,
2014 Some(ExpectedReply(
2015 Reply::QueuePurgeOk(resolver.clone()),
2016 Box::new(resolver),
2017 )),
2018 );
2019 promise_out.await?;
2020 promise.await
2021 }
2022 fn receive_queue_purge_ok(&self, method: protocol::queue::PurgeOk) -> Result<()> {
2023 if !self.status.can_receive_messages() {
2024 return Err(Error::InvalidChannelState(self.status.state()));
2025 }
2026
2027 match self
2028 .frames
2029 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueuePurgeOk(..)))
2030 {
2031 Some(Reply::QueuePurgeOk(resolver)) => {
2032 self.on_queue_purge_ok_received(method, resolver)
2033 }
2034 unexpected => self.handle_invalid_contents(
2035 format!(
2036 "unexpected queue purge-ok received on channel {}, was awaiting for {:?}",
2037 self.id, unexpected
2038 ),
2039 method.get_amqp_class_id(),
2040 method.get_amqp_method_id(),
2041 ),
2042 }
2043 }
2044 pub async fn queue_delete(
2045 &self,
2046 queue: &str,
2047 options: QueueDeleteOptions,
2048 ) -> Result<MessageCount> {
2049 if !self.status.connected() {
2050 return Err(Error::InvalidChannelState(self.status.state()));
2051 }
2052
2053 let QueueDeleteOptions {
2054 if_unused,
2055 if_empty,
2056 nowait,
2057 } = options;
2058 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Delete(
2059 protocol::queue::Delete {
2060 queue: queue.into(),
2061 if_unused,
2062 if_empty,
2063 nowait,
2064 },
2065 ));
2066
2067 let (promise, send_resolver) = Promise::new();
2068 if level_enabled!(Level::TRACE) {
2069 promise.set_marker("queue.delete".into());
2070 }
2071 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2072 if level_enabled!(Level::TRACE) {
2073 promise.set_marker("queue.delete.Ok".into());
2074 }
2075 self.send_method_frame(
2076 method,
2077 send_resolver,
2078 Some(ExpectedReply(
2079 Reply::QueueDeleteOk(resolver.clone(), queue.into()),
2080 Box::new(resolver),
2081 )),
2082 );
2083 if nowait {
2084 self.receive_queue_delete_ok(protocol::queue::DeleteOk {
2085 ..Default::default()
2086 })?;
2087 }
2088 promise_out.await?;
2089 promise.await
2090 }
2091 fn receive_queue_delete_ok(&self, method: protocol::queue::DeleteOk) -> Result<()> {
2092 if !self.status.can_receive_messages() {
2093 return Err(Error::InvalidChannelState(self.status.state()));
2094 }
2095
2096 match self.frames.find_expected_reply(self.id, |reply| {
2097 matches!(&reply.0, Reply::QueueDeleteOk(..))
2098 }) {
2099 Some(Reply::QueueDeleteOk(resolver, queue)) => {
2100 self.on_queue_delete_ok_received(method, resolver, queue)
2101 }
2102 unexpected => self.handle_invalid_contents(
2103 format!(
2104 "unexpected queue delete-ok received on channel {}, was awaiting for {:?}",
2105 self.id, unexpected
2106 ),
2107 method.get_amqp_class_id(),
2108 method.get_amqp_method_id(),
2109 ),
2110 }
2111 }
2112 pub async fn queue_unbind(
2113 &self,
2114 queue: &str,
2115 exchange: &str,
2116 routing_key: &str,
2117 arguments: FieldTable,
2118 ) -> Result<()> {
2119 if !self.status.connected() {
2120 return Err(Error::InvalidChannelState(self.status.state()));
2121 }
2122
2123 let creation_arguments = arguments.clone();
2124 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Unbind(
2125 protocol::queue::Unbind {
2126 queue: queue.into(),
2127 exchange: exchange.into(),
2128 routing_key: routing_key.into(),
2129 arguments,
2130 },
2131 ));
2132
2133 let (promise, send_resolver) = Promise::new();
2134 if level_enabled!(Level::TRACE) {
2135 promise.set_marker("queue.unbind".into());
2136 }
2137 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2138 if level_enabled!(Level::TRACE) {
2139 promise.set_marker("queue.unbind.Ok".into());
2140 }
2141 self.send_method_frame(
2142 method,
2143 send_resolver,
2144 Some(ExpectedReply(
2145 Reply::QueueUnbindOk(
2146 resolver.clone(),
2147 queue.into(),
2148 exchange.into(),
2149 routing_key.into(),
2150 creation_arguments,
2151 ),
2152 Box::new(resolver),
2153 )),
2154 );
2155 promise_out.await?;
2156 promise.await
2157 }
2158 fn receive_queue_unbind_ok(&self, method: protocol::queue::UnbindOk) -> Result<()> {
2159 if !self.status.can_receive_messages() {
2160 return Err(Error::InvalidChannelState(self.status.state()));
2161 }
2162
2163 match self.frames.find_expected_reply(self.id, |reply| {
2164 matches!(&reply.0, Reply::QueueUnbindOk(..))
2165 }) {
2166 Some(Reply::QueueUnbindOk(
2167 resolver,
2168 queue,
2169 exchange,
2170 routing_key,
2171 creation_arguments,
2172 )) => {
2173 let res = self.on_queue_unbind_ok_received(
2174 queue,
2175 exchange,
2176 routing_key,
2177 creation_arguments,
2178 );
2179 resolver.swear(res.clone());
2180 res
2181 }
2182 unexpected => self.handle_invalid_contents(
2183 format!(
2184 "unexpected queue unbind-ok received on channel {}, was awaiting for {:?}",
2185 self.id, unexpected
2186 ),
2187 method.get_amqp_class_id(),
2188 method.get_amqp_method_id(),
2189 ),
2190 }
2191 }
2192 pub async fn tx_select(&self) -> Result<()> {
2193 if !self.status.connected() {
2194 return Err(Error::InvalidChannelState(self.status.state()));
2195 }
2196
2197 let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Select(protocol::tx::Select {}));
2198
2199 let (promise, send_resolver) = Promise::new();
2200 if level_enabled!(Level::TRACE) {
2201 promise.set_marker("tx.select".into());
2202 }
2203 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2204 if level_enabled!(Level::TRACE) {
2205 promise.set_marker("tx.select.Ok".into());
2206 }
2207 self.send_method_frame(
2208 method,
2209 send_resolver,
2210 Some(ExpectedReply(
2211 Reply::TxSelectOk(resolver.clone()),
2212 Box::new(resolver),
2213 )),
2214 );
2215 promise_out.await?;
2216 promise.await
2217 }
2218 fn receive_tx_select_ok(&self, method: protocol::tx::SelectOk) -> Result<()> {
2219 if !self.status.can_receive_messages() {
2220 return Err(Error::InvalidChannelState(self.status.state()));
2221 }
2222
2223 match self
2224 .frames
2225 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxSelectOk(..)))
2226 {
2227 Some(Reply::TxSelectOk(resolver)) => {
2228 let res = Ok(());
2229 resolver.swear(res.clone());
2230 res
2231 }
2232 unexpected => self.handle_invalid_contents(
2233 format!(
2234 "unexpected tx select-ok received on channel {}, was awaiting for {:?}",
2235 self.id, unexpected
2236 ),
2237 method.get_amqp_class_id(),
2238 method.get_amqp_method_id(),
2239 ),
2240 }
2241 }
2242 pub async fn tx_commit(&self) -> Result<()> {
2243 if !self.status.connected() {
2244 return Err(Error::InvalidChannelState(self.status.state()));
2245 }
2246
2247 let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Commit(protocol::tx::Commit {}));
2248
2249 let (promise, send_resolver) = Promise::new();
2250 if level_enabled!(Level::TRACE) {
2251 promise.set_marker("tx.commit".into());
2252 }
2253 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2254 if level_enabled!(Level::TRACE) {
2255 promise.set_marker("tx.commit.Ok".into());
2256 }
2257 self.send_method_frame(
2258 method,
2259 send_resolver,
2260 Some(ExpectedReply(
2261 Reply::TxCommitOk(resolver.clone()),
2262 Box::new(resolver),
2263 )),
2264 );
2265 promise_out.await?;
2266 promise.await
2267 }
2268 fn receive_tx_commit_ok(&self, method: protocol::tx::CommitOk) -> Result<()> {
2269 if !self.status.can_receive_messages() {
2270 return Err(Error::InvalidChannelState(self.status.state()));
2271 }
2272
2273 match self
2274 .frames
2275 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxCommitOk(..)))
2276 {
2277 Some(Reply::TxCommitOk(resolver)) => {
2278 let res = Ok(());
2279 resolver.swear(res.clone());
2280 res
2281 }
2282 unexpected => self.handle_invalid_contents(
2283 format!(
2284 "unexpected tx commit-ok received on channel {}, was awaiting for {:?}",
2285 self.id, unexpected
2286 ),
2287 method.get_amqp_class_id(),
2288 method.get_amqp_method_id(),
2289 ),
2290 }
2291 }
2292 pub async fn tx_rollback(&self) -> Result<()> {
2293 if !self.status.connected() {
2294 return Err(Error::InvalidChannelState(self.status.state()));
2295 }
2296
2297 let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Rollback(
2298 protocol::tx::Rollback {},
2299 ));
2300
2301 let (promise, send_resolver) = Promise::new();
2302 if level_enabled!(Level::TRACE) {
2303 promise.set_marker("tx.rollback".into());
2304 }
2305 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2306 if level_enabled!(Level::TRACE) {
2307 promise.set_marker("tx.rollback.Ok".into());
2308 }
2309 self.send_method_frame(
2310 method,
2311 send_resolver,
2312 Some(ExpectedReply(
2313 Reply::TxRollbackOk(resolver.clone()),
2314 Box::new(resolver),
2315 )),
2316 );
2317 promise_out.await?;
2318 promise.await
2319 }
2320 fn receive_tx_rollback_ok(&self, method: protocol::tx::RollbackOk) -> Result<()> {
2321 if !self.status.can_receive_messages() {
2322 return Err(Error::InvalidChannelState(self.status.state()));
2323 }
2324
2325 match self
2326 .frames
2327 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxRollbackOk(..)))
2328 {
2329 Some(Reply::TxRollbackOk(resolver)) => {
2330 let res = Ok(());
2331 resolver.swear(res.clone());
2332 res
2333 }
2334 unexpected => self.handle_invalid_contents(
2335 format!(
2336 "unexpected tx rollback-ok received on channel {}, was awaiting for {:?}",
2337 self.id, unexpected
2338 ),
2339 method.get_amqp_class_id(),
2340 method.get_amqp_method_id(),
2341 ),
2342 }
2343 }
2344 pub async fn confirm_select(&self, options: ConfirmSelectOptions) -> Result<()> {
2345 if !self.status.connected() {
2346 return Err(Error::InvalidChannelState(self.status.state()));
2347 }
2348
2349 let ConfirmSelectOptions { nowait } = options;
2350 let method = AMQPClass::Confirm(protocol::confirm::AMQPMethod::Select(
2351 protocol::confirm::Select { nowait },
2352 ));
2353
2354 let (promise, send_resolver) = Promise::new();
2355 if level_enabled!(Level::TRACE) {
2356 promise.set_marker("confirm.select".into());
2357 }
2358 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2359 if level_enabled!(Level::TRACE) {
2360 promise.set_marker("confirm.select.Ok".into());
2361 }
2362 self.send_method_frame(
2363 method,
2364 send_resolver,
2365 Some(ExpectedReply(
2366 Reply::ConfirmSelectOk(resolver.clone()),
2367 Box::new(resolver),
2368 )),
2369 );
2370 promise_out.await?;
2371 promise.await
2372 }
2373 fn receive_confirm_select_ok(&self, method: protocol::confirm::SelectOk) -> Result<()> {
2374 if !self.status.can_receive_messages() {
2375 return Err(Error::InvalidChannelState(self.status.state()));
2376 }
2377
2378 match self.frames.find_expected_reply(self.id, |reply| {
2379 matches!(&reply.0, Reply::ConfirmSelectOk(..))
2380 }) {
2381 Some(Reply::ConfirmSelectOk(resolver)) => {
2382 let res = self.on_confirm_select_ok_received();
2383 resolver.swear(res.clone());
2384 res
2385 }
2386 unexpected => self.handle_invalid_contents(
2387 format!(
2388 "unexpected confirm select-ok received on channel {}, was awaiting for {:?}",
2389 self.id, unexpected
2390 ),
2391 method.get_amqp_class_id(),
2392 method.get_amqp_method_id(),
2393 ),
2394 }
2395 }
2396}