1pub mod options {
2 use super::*;
3
4 #[derive(Copy, Clone, Debug, Default, PartialEq)]
5 pub struct BasicQosOptions {
6 pub global: Boolean,
7 }
8
9 #[derive(Copy, Clone, Debug, Default, PartialEq)]
10 pub struct BasicConsumeOptions {
11 pub no_local: Boolean,
12 pub no_ack: Boolean,
13 pub exclusive: Boolean,
14 pub nowait: Boolean,
15 }
16
17 #[derive(Copy, Clone, Debug, Default, PartialEq)]
18 pub struct BasicCancelOptions {
19 pub nowait: Boolean,
20 }
21
22 #[derive(Copy, Clone, Debug, Default, PartialEq)]
23 pub struct BasicPublishOptions {
24 pub mandatory: Boolean,
25 pub immediate: Boolean,
26 }
27
28 #[derive(Copy, Clone, Debug, Default, PartialEq)]
29 pub struct BasicDeliverOptions {
30 pub redelivered: Boolean,
31 }
32
33 #[derive(Copy, Clone, Debug, Default, PartialEq)]
34 pub struct BasicGetOptions {
35 pub no_ack: Boolean,
36 }
37
38 #[derive(Copy, Clone, Debug, Default, PartialEq)]
39 pub struct BasicGetOkOptions {
40 pub redelivered: Boolean,
41 }
42
43 #[derive(Copy, Clone, Debug, Default, PartialEq)]
44 pub struct BasicAckOptions {
45 pub multiple: Boolean,
46 }
47
48 #[derive(Copy, Clone, Debug, Default, PartialEq)]
49 pub struct BasicRejectOptions {
50 pub requeue: Boolean,
51 }
52
53 #[derive(Copy, Clone, Debug, Default, PartialEq)]
54 pub struct BasicRecoverAsyncOptions {
55 pub requeue: Boolean,
56 }
57
58 #[derive(Copy, Clone, Debug, Default, PartialEq)]
59 pub struct BasicRecoverOptions {
60 pub requeue: Boolean,
61 }
62
63 #[derive(Copy, Clone, Debug, Default, PartialEq)]
64 pub struct BasicNackOptions {
65 pub multiple: Boolean,
66 pub requeue: Boolean,
67 }
68
69 #[derive(Copy, Clone, Debug, Default, PartialEq)]
70 pub struct ChannelFlowOptions {
71 pub active: Boolean,
72 }
73
74 #[derive(Copy, Clone, Debug, Default, PartialEq)]
75 pub struct ChannelFlowOkOptions {
76 pub active: Boolean,
77 }
78
79 #[derive(Copy, Clone, Debug, Default, PartialEq)]
80 pub struct AccessRequestOptions {
81 pub exclusive: Boolean,
82 pub passive: Boolean,
83 pub active: Boolean,
84 pub write: Boolean,
85 pub read: Boolean,
86 }
87
88 #[derive(Copy, Clone, Debug, Default, PartialEq)]
89 pub struct ExchangeDeclareOptions {
90 pub passive: Boolean,
91 pub durable: Boolean,
92 pub auto_delete: Boolean,
93 pub internal: Boolean,
94 pub nowait: Boolean,
95 }
96
97 #[derive(Copy, Clone, Debug, Default, PartialEq)]
98 pub struct ExchangeDeleteOptions {
99 pub if_unused: Boolean,
100 pub nowait: Boolean,
101 }
102
103 #[derive(Copy, Clone, Debug, Default, PartialEq)]
104 pub struct ExchangeBindOptions {
105 pub nowait: Boolean,
106 }
107
108 #[derive(Copy, Clone, Debug, Default, PartialEq)]
109 pub struct ExchangeUnbindOptions {
110 pub nowait: Boolean,
111 }
112
113 #[derive(Copy, Clone, Debug, Default, PartialEq)]
114 pub struct QueueDeclareOptions {
115 pub passive: Boolean,
116 pub durable: Boolean,
117 pub exclusive: Boolean,
118 pub auto_delete: Boolean,
119 pub nowait: Boolean,
120 }
121
122 #[derive(Copy, Clone, Debug, Default, PartialEq)]
123 pub struct QueueBindOptions {
124 pub nowait: Boolean,
125 }
126
127 #[derive(Copy, Clone, Debug, Default, PartialEq)]
128 pub struct QueuePurgeOptions {
129 pub nowait: Boolean,
130 }
131
132 #[derive(Copy, Clone, Debug, Default, PartialEq)]
133 pub struct QueueDeleteOptions {
134 pub if_unused: Boolean,
135 pub if_empty: Boolean,
136 pub nowait: Boolean,
137 }
138
139 #[derive(Copy, Clone, Debug, Default, PartialEq)]
140 pub struct ConfirmSelectOptions {
141 pub nowait: Boolean,
142 }
143}
144
145use options::*;
146
/// Replies the channel is waiting for after sending a method that expects an answer.
///
/// A value is registered (wrapped in an `ExpectedReply`) when the request frame is
/// sent and is looked up again by the matching `receive_*_ok` handler. The first field
/// of every variant is the resolver for the promise awaited by the caller; the
/// remaining fields carry the request data the handler receives back.
#[derive(Debug)]
// Every variant mirrors an AMQP `*-ok` method, so they all share the `Ok` suffix the
// lint complains about.
#[allow(clippy::enum_variant_names)]
pub(crate) enum Reply {
    BasicQosOk(PromiseResolver<()>),
    /// Carries, after the resolver: the channel closer, the queue name, the consume
    /// options, the arguments the consumer was created with and an optional
    /// pre-existing consumer.
    BasicConsumeOk(
        PromiseResolver<Consumer>,
        Option<Arc<ChannelCloser>>,
        ShortString,
        BasicConsumeOptions,
        FieldTable,
        Option<Consumer>,
    ),
    BasicCancelOk(PromiseResolver<()>),
    BasicGetOk(PromiseResolver<Option<BasicGetMessage>>),
    BasicRecoverOk(PromiseResolver<()>),
    /// Carries the connection handed to `connection_open`.
    ConnectionOpenOk(PromiseResolver<()>, Box<Connection>),
    ConnectionCloseOk(PromiseResolver<()>),
    ConnectionUpdateSecretOk(PromiseResolver<()>),
    ChannelOpenOk(PromiseResolver<Channel>, Channel),
    ChannelFlowOk(PromiseResolver<Boolean>),
    ChannelCloseOk(PromiseResolver<()>),
    AccessRequestOk(PromiseResolver<()>),
    ExchangeDeclareOk(
        PromiseResolver<()>,
        ShortString,
        ExchangeKind,
        ExchangeDeclareOptions,
        FieldTable,
    ),
    ExchangeDeleteOk(PromiseResolver<()>, ShortString),
    ExchangeBindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    ExchangeUnbindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    QueueDeclareOk(PromiseResolver<Queue>, QueueDeclareOptions, FieldTable),
    QueueBindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    QueuePurgeOk(PromiseResolver<MessageCount>),
    QueueDeleteOk(PromiseResolver<MessageCount>, ShortString),
    QueueUnbindOk(
        PromiseResolver<()>,
        ShortString,
        ShortString,
        ShortString,
        FieldTable,
    ),
    TxSelectOk(PromiseResolver<()>),
    TxCommitOk(PromiseResolver<()>),
    TxRollbackOk(PromiseResolver<()>),
    ConfirmSelectOk(PromiseResolver<()>),
}
213
214impl Channel {
215 pub(crate) fn receive_method(&self, method: AMQPClass) -> Result<()> {
216 match method {
217 AMQPClass::Basic(protocol::basic::AMQPMethod::QosOk(m)) => self.receive_basic_qos_ok(m),
218 AMQPClass::Basic(protocol::basic::AMQPMethod::ConsumeOk(m)) => {
219 self.receive_basic_consume_ok(m)
220 }
221 AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(m)) => {
222 self.receive_basic_cancel(m)
223 }
224 AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(m)) => {
225 self.receive_basic_cancel_ok(m)
226 }
227 AMQPClass::Basic(protocol::basic::AMQPMethod::Return(m)) => {
228 self.receive_basic_return(m)
229 }
230 AMQPClass::Basic(protocol::basic::AMQPMethod::Deliver(m)) => {
231 self.receive_basic_deliver(m)
232 }
233 AMQPClass::Basic(protocol::basic::AMQPMethod::GetOk(m)) => self.receive_basic_get_ok(m),
234 AMQPClass::Basic(protocol::basic::AMQPMethod::GetEmpty(m)) => {
235 self.receive_basic_get_empty(m)
236 }
237 AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(m)) => self.receive_basic_ack(m),
238 AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverOk(m)) => {
239 self.receive_basic_recover_ok(m)
240 }
241 AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(m)) => self.receive_basic_nack(m),
242 AMQPClass::Connection(protocol::connection::AMQPMethod::Start(m)) => {
243 self.receive_connection_start(m)
244 }
245 AMQPClass::Connection(protocol::connection::AMQPMethod::Secure(m)) => {
246 self.receive_connection_secure(m)
247 }
248 AMQPClass::Connection(protocol::connection::AMQPMethod::Tune(m)) => {
249 self.receive_connection_tune(m)
250 }
251 AMQPClass::Connection(protocol::connection::AMQPMethod::OpenOk(m)) => {
252 self.receive_connection_open_ok(m)
253 }
254 AMQPClass::Connection(protocol::connection::AMQPMethod::Close(m)) => {
255 self.receive_connection_close(m)
256 }
257 AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(m)) => {
258 self.receive_connection_close_ok(m)
259 }
260 AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(m)) => {
261 self.receive_connection_blocked(m)
262 }
263 AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(m)) => {
264 self.receive_connection_unblocked(m)
265 }
266 AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecretOk(m)) => {
267 self.receive_connection_update_secret_ok(m)
268 }
269 AMQPClass::Channel(protocol::channel::AMQPMethod::OpenOk(m)) => {
270 self.receive_channel_open_ok(m)
271 }
272 AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(m)) => {
273 self.receive_channel_flow(m)
274 }
275 AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(m)) => {
276 self.receive_channel_flow_ok(m)
277 }
278 AMQPClass::Channel(protocol::channel::AMQPMethod::Close(m)) => {
279 self.receive_channel_close(m)
280 }
281 AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(m)) => {
282 self.receive_channel_close_ok(m)
283 }
284 AMQPClass::Access(protocol::access::AMQPMethod::RequestOk(m)) => {
285 self.receive_access_request_ok(m)
286 }
287 AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeclareOk(m)) => {
288 self.receive_exchange_declare_ok(m)
289 }
290 AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeleteOk(m)) => {
291 self.receive_exchange_delete_ok(m)
292 }
293 AMQPClass::Exchange(protocol::exchange::AMQPMethod::BindOk(m)) => {
294 self.receive_exchange_bind_ok(m)
295 }
296 AMQPClass::Exchange(protocol::exchange::AMQPMethod::UnbindOk(m)) => {
297 self.receive_exchange_unbind_ok(m)
298 }
299 AMQPClass::Queue(protocol::queue::AMQPMethod::DeclareOk(m)) => {
300 self.receive_queue_declare_ok(m)
301 }
302 AMQPClass::Queue(protocol::queue::AMQPMethod::BindOk(m)) => {
303 self.receive_queue_bind_ok(m)
304 }
305 AMQPClass::Queue(protocol::queue::AMQPMethod::PurgeOk(m)) => {
306 self.receive_queue_purge_ok(m)
307 }
308 AMQPClass::Queue(protocol::queue::AMQPMethod::DeleteOk(m)) => {
309 self.receive_queue_delete_ok(m)
310 }
311 AMQPClass::Queue(protocol::queue::AMQPMethod::UnbindOk(m)) => {
312 self.receive_queue_unbind_ok(m)
313 }
314 AMQPClass::Tx(protocol::tx::AMQPMethod::SelectOk(m)) => self.receive_tx_select_ok(m),
315 AMQPClass::Tx(protocol::tx::AMQPMethod::CommitOk(m)) => self.receive_tx_commit_ok(m),
316 AMQPClass::Tx(protocol::tx::AMQPMethod::RollbackOk(m)) => {
317 self.receive_tx_rollback_ok(m)
318 }
319 AMQPClass::Confirm(protocol::confirm::AMQPMethod::SelectOk(m)) => {
320 self.receive_confirm_select_ok(m)
321 }
322 m => {
323 error!(method=?m, "The client should not receive this method");
324 self.handle_invalid_contents(
325 format!("unexpected method received on channel {}", self.id),
326 m.get_amqp_class_id(),
327 m.get_amqp_method_id(),
328 )
329 }
330 }
331 }
332
333 pub async fn basic_qos(
334 &self,
335 prefetch_count: ShortUInt,
336 options: BasicQosOptions,
337 ) -> Result<()> {
338 if !self.status.connected() {
339 return Err(self.status.state_error("basic.qos"));
340 }
341
342 let BasicQosOptions { global } = options;
343 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Qos(protocol::basic::Qos {
344 prefetch_count,
345 global,
346 }));
347
348 let (promise, send_resolver) = Promise::new();
349 if level_enabled!(Level::TRACE) {
350 promise.set_marker("basic.qos".into());
351 }
352 let ((promise, resolver), promise_out) = (Promise::new(), promise);
353 if level_enabled!(Level::TRACE) {
354 promise.set_marker("basic.qos.Ok".into());
355 }
356 self.send_method_frame(
357 method,
358 send_resolver,
359 Some(ExpectedReply(
360 Reply::BasicQosOk(resolver.clone()),
361 Box::new(resolver),
362 )),
363 );
364 promise_out.await?;
365 promise.await
366 }
367 fn receive_basic_qos_ok(&self, method: protocol::basic::QosOk) -> Result<()> {
368 if !self.status.can_receive_messages() {
369 return Err(self.status.state_error("basic.qos-ok"));
370 }
371
372 match self
373 .frames
374 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicQosOk(..)))
375 {
376 Some(Reply::BasicQosOk(resolver)) => {
377 let res = Ok(());
378 resolver.complete(res.clone());
379 res
380 }
381 unexpected => self.handle_invalid_contents(
382 format!(
383 "unexpected basic qos-ok received on channel {}, was awaiting for {:?}",
384 self.id, unexpected
385 ),
386 method.get_amqp_class_id(),
387 method.get_amqp_method_id(),
388 ),
389 }
390 }
391 async fn do_basic_consume(
392 &self,
393 queue: &str,
394 consumer_tag: &str,
395 options: BasicConsumeOptions,
396 arguments: FieldTable,
397 original: Option<Consumer>,
398 ) -> Result<Consumer> {
399 if !self.status.connected() {
400 return Err(self.status.state_error("basic.consume"));
401 }
402
403 let creation_arguments = arguments.clone();
404 let BasicConsumeOptions {
405 no_local,
406 no_ack,
407 exclusive,
408 nowait,
409 } = options;
410 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Consume(
411 protocol::basic::Consume {
412 queue: queue.into(),
413 consumer_tag: consumer_tag.into(),
414 no_local,
415 no_ack,
416 exclusive,
417 nowait,
418 arguments,
419 },
420 ));
421
422 let (promise, send_resolver) = Promise::new();
423 if level_enabled!(Level::TRACE) {
424 promise.set_marker("basic.consume".into());
425 }
426 let ((promise, resolver), promise_out) = (Promise::new(), promise);
427 if level_enabled!(Level::TRACE) {
428 promise.set_marker("basic.consume.Ok".into());
429 }
430 self.send_method_frame(
431 method,
432 send_resolver,
433 Some(ExpectedReply(
434 Reply::BasicConsumeOk(
435 resolver.clone(),
436 self.channel_closer.clone(),
437 queue.into(),
438 options,
439 creation_arguments,
440 original,
441 ),
442 Box::new(resolver),
443 )),
444 );
445 if nowait {
446 self.receive_basic_consume_ok(protocol::basic::ConsumeOk {
447 consumer_tag: consumer_tag.into(),
448 })?;
449 }
450 promise_out.await?;
451 promise.await
452 }
453 fn receive_basic_consume_ok(&self, method: protocol::basic::ConsumeOk) -> Result<()> {
454 if !self.status.can_receive_messages() {
455 return Err(self.status.state_error("basic.consume-ok"));
456 }
457
458 match self.frames.find_expected_reply(self.id, |reply| {
459 matches!(&reply.0, Reply::BasicConsumeOk(..))
460 }) {
461 Some(Reply::BasicConsumeOk(
462 resolver,
463 channel_closer,
464 queue,
465 options,
466 creation_arguments,
467 original,
468 )) => self.on_basic_consume_ok_received(
469 method,
470 resolver,
471 channel_closer,
472 queue,
473 options,
474 creation_arguments,
475 original,
476 ),
477 unexpected => self.handle_invalid_contents(
478 format!(
479 "unexpected basic consume-ok received on channel {}, was awaiting for {:?}",
480 self.id, unexpected
481 ),
482 method.get_amqp_class_id(),
483 method.get_amqp_method_id(),
484 ),
485 }
486 }
487 pub async fn basic_cancel(
488 &self,
489 consumer_tag: &str,
490 options: BasicCancelOptions,
491 ) -> Result<()> {
492 if !self.status.connected() {
493 return Err(self.status.state_error("basic.cancel"));
494 }
495
496 self.before_basic_cancel(consumer_tag);
497 let BasicCancelOptions { nowait } = options;
498 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(
499 protocol::basic::Cancel {
500 consumer_tag: consumer_tag.into(),
501 nowait,
502 },
503 ));
504
505 let (promise, send_resolver) = Promise::new();
506 if level_enabled!(Level::TRACE) {
507 promise.set_marker("basic.cancel".into());
508 }
509 let ((promise, resolver), promise_out) = (Promise::new(), promise);
510 if level_enabled!(Level::TRACE) {
511 promise.set_marker("basic.cancel.Ok".into());
512 }
513 self.send_method_frame(
514 method,
515 send_resolver,
516 Some(ExpectedReply(
517 Reply::BasicCancelOk(resolver.clone()),
518 Box::new(resolver),
519 )),
520 );
521 if nowait {
522 self.receive_basic_cancel_ok(protocol::basic::CancelOk {
523 consumer_tag: consumer_tag.into(),
524 })?;
525 }
526 promise_out.await?;
527 promise.await
528 }
529 fn receive_basic_cancel(&self, method: protocol::basic::Cancel) -> Result<()> {
530 if !self.status.can_receive_messages() {
531 return Err(self.status.state_error("basic.cancel"));
532 }
533 self.on_basic_cancel_received(method)
534 }
535 async fn basic_cancel_ok(&self, consumer_tag: &str) -> Result<()> {
536 if !self.status.connected() {
537 return Err(self.status.state_error("basic.cancel-ok"));
538 }
539
540 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(
541 protocol::basic::CancelOk {
542 consumer_tag: consumer_tag.into(),
543 },
544 ));
545
546 let (promise, send_resolver) = Promise::new();
547 if level_enabled!(Level::TRACE) {
548 promise.set_marker("basic.cancel-ok".into());
549 }
550 self.send_method_frame(method, send_resolver, None);
551 promise.await
552 }
553 fn receive_basic_cancel_ok(&self, method: protocol::basic::CancelOk) -> Result<()> {
554 if !self.status.can_receive_messages() {
555 return Err(self.status.state_error("basic.cancel-ok"));
556 }
557
558 match self.frames.find_expected_reply(self.id, |reply| {
559 matches!(&reply.0, Reply::BasicCancelOk(..))
560 }) {
561 Some(Reply::BasicCancelOk(resolver)) => {
562 let res = self.on_basic_cancel_ok_received(method);
563 resolver.complete(res.clone());
564 res
565 }
566 unexpected => self.handle_invalid_contents(
567 format!(
568 "unexpected basic cancel-ok received on channel {}, was awaiting for {:?}",
569 self.id, unexpected
570 ),
571 method.get_amqp_class_id(),
572 method.get_amqp_method_id(),
573 ),
574 }
575 }
576 pub async fn basic_publish(
577 &self,
578 exchange: &str,
579 routing_key: &str,
580 options: BasicPublishOptions,
581 payload: &[u8],
582 properties: BasicProperties,
583 ) -> Result<PublisherConfirm> {
584 if !self.status.connected() {
585 return Err(self.status.state_error("basic.publish"));
586 }
587
588 let start_hook_res = self.before_basic_publish();
589 let BasicPublishOptions {
590 mandatory,
591 immediate,
592 } = options;
593 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Publish(
594 protocol::basic::Publish {
595 exchange: exchange.into(),
596 routing_key: routing_key.into(),
597 mandatory,
598 immediate,
599 },
600 ));
601
602 self.send_method_frame_with_body(method, payload, properties, start_hook_res)
603 .await
604 }
605 fn receive_basic_return(&self, method: protocol::basic::Return) -> Result<()> {
606 if !self.status.can_receive_messages() {
607 return Err(self.status.state_error("basic.return"));
608 }
609 self.on_basic_return_received(method)
610 }
611 fn receive_basic_deliver(&self, method: protocol::basic::Deliver) -> Result<()> {
612 if !self.status.can_receive_messages() {
613 return Err(self.status.state_error("basic.deliver"));
614 }
615 self.on_basic_deliver_received(method)
616 }
617 async fn do_basic_get(
618 &self,
619 queue: &str,
620 options: BasicGetOptions,
621 original: Option<PromiseResolver<Option<BasicGetMessage>>>,
622 ) -> Result<Option<BasicGetMessage>> {
623 if !self.status.connected() {
624 return Err(self.status.state_error("basic.get"));
625 }
626
627 let BasicGetOptions { no_ack } = options;
628 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Get(protocol::basic::Get {
629 queue: queue.into(),
630 no_ack,
631 }));
632
633 let (promise, send_resolver) = Promise::new();
634 if level_enabled!(Level::TRACE) {
635 promise.set_marker("basic.get".into());
636 }
637 let ((promise, resolver), promise_out) = (Promise::new(), promise);
638 if level_enabled!(Level::TRACE) {
639 promise.set_marker("basic.get.Ok".into());
640 }
641 let resolver = original.unwrap_or(resolver);
642 self.send_method_frame(
643 method,
644 send_resolver,
645 Some(ExpectedReply(
646 Reply::BasicGetOk(resolver.clone()),
647 Box::new(resolver),
648 )),
649 );
650 promise_out.await?;
651 promise.await
652 }
653 fn receive_basic_get_ok(&self, method: protocol::basic::GetOk) -> Result<()> {
654 if !self.status.can_receive_messages() {
655 return Err(self.status.state_error("basic.get-ok"));
656 }
657
658 match self
659 .frames
660 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::BasicGetOk(..)))
661 {
662 Some(Reply::BasicGetOk(resolver)) => self.on_basic_get_ok_received(method, resolver),
663 unexpected => self.handle_invalid_contents(
664 format!(
665 "unexpected basic get-ok received on channel {}, was awaiting for {:?}",
666 self.id, unexpected
667 ),
668 method.get_amqp_class_id(),
669 method.get_amqp_method_id(),
670 ),
671 }
672 }
673 fn receive_basic_get_empty(&self, method: protocol::basic::GetEmpty) -> Result<()> {
674 if !self.status.can_receive_messages() {
675 return Err(self.status.state_error("basic.get-empty"));
676 }
677 self.on_basic_get_empty_received(method)
678 }
679 pub async fn basic_ack(
680 &self,
681 delivery_tag: LongLongUInt,
682 options: BasicAckOptions,
683 ) -> Result<()> {
684 if !self.status.connected() {
685 return Err(self.status.state_error("basic.ack"));
686 }
687
688 let BasicAckOptions { multiple } = options;
689 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(protocol::basic::Ack {
690 delivery_tag,
691 multiple,
692 }));
693
694 let (promise, send_resolver) = Promise::new();
695 if level_enabled!(Level::TRACE) {
696 promise.set_marker("basic.ack".into());
697 }
698 self.send_method_frame(method, send_resolver, None);
699 self.on_basic_ack_sent(multiple, delivery_tag);
700 promise.await
701 }
702 fn receive_basic_ack(&self, method: protocol::basic::Ack) -> Result<()> {
703 if !self.status.can_receive_messages() {
704 return Err(self.status.state_error("basic.ack"));
705 }
706 self.on_basic_ack_received(method)
707 }
708 pub async fn basic_reject(
709 &self,
710 delivery_tag: LongLongUInt,
711 options: BasicRejectOptions,
712 ) -> Result<()> {
713 if !self.status.connected() {
714 return Err(self.status.state_error("basic.reject"));
715 }
716
717 let BasicRejectOptions { requeue } = options;
718 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Reject(
719 protocol::basic::Reject {
720 delivery_tag,
721 requeue,
722 },
723 ));
724
725 let (promise, send_resolver) = Promise::new();
726 if level_enabled!(Level::TRACE) {
727 promise.set_marker("basic.reject".into());
728 }
729 self.send_method_frame(method, send_resolver, None);
730 promise.await
731 }
732 pub async fn basic_recover_async(&self, options: BasicRecoverAsyncOptions) -> Result<()> {
733 if !self.status.connected() {
734 return Err(self.status.state_error("basic.recover-async"));
735 }
736
737 let BasicRecoverAsyncOptions { requeue } = options;
738 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverAsync(
739 protocol::basic::RecoverAsync { requeue },
740 ));
741
742 let (promise, send_resolver) = Promise::new();
743 if level_enabled!(Level::TRACE) {
744 promise.set_marker("basic.recover-async".into());
745 }
746 self.send_method_frame(method, send_resolver, None);
747 self.on_basic_recover_async_sent();
748 promise.await
749 }
750 pub async fn basic_recover(&self, options: BasicRecoverOptions) -> Result<()> {
751 if !self.status.connected() {
752 return Err(self.status.state_error("basic.recover"));
753 }
754
755 let BasicRecoverOptions { requeue } = options;
756 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Recover(
757 protocol::basic::Recover { requeue },
758 ));
759
760 let (promise, send_resolver) = Promise::new();
761 if level_enabled!(Level::TRACE) {
762 promise.set_marker("basic.recover".into());
763 }
764 let ((promise, resolver), promise_out) = (Promise::new(), promise);
765 if level_enabled!(Level::TRACE) {
766 promise.set_marker("basic.recover.Ok".into());
767 }
768 self.send_method_frame(
769 method,
770 send_resolver,
771 Some(ExpectedReply(
772 Reply::BasicRecoverOk(resolver.clone()),
773 Box::new(resolver),
774 )),
775 );
776 promise_out.await?;
777 promise.await
778 }
779 fn receive_basic_recover_ok(&self, method: protocol::basic::RecoverOk) -> Result<()> {
780 if !self.status.can_receive_messages() {
781 return Err(self.status.state_error("basic.recover-ok"));
782 }
783
784 match self.frames.find_expected_reply(self.id, |reply| {
785 matches!(&reply.0, Reply::BasicRecoverOk(..))
786 }) {
787 Some(Reply::BasicRecoverOk(resolver)) => {
788 let res = self.on_basic_recover_ok_received();
789 resolver.complete(res.clone());
790 res
791 }
792 unexpected => self.handle_invalid_contents(
793 format!(
794 "unexpected basic recover-ok received on channel {}, was awaiting for {:?}",
795 self.id, unexpected
796 ),
797 method.get_amqp_class_id(),
798 method.get_amqp_method_id(),
799 ),
800 }
801 }
802 pub async fn basic_nack(
803 &self,
804 delivery_tag: LongLongUInt,
805 options: BasicNackOptions,
806 ) -> Result<()> {
807 if !self.status.connected() {
808 return Err(self.status.state_error("basic.nack"));
809 }
810
811 let BasicNackOptions { multiple, requeue } = options;
812 let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(protocol::basic::Nack {
813 delivery_tag,
814 multiple,
815 requeue,
816 }));
817
818 let (promise, send_resolver) = Promise::new();
819 if level_enabled!(Level::TRACE) {
820 promise.set_marker("basic.nack".into());
821 }
822 self.send_method_frame(method, send_resolver, None);
823 self.on_basic_nack_sent(multiple, delivery_tag);
824 promise.await
825 }
826 fn receive_basic_nack(&self, method: protocol::basic::Nack) -> Result<()> {
827 if !self.status.can_receive_messages() {
828 return Err(self.status.state_error("basic.nack"));
829 }
830 self.on_basic_nack_received(method)
831 }
832 fn receive_connection_start(&self, method: protocol::connection::Start) -> Result<()> {
833 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
834 if !self.status.can_receive_messages() {
835 return Err(self.status.state_error("connection.start"));
836 }
837 self.on_connection_start_received(method)
838 }
839 async fn connection_start_ok(
840 &self,
841 client_properties: FieldTable,
842 mechanism: &str,
843 response: &str,
844 locale: &str,
845 resolver: PromiseResolver<Connection>,
846 connection: Connection,
847 credentials: Credentials,
848 ) -> Result<()> {
849 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::StartOk(
850 protocol::connection::StartOk {
851 client_properties,
852 mechanism: mechanism.into(),
853 response: response.into(),
854 locale: locale.into(),
855 },
856 ));
857
858 let (promise, send_resolver) = Promise::new();
859 if level_enabled!(Level::TRACE) {
860 promise.set_marker("connection.start-ok".into());
861 }
862 self.before_connection_start_ok(resolver, connection, credentials);
863 self.send_method_frame(method, send_resolver, None);
864 promise.await
865 }
866 fn receive_connection_secure(&self, method: protocol::connection::Secure) -> Result<()> {
867 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
868 if !self.status.can_receive_messages() {
869 return Err(self.status.state_error("connection.secure"));
870 }
871 self.on_connection_secure_received(method)
872 }
873 async fn connection_secure_ok(&self, response: &str) -> Result<()> {
874 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::SecureOk(
875 protocol::connection::SecureOk {
876 response: response.into(),
877 },
878 ));
879
880 let (promise, send_resolver) = Promise::new();
881 if level_enabled!(Level::TRACE) {
882 promise.set_marker("connection.secure-ok".into());
883 }
884 self.send_method_frame(method, send_resolver, None);
885 promise.await
886 }
887 fn receive_connection_tune(&self, method: protocol::connection::Tune) -> Result<()> {
888 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
889 if !self.status.can_receive_messages() {
890 return Err(self.status.state_error("connection.tune"));
891 }
892 self.on_connection_tune_received(method)
893 }
894 async fn connection_tune_ok(
895 &self,
896 channel_max: ShortUInt,
897 frame_max: LongUInt,
898 heartbeat: ShortUInt,
899 ) -> Result<()> {
900 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::TuneOk(
901 protocol::connection::TuneOk {
902 channel_max,
903 frame_max,
904 heartbeat,
905 },
906 ));
907
908 let (promise, send_resolver) = Promise::new();
909 if level_enabled!(Level::TRACE) {
910 promise.set_marker("connection.tune-ok".into());
911 }
912 self.send_method_frame(method, send_resolver, None);
913 promise.await
914 }
915 pub(crate) async fn connection_open(
916 &self,
917 virtual_host: &str,
918 connection: Box<Connection>,
919 conn_resolver: PromiseResolver<Connection>,
920 ) -> Result<()> {
921 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Open(
922 protocol::connection::Open {
923 virtual_host: virtual_host.into(),
924 },
925 ));
926
927 let (promise, send_resolver) = Promise::new();
928 if level_enabled!(Level::TRACE) {
929 promise.set_marker("connection.open".into());
930 }
931 let ((promise, resolver), promise_out) = (Promise::new(), promise);
932 if level_enabled!(Level::TRACE) {
933 promise.set_marker("connection.open.Ok".into());
934 }
935 self.before_connection_open(conn_resolver);
936 self.send_method_frame(
937 method,
938 send_resolver,
939 Some(ExpectedReply(
940 Reply::ConnectionOpenOk(resolver.clone(), connection),
941 Box::new(resolver),
942 )),
943 );
944 promise_out.await?;
945 promise.await
946 }
947 fn receive_connection_open_ok(&self, method: protocol::connection::OpenOk) -> Result<()> {
948 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
949 if !self.status.can_receive_messages() {
950 return Err(self.status.state_error("connection.open-ok"));
951 }
952
953 match self.frames.find_expected_reply(self.id, |reply| {
954 matches!(&reply.0, Reply::ConnectionOpenOk(..))
955 }) {
956 Some(Reply::ConnectionOpenOk(resolver, connection)) => {
957 let res = self.on_connection_open_ok_received(method, connection);
958 resolver.complete(res.clone());
959 res
960 }
961 unexpected => self.handle_invalid_contents(
962 format!(
963 "unexpected connection open-ok received on channel {}, was awaiting for {:?}",
964 self.id, unexpected
965 ),
966 method.get_amqp_class_id(),
967 method.get_amqp_method_id(),
968 ),
969 }
970 }
971 pub(crate) async fn connection_close(
972 &self,
973 reply_code: ShortUInt,
974 reply_text: &str,
975 class_id: ShortUInt,
976 method_id: ShortUInt,
977 ) -> Result<()> {
978 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Close(
979 protocol::connection::Close {
980 reply_code,
981 reply_text: reply_text.into(),
982 class_id,
983 method_id,
984 },
985 ));
986
987 let (promise, send_resolver) = Promise::new();
988 if level_enabled!(Level::TRACE) {
989 promise.set_marker("connection.close".into());
990 }
991 let ((promise, resolver), promise_out) = (Promise::new(), promise);
992 if level_enabled!(Level::TRACE) {
993 promise.set_marker("connection.close.Ok".into());
994 }
995 self.send_method_frame(
996 method,
997 send_resolver,
998 Some(ExpectedReply(
999 Reply::ConnectionCloseOk(resolver.clone()),
1000 Box::new(resolver),
1001 )),
1002 );
1003 promise_out.await?;
1004 promise.await
1005 }
1006 fn receive_connection_close(&self, method: protocol::connection::Close) -> Result<()> {
1007 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1008 if !self.status.can_receive_messages() {
1009 return Err(self.status.state_error("connection.close"));
1010 }
1011 self.on_connection_close_received(method)
1012 }
1013 pub(crate) async fn connection_close_ok(&self, error: Error) -> Result<()> {
1014 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::CloseOk(
1015 protocol::connection::CloseOk {},
1016 ));
1017
1018 let (promise, send_resolver) = Promise::new();
1019 if level_enabled!(Level::TRACE) {
1020 promise.set_marker("connection.close-ok".into());
1021 }
1022 self.send_method_frame(method, send_resolver, None);
1023 self.on_connection_close_ok_sent(error);
1024 promise.await
1025 }
1026 fn receive_connection_close_ok(&self, method: protocol::connection::CloseOk) -> Result<()> {
1027 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1028 if !self.status.can_receive_messages() {
1029 return Err(self.status.state_error("connection.close-ok"));
1030 }
1031
1032 match self.frames.find_expected_reply(self.id, |reply| {
1033 matches!(&reply.0, Reply::ConnectionCloseOk(..))
1034 }) {
1035 Some(Reply::ConnectionCloseOk(resolver)) => {
1036 let res = self.on_connection_close_ok_received();
1037 resolver.complete(res.clone());
1038 res
1039 }
1040 unexpected => self.handle_invalid_contents(
1041 format!(
1042 "unexpected connection close-ok received on channel {}, was awaiting for {:?}",
1043 self.id, unexpected
1044 ),
1045 method.get_amqp_class_id(),
1046 method.get_amqp_method_id(),
1047 ),
1048 }
1049 }
1050 pub(crate) async fn connection_blocked(&self, reason: &str) -> Result<()> {
1051 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Blocked(
1052 protocol::connection::Blocked {
1053 reason: reason.into(),
1054 },
1055 ));
1056
1057 let (promise, send_resolver) = Promise::new();
1058 if level_enabled!(Level::TRACE) {
1059 promise.set_marker("connection.blocked".into());
1060 }
1061 self.send_method_frame(method, send_resolver, None);
1062 promise.await
1063 }
1064 fn receive_connection_blocked(&self, method: protocol::connection::Blocked) -> Result<()> {
1065 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1066 if !self.status.can_receive_messages() {
1067 return Err(self.status.state_error("connection.blocked"));
1068 }
1069 self.on_connection_blocked_received(method)
1070 }
1071 pub(crate) async fn connection_unblocked(&self) -> Result<()> {
1072 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::Unblocked(
1073 protocol::connection::Unblocked {},
1074 ));
1075
1076 let (promise, send_resolver) = Promise::new();
1077 if level_enabled!(Level::TRACE) {
1078 promise.set_marker("connection.unblocked".into());
1079 }
1080 self.send_method_frame(method, send_resolver, None);
1081 promise.await
1082 }
1083 fn receive_connection_unblocked(&self, method: protocol::connection::Unblocked) -> Result<()> {
1084 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1085 if !self.status.can_receive_messages() {
1086 return Err(self.status.state_error("connection.unblocked"));
1087 }
1088 self.on_connection_unblocked_received(method)
1089 }
1090 pub(crate) async fn connection_update_secret(
1091 &self,
1092 new_secret: &str,
1093 reason: &str,
1094 ) -> Result<()> {
1095 let method = AMQPClass::Connection(protocol::connection::AMQPMethod::UpdateSecret(
1096 protocol::connection::UpdateSecret {
1097 new_secret: new_secret.into(),
1098 reason: reason.into(),
1099 },
1100 ));
1101
1102 let (promise, send_resolver) = Promise::new();
1103 if level_enabled!(Level::TRACE) {
1104 promise.set_marker("connection.update-secret".into());
1105 }
1106 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1107 if level_enabled!(Level::TRACE) {
1108 promise.set_marker("connection.update-secret.Ok".into());
1109 }
1110 self.send_method_frame(
1111 method,
1112 send_resolver,
1113 Some(ExpectedReply(
1114 Reply::ConnectionUpdateSecretOk(resolver.clone()),
1115 Box::new(resolver),
1116 )),
1117 );
1118 promise_out.await?;
1119 promise.await
1120 }
1121 fn receive_connection_update_secret_ok(
1122 &self,
1123 method: protocol::connection::UpdateSecretOk,
1124 ) -> Result<()> {
1125 self.assert_channel0(method.get_amqp_class_id(), method.get_amqp_method_id())?;
1126 if !self.status.can_receive_messages() {
1127 return Err(self.status.state_error("connection.update-secret-ok"));
1128 }
1129
1130 match self.frames.find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::ConnectionUpdateSecretOk(..))){
1131 Some(Reply::ConnectionUpdateSecretOk(resolver)) => {
1132 let res = Ok(())
1133;
1134 resolver.complete(res.clone());
1135 res
1136},
1137 unexpected => {
1138 self.handle_invalid_contents(format!("unexpected connection update-secret-ok received on channel {}, was awaiting for {:?}", self.id, unexpected), method.get_amqp_class_id(), method.get_amqp_method_id())
1139 },
1140 }
1141 }
1142 pub(crate) async fn channel_open(&self, channel: Channel) -> Result<Channel> {
1143 if !self.status.initializing() {
1144 return Err(self.status.state_error("channel.open"));
1145 }
1146
1147 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Open(
1148 protocol::channel::Open {},
1149 ));
1150
1151 let (promise, send_resolver) = Promise::new();
1152 if level_enabled!(Level::TRACE) {
1153 promise.set_marker("channel.open".into());
1154 }
1155 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1156 if level_enabled!(Level::TRACE) {
1157 promise.set_marker("channel.open.Ok".into());
1158 }
1159 self.send_method_frame(
1160 method,
1161 send_resolver,
1162 Some(ExpectedReply(
1163 Reply::ChannelOpenOk(resolver.clone(), channel),
1164 Box::new(resolver),
1165 )),
1166 );
1167 promise_out.await?;
1168 promise.await
1169 }
1170 fn receive_channel_open_ok(&self, method: protocol::channel::OpenOk) -> Result<()> {
1171 if !self.status.initializing() {
1172 return Err(self.status.state_error("channel.open-ok"));
1173 }
1174
1175 match self.frames.find_expected_reply(self.id, |reply| {
1176 matches!(&reply.0, Reply::ChannelOpenOk(..))
1177 }) {
1178 Some(Reply::ChannelOpenOk(resolver, channel)) => {
1179 self.on_channel_open_ok_received(method, resolver, channel)
1180 }
1181 unexpected => self.handle_invalid_contents(
1182 format!(
1183 "unexpected channel open-ok received on channel {}, was awaiting for {:?}",
1184 self.id, unexpected
1185 ),
1186 method.get_amqp_class_id(),
1187 method.get_amqp_method_id(),
1188 ),
1189 }
1190 }
1191 pub async fn channel_flow(&self, options: ChannelFlowOptions) -> Result<Boolean> {
1192 if !self.status.connected() {
1193 return Err(self.status.state_error("channel.flow"));
1194 }
1195
1196 let ChannelFlowOptions { active } = options;
1197 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(
1198 protocol::channel::Flow { active },
1199 ));
1200
1201 let (promise, send_resolver) = Promise::new();
1202 if level_enabled!(Level::TRACE) {
1203 promise.set_marker("channel.flow".into());
1204 }
1205 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1206 if level_enabled!(Level::TRACE) {
1207 promise.set_marker("channel.flow.Ok".into());
1208 }
1209 self.send_method_frame(
1210 method,
1211 send_resolver,
1212 Some(ExpectedReply(
1213 Reply::ChannelFlowOk(resolver.clone()),
1214 Box::new(resolver),
1215 )),
1216 );
1217 promise_out.await?;
1218 promise.await
1219 }
1220 fn receive_channel_flow(&self, method: protocol::channel::Flow) -> Result<()> {
1221 if !self.status.can_receive_messages() {
1222 return Err(self.status.state_error("channel.flow"));
1223 }
1224 self.on_channel_flow_received(method)
1225 }
1226 async fn channel_flow_ok(&self, options: ChannelFlowOkOptions) -> Result<()> {
1227 if !self.status.connected() {
1228 return Err(self.status.state_error("channel.flow-ok"));
1229 }
1230
1231 let ChannelFlowOkOptions { active } = options;
1232 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(
1233 protocol::channel::FlowOk { active },
1234 ));
1235
1236 let (promise, send_resolver) = Promise::new();
1237 if level_enabled!(Level::TRACE) {
1238 promise.set_marker("channel.flow-ok".into());
1239 }
1240 self.send_method_frame(method, send_resolver, None);
1241 promise.await
1242 }
1243 fn receive_channel_flow_ok(&self, method: protocol::channel::FlowOk) -> Result<()> {
1244 if !self.status.can_receive_messages() {
1245 return Err(self.status.state_error("channel.flow-ok"));
1246 }
1247
1248 match self.frames.find_expected_reply(self.id, |reply| {
1249 matches!(&reply.0, Reply::ChannelFlowOk(..))
1250 }) {
1251 Some(Reply::ChannelFlowOk(resolver)) => {
1252 self.on_channel_flow_ok_received(method, resolver)
1253 }
1254 unexpected => self.handle_invalid_contents(
1255 format!(
1256 "unexpected channel flow-ok received on channel {}, was awaiting for {:?}",
1257 self.id, unexpected
1258 ),
1259 method.get_amqp_class_id(),
1260 method.get_amqp_method_id(),
1261 ),
1262 }
1263 }
1264 async fn do_channel_close(
1265 &self,
1266 reply_code: ShortUInt,
1267 reply_text: &str,
1268 class_id: ShortUInt,
1269 method_id: ShortUInt,
1270 ) -> Result<()> {
1271 if !self.status.connected() {
1272 return Err(self.status.state_error("channel.close"));
1273 }
1274
1275 self.before_channel_close();
1276 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Close(
1277 protocol::channel::Close {
1278 reply_code,
1279 reply_text: reply_text.into(),
1280 class_id,
1281 method_id,
1282 },
1283 ));
1284
1285 let (promise, send_resolver) = Promise::new();
1286 if level_enabled!(Level::TRACE) {
1287 promise.set_marker("channel.close".into());
1288 }
1289 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1290 if level_enabled!(Level::TRACE) {
1291 promise.set_marker("channel.close.Ok".into());
1292 }
1293 self.send_method_frame(
1294 method,
1295 send_resolver,
1296 Some(ExpectedReply(
1297 Reply::ChannelCloseOk(resolver.clone()),
1298 Box::new(resolver),
1299 )),
1300 );
1301 promise_out.await?;
1302 promise.await
1303 }
1304 fn receive_channel_close(&self, method: protocol::channel::Close) -> Result<()> {
1305 if !self.status.can_receive_messages() {
1306 return Err(self.status.state_error("channel.close"));
1307 }
1308 self.on_channel_close_received(method)
1309 }
1310 async fn channel_close_ok(&self, error: Option<Error>) -> Result<()> {
1311 if !self.status.closing() {
1312 return Err(self.status.state_error("channel.close-ok"));
1313 }
1314
1315 let method = AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(
1316 protocol::channel::CloseOk {},
1317 ));
1318
1319 let (promise, send_resolver) = Promise::new();
1320 if level_enabled!(Level::TRACE) {
1321 promise.set_marker("channel.close-ok".into());
1322 }
1323 self.send_method_frame(method, send_resolver, None);
1324 self.on_channel_close_ok_sent(error);
1325 promise.await
1326 }
1327 fn receive_channel_close_ok(&self, method: protocol::channel::CloseOk) -> Result<()> {
1328 if !self.status.can_receive_messages() {
1329 return Err(self.status.state_error("channel.close-ok"));
1330 }
1331
1332 match self.next_expected_close_ok_reply() {
1333 Some(Reply::ChannelCloseOk(resolver)) => {
1334 let res = self.on_channel_close_ok_received();
1335 resolver.complete(res.clone());
1336 res
1337 }
1338 unexpected => self.handle_invalid_contents(
1339 format!(
1340 "unexpected channel close-ok received on channel {}, was awaiting for {:?}",
1341 self.id, unexpected
1342 ),
1343 method.get_amqp_class_id(),
1344 method.get_amqp_method_id(),
1345 ),
1346 }
1347 }
1348 pub async fn access_request(&self, realm: &str, options: AccessRequestOptions) -> Result<()> {
1349 if !self.status.connected() {
1350 return Err(self.status.state_error("access.request"));
1351 }
1352
1353 let AccessRequestOptions {
1354 exclusive,
1355 passive,
1356 active,
1357 write,
1358 read,
1359 } = options;
1360 let method = AMQPClass::Access(protocol::access::AMQPMethod::Request(
1361 protocol::access::Request {
1362 realm: realm.into(),
1363 exclusive,
1364 passive,
1365 active,
1366 write,
1367 read,
1368 },
1369 ));
1370
1371 let (promise, send_resolver) = Promise::new();
1372 if level_enabled!(Level::TRACE) {
1373 promise.set_marker("access.request".into());
1374 }
1375 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1376 if level_enabled!(Level::TRACE) {
1377 promise.set_marker("access.request.Ok".into());
1378 }
1379 self.send_method_frame(
1380 method,
1381 send_resolver,
1382 Some(ExpectedReply(
1383 Reply::AccessRequestOk(resolver.clone()),
1384 Box::new(resolver),
1385 )),
1386 );
1387 promise_out.await?;
1388 promise.await
1389 }
1390 fn receive_access_request_ok(&self, method: protocol::access::RequestOk) -> Result<()> {
1391 if !self.status.can_receive_messages() {
1392 return Err(self.status.state_error("access.request-ok"));
1393 }
1394
1395 match self.frames.find_expected_reply(self.id, |reply| {
1396 matches!(&reply.0, Reply::AccessRequestOk(..))
1397 }) {
1398 Some(Reply::AccessRequestOk(resolver)) => {
1399 let res = self.on_access_request_ok_received(method);
1400 resolver.complete(res.clone());
1401 res
1402 }
1403 unexpected => self.handle_invalid_contents(
1404 format!(
1405 "unexpected access request-ok received on channel {}, was awaiting for {:?}",
1406 self.id, unexpected
1407 ),
1408 method.get_amqp_class_id(),
1409 method.get_amqp_method_id(),
1410 ),
1411 }
1412 }
1413 async fn do_exchange_declare(
1414 &self,
1415 exchange: &str,
1416 kind: &str,
1417 options: ExchangeDeclareOptions,
1418 arguments: FieldTable,
1419 exchange_kind: ExchangeKind,
1420 ) -> Result<()> {
1421 if !self.status.connected() {
1422 return Err(self.status.state_error("exchange.declare"));
1423 }
1424
1425 let creation_arguments = arguments.clone();
1426 let ExchangeDeclareOptions {
1427 passive,
1428 durable,
1429 auto_delete,
1430 internal,
1431 nowait,
1432 } = options;
1433 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Declare(
1434 protocol::exchange::Declare {
1435 exchange: exchange.into(),
1436 kind: kind.into(),
1437 passive,
1438 durable,
1439 auto_delete,
1440 internal,
1441 nowait,
1442 arguments,
1443 },
1444 ));
1445
1446 let (promise, send_resolver) = Promise::new();
1447 if level_enabled!(Level::TRACE) {
1448 promise.set_marker("exchange.declare".into());
1449 }
1450 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1451 if level_enabled!(Level::TRACE) {
1452 promise.set_marker("exchange.declare.Ok".into());
1453 }
1454 self.send_method_frame(
1455 method,
1456 send_resolver,
1457 Some(ExpectedReply(
1458 Reply::ExchangeDeclareOk(
1459 resolver.clone(),
1460 exchange.into(),
1461 exchange_kind,
1462 options,
1463 creation_arguments,
1464 ),
1465 Box::new(resolver),
1466 )),
1467 );
1468 if nowait {
1469 self.receive_exchange_declare_ok(protocol::exchange::DeclareOk {})?;
1470 }
1471 promise_out.await?;
1472 promise.await
1473 }
1474 fn receive_exchange_declare_ok(&self, method: protocol::exchange::DeclareOk) -> Result<()> {
1475 if !self.status.can_receive_messages() {
1476 return Err(self.status.state_error("exchange.declare-ok"));
1477 }
1478
1479 match self.frames.find_expected_reply(self.id, |reply| {
1480 matches!(&reply.0, Reply::ExchangeDeclareOk(..))
1481 }) {
1482 Some(Reply::ExchangeDeclareOk(
1483 resolver,
1484 exchange,
1485 exchange_kind,
1486 options,
1487 creation_arguments,
1488 )) => self.on_exchange_declare_ok_received(
1489 resolver,
1490 exchange,
1491 exchange_kind,
1492 options,
1493 creation_arguments,
1494 ),
1495 unexpected => self.handle_invalid_contents(
1496 format!(
1497 "unexpected exchange declare-ok received on channel {}, was awaiting for {:?}",
1498 self.id, unexpected
1499 ),
1500 method.get_amqp_class_id(),
1501 method.get_amqp_method_id(),
1502 ),
1503 }
1504 }
1505 pub async fn exchange_delete(
1507 &self,
1508 exchange: &str,
1509 options: ExchangeDeleteOptions,
1510 ) -> Result<()> {
1511 if !self.status.connected() {
1512 return Err(self.status.state_error("exchange.delete"));
1513 }
1514
1515 let ExchangeDeleteOptions { if_unused, nowait } = options;
1516 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Delete(
1517 protocol::exchange::Delete {
1518 exchange: exchange.into(),
1519 if_unused,
1520 nowait,
1521 },
1522 ));
1523
1524 let (promise, send_resolver) = Promise::new();
1525 if level_enabled!(Level::TRACE) {
1526 promise.set_marker("exchange.delete".into());
1527 }
1528 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1529 if level_enabled!(Level::TRACE) {
1530 promise.set_marker("exchange.delete.Ok".into());
1531 }
1532 self.send_method_frame(
1533 method,
1534 send_resolver,
1535 Some(ExpectedReply(
1536 Reply::ExchangeDeleteOk(resolver.clone(), exchange.into()),
1537 Box::new(resolver),
1538 )),
1539 );
1540 if nowait {
1541 self.receive_exchange_delete_ok(protocol::exchange::DeleteOk {})?;
1542 }
1543 promise_out.await?;
1544 promise.await
1545 }
1546 fn receive_exchange_delete_ok(&self, method: protocol::exchange::DeleteOk) -> Result<()> {
1547 if !self.status.can_receive_messages() {
1548 return Err(self.status.state_error("exchange.delete-ok"));
1549 }
1550
1551 match self.frames.find_expected_reply(self.id, |reply| {
1552 matches!(&reply.0, Reply::ExchangeDeleteOk(..))
1553 }) {
1554 Some(Reply::ExchangeDeleteOk(resolver, exchange)) => {
1555 let res = self.on_exchange_delete_ok_received(exchange);
1556 resolver.complete(res.clone());
1557 res
1558 }
1559 unexpected => self.handle_invalid_contents(
1560 format!(
1561 "unexpected exchange delete-ok received on channel {}, was awaiting for {:?}",
1562 self.id, unexpected
1563 ),
1564 method.get_amqp_class_id(),
1565 method.get_amqp_method_id(),
1566 ),
1567 }
1568 }
1569 pub async fn exchange_bind(
1570 &self,
1571 destination: &str,
1572 source: &str,
1573 routing_key: &str,
1574 options: ExchangeBindOptions,
1575 arguments: FieldTable,
1576 ) -> Result<()> {
1577 if !self.status.connected() {
1578 return Err(self.status.state_error("exchange.bind"));
1579 }
1580
1581 let creation_arguments = arguments.clone();
1582 let ExchangeBindOptions { nowait } = options;
1583 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Bind(
1584 protocol::exchange::Bind {
1585 destination: destination.into(),
1586 source: source.into(),
1587 routing_key: routing_key.into(),
1588 nowait,
1589 arguments,
1590 },
1591 ));
1592
1593 let (promise, send_resolver) = Promise::new();
1594 if level_enabled!(Level::TRACE) {
1595 promise.set_marker("exchange.bind".into());
1596 }
1597 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1598 if level_enabled!(Level::TRACE) {
1599 promise.set_marker("exchange.bind.Ok".into());
1600 }
1601 self.send_method_frame(
1602 method,
1603 send_resolver,
1604 Some(ExpectedReply(
1605 Reply::ExchangeBindOk(
1606 resolver.clone(),
1607 destination.into(),
1608 source.into(),
1609 routing_key.into(),
1610 creation_arguments,
1611 ),
1612 Box::new(resolver),
1613 )),
1614 );
1615 if nowait {
1616 self.receive_exchange_bind_ok(protocol::exchange::BindOk {})?;
1617 }
1618 promise_out.await?;
1619 promise.await
1620 }
1621 fn receive_exchange_bind_ok(&self, method: protocol::exchange::BindOk) -> Result<()> {
1622 if !self.status.can_receive_messages() {
1623 return Err(self.status.state_error("exchange.bind-ok"));
1624 }
1625
1626 match self.frames.find_expected_reply(self.id, |reply| {
1627 matches!(&reply.0, Reply::ExchangeBindOk(..))
1628 }) {
1629 Some(Reply::ExchangeBindOk(
1630 resolver,
1631 destination,
1632 source,
1633 routing_key,
1634 creation_arguments,
1635 )) => {
1636 let res = self.on_exchange_bind_ok_received(
1637 destination,
1638 source,
1639 routing_key,
1640 creation_arguments,
1641 );
1642 resolver.complete(res.clone());
1643 res
1644 }
1645 unexpected => self.handle_invalid_contents(
1646 format!(
1647 "unexpected exchange bind-ok received on channel {}, was awaiting for {:?}",
1648 self.id, unexpected
1649 ),
1650 method.get_amqp_class_id(),
1651 method.get_amqp_method_id(),
1652 ),
1653 }
1654 }
1655 pub async fn exchange_unbind(
1656 &self,
1657 destination: &str,
1658 source: &str,
1659 routing_key: &str,
1660 options: ExchangeUnbindOptions,
1661 arguments: FieldTable,
1662 ) -> Result<()> {
1663 if !self.status.connected() {
1664 return Err(self.status.state_error("exchange.unbind"));
1665 }
1666
1667 let creation_arguments = arguments.clone();
1668 let ExchangeUnbindOptions { nowait } = options;
1669 let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Unbind(
1670 protocol::exchange::Unbind {
1671 destination: destination.into(),
1672 source: source.into(),
1673 routing_key: routing_key.into(),
1674 nowait,
1675 arguments,
1676 },
1677 ));
1678
1679 let (promise, send_resolver) = Promise::new();
1680 if level_enabled!(Level::TRACE) {
1681 promise.set_marker("exchange.unbind".into());
1682 }
1683 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1684 if level_enabled!(Level::TRACE) {
1685 promise.set_marker("exchange.unbind.Ok".into());
1686 }
1687 self.send_method_frame(
1688 method,
1689 send_resolver,
1690 Some(ExpectedReply(
1691 Reply::ExchangeUnbindOk(
1692 resolver.clone(),
1693 destination.into(),
1694 source.into(),
1695 routing_key.into(),
1696 creation_arguments,
1697 ),
1698 Box::new(resolver),
1699 )),
1700 );
1701 if nowait {
1702 self.receive_exchange_unbind_ok(protocol::exchange::UnbindOk {})?;
1703 }
1704 promise_out.await?;
1705 promise.await
1706 }
1707 fn receive_exchange_unbind_ok(&self, method: protocol::exchange::UnbindOk) -> Result<()> {
1708 if !self.status.can_receive_messages() {
1709 return Err(self.status.state_error("exchange.unbind-ok"));
1710 }
1711
1712 match self.frames.find_expected_reply(self.id, |reply| {
1713 matches!(&reply.0, Reply::ExchangeUnbindOk(..))
1714 }) {
1715 Some(Reply::ExchangeUnbindOk(
1716 resolver,
1717 destination,
1718 source,
1719 routing_key,
1720 creation_arguments,
1721 )) => {
1722 let res = self.on_exchange_unbind_ok_received(
1723 destination,
1724 source,
1725 routing_key,
1726 creation_arguments,
1727 );
1728 resolver.complete(res.clone());
1729 res
1730 }
1731 unexpected => self.handle_invalid_contents(
1732 format!(
1733 "unexpected exchange unbind-ok received on channel {}, was awaiting for {:?}",
1734 self.id, unexpected
1735 ),
1736 method.get_amqp_class_id(),
1737 method.get_amqp_method_id(),
1738 ),
1739 }
1740 }
1741 pub async fn queue_declare(
1742 &self,
1743 queue: &str,
1744 options: QueueDeclareOptions,
1745 arguments: FieldTable,
1746 ) -> Result<Queue> {
1747 if !self.status.connected() {
1748 return Err(self.status.state_error("queue.declare"));
1749 }
1750
1751 let creation_arguments = arguments.clone();
1752 let QueueDeclareOptions {
1753 passive,
1754 durable,
1755 exclusive,
1756 auto_delete,
1757 nowait,
1758 } = options;
1759 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Declare(
1760 protocol::queue::Declare {
1761 queue: queue.into(),
1762 passive,
1763 durable,
1764 exclusive,
1765 auto_delete,
1766 nowait,
1767 arguments,
1768 },
1769 ));
1770
1771 let (promise, send_resolver) = Promise::new();
1772 if level_enabled!(Level::TRACE) {
1773 promise.set_marker("queue.declare".into());
1774 }
1775 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1776 if level_enabled!(Level::TRACE) {
1777 promise.set_marker("queue.declare.Ok".into());
1778 }
1779 self.send_method_frame(
1780 method,
1781 send_resolver,
1782 Some(ExpectedReply(
1783 Reply::QueueDeclareOk(resolver.clone(), options, creation_arguments),
1784 Box::new(resolver),
1785 )),
1786 );
1787 if nowait {
1788 self.receive_queue_declare_ok(protocol::queue::DeclareOk {
1789 queue: queue.into(),
1790 ..Default::default()
1791 })?;
1792 }
1793 promise_out.await?;
1794 promise.await
1795 }
1796 fn receive_queue_declare_ok(&self, method: protocol::queue::DeclareOk) -> Result<()> {
1797 if !self.status.can_receive_messages() {
1798 return Err(self.status.state_error("queue.declare-ok"));
1799 }
1800
1801 match self.frames.find_expected_reply(self.id, |reply| {
1802 matches!(&reply.0, Reply::QueueDeclareOk(..))
1803 }) {
1804 Some(Reply::QueueDeclareOk(resolver, options, creation_arguments)) => {
1805 self.on_queue_declare_ok_received(method, resolver, options, creation_arguments)
1806 }
1807 unexpected => self.handle_invalid_contents(
1808 format!(
1809 "unexpected queue declare-ok received on channel {}, was awaiting for {:?}",
1810 self.id, unexpected
1811 ),
1812 method.get_amqp_class_id(),
1813 method.get_amqp_method_id(),
1814 ),
1815 }
1816 }
1817 pub async fn queue_bind(
1818 &self,
1819 queue: &str,
1820 exchange: &str,
1821 routing_key: &str,
1822 options: QueueBindOptions,
1823 arguments: FieldTable,
1824 ) -> Result<()> {
1825 if !self.status.connected() {
1826 return Err(self.status.state_error("queue.bind"));
1827 }
1828
1829 let creation_arguments = arguments.clone();
1830 let QueueBindOptions { nowait } = options;
1831 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Bind(protocol::queue::Bind {
1832 queue: queue.into(),
1833 exchange: exchange.into(),
1834 routing_key: routing_key.into(),
1835 nowait,
1836 arguments,
1837 }));
1838
1839 let (promise, send_resolver) = Promise::new();
1840 if level_enabled!(Level::TRACE) {
1841 promise.set_marker("queue.bind".into());
1842 }
1843 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1844 if level_enabled!(Level::TRACE) {
1845 promise.set_marker("queue.bind.Ok".into());
1846 }
1847 self.send_method_frame(
1848 method,
1849 send_resolver,
1850 Some(ExpectedReply(
1851 Reply::QueueBindOk(
1852 resolver.clone(),
1853 queue.into(),
1854 exchange.into(),
1855 routing_key.into(),
1856 creation_arguments,
1857 ),
1858 Box::new(resolver),
1859 )),
1860 );
1861 if nowait {
1862 self.receive_queue_bind_ok(protocol::queue::BindOk {})?;
1863 }
1864 promise_out.await?;
1865 promise.await
1866 }
1867 fn receive_queue_bind_ok(&self, method: protocol::queue::BindOk) -> Result<()> {
1868 if !self.status.can_receive_messages() {
1869 return Err(self.status.state_error("queue.bind-ok"));
1870 }
1871
1872 match self
1873 .frames
1874 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueueBindOk(..)))
1875 {
1876 Some(Reply::QueueBindOk(
1877 resolver,
1878 queue,
1879 exchange,
1880 routing_key,
1881 creation_arguments,
1882 )) => {
1883 let res = self.on_queue_bind_ok_received(
1884 queue,
1885 exchange,
1886 routing_key,
1887 creation_arguments,
1888 );
1889 resolver.complete(res.clone());
1890 res
1891 }
1892 unexpected => self.handle_invalid_contents(
1893 format!(
1894 "unexpected queue bind-ok received on channel {}, was awaiting for {:?}",
1895 self.id, unexpected
1896 ),
1897 method.get_amqp_class_id(),
1898 method.get_amqp_method_id(),
1899 ),
1900 }
1901 }
1902 pub async fn queue_purge(
1903 &self,
1904 queue: &str,
1905 options: QueuePurgeOptions,
1906 ) -> Result<MessageCount> {
1907 if !self.status.connected() {
1908 return Err(self.status.state_error("queue.purge"));
1909 }
1910
1911 let QueuePurgeOptions { nowait } = options;
1912 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Purge(protocol::queue::Purge {
1913 queue: queue.into(),
1914 nowait,
1915 }));
1916
1917 let (promise, send_resolver) = Promise::new();
1918 if level_enabled!(Level::TRACE) {
1919 promise.set_marker("queue.purge".into());
1920 }
1921 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1922 if level_enabled!(Level::TRACE) {
1923 promise.set_marker("queue.purge.Ok".into());
1924 }
1925 self.send_method_frame(
1926 method,
1927 send_resolver,
1928 Some(ExpectedReply(
1929 Reply::QueuePurgeOk(resolver.clone()),
1930 Box::new(resolver),
1931 )),
1932 );
1933 promise_out.await?;
1934 promise.await
1935 }
1936 fn receive_queue_purge_ok(&self, method: protocol::queue::PurgeOk) -> Result<()> {
1937 if !self.status.can_receive_messages() {
1938 return Err(self.status.state_error("queue.purge-ok"));
1939 }
1940
1941 match self
1942 .frames
1943 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::QueuePurgeOk(..)))
1944 {
1945 Some(Reply::QueuePurgeOk(resolver)) => {
1946 self.on_queue_purge_ok_received(method, resolver)
1947 }
1948 unexpected => self.handle_invalid_contents(
1949 format!(
1950 "unexpected queue purge-ok received on channel {}, was awaiting for {:?}",
1951 self.id, unexpected
1952 ),
1953 method.get_amqp_class_id(),
1954 method.get_amqp_method_id(),
1955 ),
1956 }
1957 }
1958 pub async fn queue_delete(
1959 &self,
1960 queue: &str,
1961 options: QueueDeleteOptions,
1962 ) -> Result<MessageCount> {
1963 if !self.status.connected() {
1964 return Err(self.status.state_error("queue.delete"));
1965 }
1966
1967 let QueueDeleteOptions {
1968 if_unused,
1969 if_empty,
1970 nowait,
1971 } = options;
1972 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Delete(
1973 protocol::queue::Delete {
1974 queue: queue.into(),
1975 if_unused,
1976 if_empty,
1977 nowait,
1978 },
1979 ));
1980
1981 let (promise, send_resolver) = Promise::new();
1982 if level_enabled!(Level::TRACE) {
1983 promise.set_marker("queue.delete".into());
1984 }
1985 let ((promise, resolver), promise_out) = (Promise::new(), promise);
1986 if level_enabled!(Level::TRACE) {
1987 promise.set_marker("queue.delete.Ok".into());
1988 }
1989 self.send_method_frame(
1990 method,
1991 send_resolver,
1992 Some(ExpectedReply(
1993 Reply::QueueDeleteOk(resolver.clone(), queue.into()),
1994 Box::new(resolver),
1995 )),
1996 );
1997 if nowait {
1998 self.receive_queue_delete_ok(protocol::queue::DeleteOk {
1999 ..Default::default()
2000 })?;
2001 }
2002 promise_out.await?;
2003 promise.await
2004 }
2005 fn receive_queue_delete_ok(&self, method: protocol::queue::DeleteOk) -> Result<()> {
2006 if !self.status.can_receive_messages() {
2007 return Err(self.status.state_error("queue.delete-ok"));
2008 }
2009
2010 match self.frames.find_expected_reply(self.id, |reply| {
2011 matches!(&reply.0, Reply::QueueDeleteOk(..))
2012 }) {
2013 Some(Reply::QueueDeleteOk(resolver, queue)) => {
2014 self.on_queue_delete_ok_received(method, resolver, queue)
2015 }
2016 unexpected => self.handle_invalid_contents(
2017 format!(
2018 "unexpected queue delete-ok received on channel {}, was awaiting for {:?}",
2019 self.id, unexpected
2020 ),
2021 method.get_amqp_class_id(),
2022 method.get_amqp_method_id(),
2023 ),
2024 }
2025 }
2026 pub async fn queue_unbind(
2027 &self,
2028 queue: &str,
2029 exchange: &str,
2030 routing_key: &str,
2031 arguments: FieldTable,
2032 ) -> Result<()> {
2033 if !self.status.connected() {
2034 return Err(self.status.state_error("queue.unbind"));
2035 }
2036
2037 let creation_arguments = arguments.clone();
2038 let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Unbind(
2039 protocol::queue::Unbind {
2040 queue: queue.into(),
2041 exchange: exchange.into(),
2042 routing_key: routing_key.into(),
2043 arguments,
2044 },
2045 ));
2046
2047 let (promise, send_resolver) = Promise::new();
2048 if level_enabled!(Level::TRACE) {
2049 promise.set_marker("queue.unbind".into());
2050 }
2051 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2052 if level_enabled!(Level::TRACE) {
2053 promise.set_marker("queue.unbind.Ok".into());
2054 }
2055 self.send_method_frame(
2056 method,
2057 send_resolver,
2058 Some(ExpectedReply(
2059 Reply::QueueUnbindOk(
2060 resolver.clone(),
2061 queue.into(),
2062 exchange.into(),
2063 routing_key.into(),
2064 creation_arguments,
2065 ),
2066 Box::new(resolver),
2067 )),
2068 );
2069 promise_out.await?;
2070 promise.await
2071 }
2072 fn receive_queue_unbind_ok(&self, method: protocol::queue::UnbindOk) -> Result<()> {
2073 if !self.status.can_receive_messages() {
2074 return Err(self.status.state_error("queue.unbind-ok"));
2075 }
2076
2077 match self.frames.find_expected_reply(self.id, |reply| {
2078 matches!(&reply.0, Reply::QueueUnbindOk(..))
2079 }) {
2080 Some(Reply::QueueUnbindOk(
2081 resolver,
2082 queue,
2083 exchange,
2084 routing_key,
2085 creation_arguments,
2086 )) => {
2087 let res = self.on_queue_unbind_ok_received(
2088 queue,
2089 exchange,
2090 routing_key,
2091 creation_arguments,
2092 );
2093 resolver.complete(res.clone());
2094 res
2095 }
2096 unexpected => self.handle_invalid_contents(
2097 format!(
2098 "unexpected queue unbind-ok received on channel {}, was awaiting for {:?}",
2099 self.id, unexpected
2100 ),
2101 method.get_amqp_class_id(),
2102 method.get_amqp_method_id(),
2103 ),
2104 }
2105 }
2106 pub async fn tx_select(&self) -> Result<()> {
2107 if !self.status.connected() {
2108 return Err(self.status.state_error("tx.select"));
2109 }
2110
2111 let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Select(protocol::tx::Select {}));
2112
2113 let (promise, send_resolver) = Promise::new();
2114 if level_enabled!(Level::TRACE) {
2115 promise.set_marker("tx.select".into());
2116 }
2117 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2118 if level_enabled!(Level::TRACE) {
2119 promise.set_marker("tx.select.Ok".into());
2120 }
2121 self.send_method_frame(
2122 method,
2123 send_resolver,
2124 Some(ExpectedReply(
2125 Reply::TxSelectOk(resolver.clone()),
2126 Box::new(resolver),
2127 )),
2128 );
2129 promise_out.await?;
2130 promise.await
2131 }
2132 fn receive_tx_select_ok(&self, method: protocol::tx::SelectOk) -> Result<()> {
2133 if !self.status.can_receive_messages() {
2134 return Err(self.status.state_error("tx.select-ok"));
2135 }
2136
2137 match self
2138 .frames
2139 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxSelectOk(..)))
2140 {
2141 Some(Reply::TxSelectOk(resolver)) => {
2142 let res = Ok(());
2143 resolver.complete(res.clone());
2144 res
2145 }
2146 unexpected => self.handle_invalid_contents(
2147 format!(
2148 "unexpected tx select-ok received on channel {}, was awaiting for {:?}",
2149 self.id, unexpected
2150 ),
2151 method.get_amqp_class_id(),
2152 method.get_amqp_method_id(),
2153 ),
2154 }
2155 }
2156 pub async fn tx_commit(&self) -> Result<()> {
2157 if !self.status.connected() {
2158 return Err(self.status.state_error("tx.commit"));
2159 }
2160
2161 let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Commit(protocol::tx::Commit {}));
2162
2163 let (promise, send_resolver) = Promise::new();
2164 if level_enabled!(Level::TRACE) {
2165 promise.set_marker("tx.commit".into());
2166 }
2167 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2168 if level_enabled!(Level::TRACE) {
2169 promise.set_marker("tx.commit.Ok".into());
2170 }
2171 self.send_method_frame(
2172 method,
2173 send_resolver,
2174 Some(ExpectedReply(
2175 Reply::TxCommitOk(resolver.clone()),
2176 Box::new(resolver),
2177 )),
2178 );
2179 promise_out.await?;
2180 promise.await
2181 }
2182 fn receive_tx_commit_ok(&self, method: protocol::tx::CommitOk) -> Result<()> {
2183 if !self.status.can_receive_messages() {
2184 return Err(self.status.state_error("tx.commit-ok"));
2185 }
2186
2187 match self
2188 .frames
2189 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxCommitOk(..)))
2190 {
2191 Some(Reply::TxCommitOk(resolver)) => {
2192 let res = Ok(());
2193 resolver.complete(res.clone());
2194 res
2195 }
2196 unexpected => self.handle_invalid_contents(
2197 format!(
2198 "unexpected tx commit-ok received on channel {}, was awaiting for {:?}",
2199 self.id, unexpected
2200 ),
2201 method.get_amqp_class_id(),
2202 method.get_amqp_method_id(),
2203 ),
2204 }
2205 }
2206 pub async fn tx_rollback(&self) -> Result<()> {
2207 if !self.status.connected() {
2208 return Err(self.status.state_error("tx.rollback"));
2209 }
2210
2211 let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Rollback(
2212 protocol::tx::Rollback {},
2213 ));
2214
2215 let (promise, send_resolver) = Promise::new();
2216 if level_enabled!(Level::TRACE) {
2217 promise.set_marker("tx.rollback".into());
2218 }
2219 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2220 if level_enabled!(Level::TRACE) {
2221 promise.set_marker("tx.rollback.Ok".into());
2222 }
2223 self.send_method_frame(
2224 method,
2225 send_resolver,
2226 Some(ExpectedReply(
2227 Reply::TxRollbackOk(resolver.clone()),
2228 Box::new(resolver),
2229 )),
2230 );
2231 promise_out.await?;
2232 promise.await
2233 }
2234 fn receive_tx_rollback_ok(&self, method: protocol::tx::RollbackOk) -> Result<()> {
2235 if !self.status.can_receive_messages() {
2236 return Err(self.status.state_error("tx.rollback-ok"));
2237 }
2238
2239 match self
2240 .frames
2241 .find_expected_reply(self.id, |reply| matches!(&reply.0, Reply::TxRollbackOk(..)))
2242 {
2243 Some(Reply::TxRollbackOk(resolver)) => {
2244 let res = Ok(());
2245 resolver.complete(res.clone());
2246 res
2247 }
2248 unexpected => self.handle_invalid_contents(
2249 format!(
2250 "unexpected tx rollback-ok received on channel {}, was awaiting for {:?}",
2251 self.id, unexpected
2252 ),
2253 method.get_amqp_class_id(),
2254 method.get_amqp_method_id(),
2255 ),
2256 }
2257 }
2258 pub async fn confirm_select(&self, options: ConfirmSelectOptions) -> Result<()> {
2259 if !self.status.connected_or_recovering() {
2260 return Err(self.status.state_error("confirm.select"));
2261 }
2262
2263 let ConfirmSelectOptions { nowait } = options;
2264 let method = AMQPClass::Confirm(protocol::confirm::AMQPMethod::Select(
2265 protocol::confirm::Select { nowait },
2266 ));
2267
2268 let (promise, send_resolver) = Promise::new();
2269 if level_enabled!(Level::TRACE) {
2270 promise.set_marker("confirm.select".into());
2271 }
2272 let ((promise, resolver), promise_out) = (Promise::new(), promise);
2273 if level_enabled!(Level::TRACE) {
2274 promise.set_marker("confirm.select.Ok".into());
2275 }
2276 self.send_method_frame(
2277 method,
2278 send_resolver,
2279 Some(ExpectedReply(
2280 Reply::ConfirmSelectOk(resolver.clone()),
2281 Box::new(resolver),
2282 )),
2283 );
2284 promise_out.await?;
2285 promise.await
2286 }
2287 fn receive_confirm_select_ok(&self, method: protocol::confirm::SelectOk) -> Result<()> {
2288 if !self.status.can_receive_messages() {
2289 return Err(self.status.state_error("confirm.select-ok"));
2290 }
2291
2292 match self.frames.find_expected_reply(self.id, |reply| {
2293 matches!(&reply.0, Reply::ConfirmSelectOk(..))
2294 }) {
2295 Some(Reply::ConfirmSelectOk(resolver)) => {
2296 let res = self.on_confirm_select_ok_received();
2297 resolver.complete(res.clone());
2298 res
2299 }
2300 unexpected => self.handle_invalid_contents(
2301 format!(
2302 "unexpected confirm select-ok received on channel {}, was awaiting for {:?}",
2303 self.id, unexpected
2304 ),
2305 method.get_amqp_class_id(),
2306 method.get_amqp_method_id(),
2307 ),
2308 }
2309 }
2310}