amqp_client_rust/api/
callback.rs1use amqprs::{
2 callbacks::{ChannelCallback, ConnectionCallback},
3 channel::Channel,
4 connection::Connection,
5 error::Error as AMQPError,
6 Ack, BasicProperties, Cancel, Close, CloseChannel, Nack, Return
7};
8use async_trait::async_trait;
9use tokio::sync::mpsc::UnboundedSender;
10use tracing::{info, debug, error, warn};
11use crate::api::{connection::ConnectionCommand, utils::PendingCmd};
12
13pub type AMQPResult<T> = std::result::Result<T, AMQPError>;
14pub struct MyChannelCallback{
15 pub sender_pending: UnboundedSender<PendingCmd>,
16}
17
18#[async_trait]
19impl ChannelCallback for MyChannelCallback {
20 async fn close(&mut self, _channel: &Channel, _close: CloseChannel) -> AMQPResult<()> {
21 error!(
22 "handle close request for channel {}, cause: {}",
23 _channel, _close
24 );
25 Ok(())
26 }
27 async fn cancel(&mut self, _channel: &Channel, _cancel: Cancel) -> AMQPResult<()> {
28 warn!(
29 "handle cancel request for consumer {} on channel {}",
30 _cancel.consumer_tag(),
31 _channel
32 );
33 Ok(())
34 }
35 async fn flow(&mut self, _channel: &Channel, _active: bool) -> AMQPResult<bool> {
36 info!(
37 "handle flow request active={} for channel {}",
38 _active, _channel
39 );
40 Ok(true)
41 }
42 async fn publish_ack(&mut self, _channel: &Channel, ack: Ack) {
43 info!(
44 "handle publish ack delivery_tag={} on channel {}",
45 ack.delivery_tag(),
46 _channel
47 );
48 let tag = ack.delivery_tag();
49 let multiple = ack.mutiple();
50 debug!("Received ACK: tag={}, multiple={}", tag, multiple);
51
52 if let Err(e) = self.sender_pending.send(PendingCmd::Ack((tag, multiple))) {
53 error!("Failed to send ACK to connection manager: {}", e);
54 }
55
56 }
57 async fn publish_nack(&mut self, _channel: &Channel, nack: Nack) {
58 warn!(
59 "handle publish nack delivery_tag={} on channel {}",
60 nack.delivery_tag(),
61 _channel
62 );
63 let tag = nack.delivery_tag();
64 let multiple = nack.multiple();
65
66 if let Err(e) = self.sender_pending.send(PendingCmd::Nack((tag, multiple))) {
67 error!("Failed to send NACK to connection manager: {}", e);
68 }
69
70 }
71 async fn publish_return(
72 &mut self,
73 _channel: &Channel,
74 _ret: Return,
75 _basic_properties: BasicProperties,
76 _content: Vec<u8>,
77 ) {
78 warn!(
79 "handle publish return {} on channel {}, content size: {}",
80 _ret,
81 _channel,
82 _content.len()
83 );
84 }
85}
86
87
88pub struct MyConnectionCallback{
89 pub sender: UnboundedSender<ConnectionCommand>,
90}
91
92
93#[async_trait]
94impl ConnectionCallback for MyConnectionCallback {
95
96 async fn close(&mut self, _connection: &Connection, _close: Close) -> AMQPResult<()> {
97 error!(
98 "handle close request for connection {}, cause: {}",
99 _connection, _close
100 );
101 let _ = self.sender.send(ConnectionCommand::CheckConnection{});
102 Ok(())
103 }
104
105 async fn blocked(&mut self, _connection: &Connection, _reason: String) {
106 debug!(
107 "handle blocked notification for connection {}, reason: {}",
108 _connection, _reason
109 );
110 }
111
112 async fn unblocked(&mut self, _connection: &Connection) {
113 debug!(
114 "handle unblocked notification for connection {}",
115 _connection
116 );
117 }
118
119 async fn secret_updated(&mut self, connection: &Connection){
120 debug!(
121 "handle secret updated notification for connection {}",
122 connection
123 );
124 let _ = self.sender.send(ConnectionCommand::CheckConnection{});
125 }
126}