amqp_client_rust/api/
callback.rs1use amqprs::{
2 callbacks::{ChannelCallback, ConnectionCallback},
3 channel::Channel,
4 connection::Connection,
5 error::Error as AMQPError,
6 Ack, BasicProperties, Cancel, Close, CloseChannel, Nack, Return
7};
8use async_trait::async_trait;
9use tokio::sync::mpsc::UnboundedSender;
10use tracing::{info, debug, error, warn};
11use crate::api::{connection_manager::ConnectionCommand, utils::ChannelCmd};
12
13pub type AMQPResult<T> = std::result::Result<T, AMQPError>;
14pub struct MyChannelCallback{
15 pub channel_tx: UnboundedSender<ChannelCmd>,
16}
17
18#[async_trait]
19impl ChannelCallback for MyChannelCallback {
20 async fn close(&mut self, channel: &Channel, close: CloseChannel) -> AMQPResult<()> {
21 match close.reply_code() {
22 200 => {
23 info!("Channel closed normally: {}", close);
24 },
25 403_u16..=406_u16 => {
26 error!("Channel closed due to channel exception (Safe to Reopen): {}", close);
27 if let Err(e) = self.channel_tx.send(ChannelCmd::ReOpen(channel.channel_id())) {
28 error!("Failed to send ReOpen command to connection manager: {}", e);
29 }
30 },
31 320_u16|500_u16..=599_u16 => {
32 error!("Channel closed due to fatal connection exception: {}", close);
33 },
34 _ => {
35 error!(
36 "Unhandled close request for channel {}, cause: {}",
37 channel, close
38 );
39 }
40 }
41 Ok(())
42 }
43 async fn cancel(&mut self, _channel: &Channel, _cancel: Cancel) -> AMQPResult<()> {
44 warn!(
45 "handle cancel request for consumer {} on channel {}",
46 _cancel.consumer_tag(),
47 _channel
48 );
49 Ok(())
50 }
51 async fn flow(&mut self, _channel: &Channel, _active: bool) -> AMQPResult<bool> {
52 info!(
53 "handle flow request active={} for channel {}",
54 _active, _channel
55 );
56 Ok(true)
57 }
58 async fn publish_ack(&mut self, _channel: &Channel, ack: Ack) {
59 info!(
60 "handle publish ack delivery_tag={} on channel {}",
61 ack.delivery_tag(),
62 _channel
63 );
64 let tag = ack.delivery_tag();
65 let multiple = ack.mutiple();
66 debug!("Received ACK: tag={}, multiple={}", tag, multiple);
67
68 if let Err(e) = self.channel_tx.send(ChannelCmd::PublishAck((tag, multiple))) {
69 error!("Failed to send ACK to connection manager: {}", e);
70 }
71
72 }
73 async fn publish_nack(&mut self, _channel: &Channel, nack: Nack) {
74 warn!(
75 "handle publish nack delivery_tag={} on channel {}",
76 nack.delivery_tag(),
77 _channel
78 );
79 let tag = nack.delivery_tag();
80 let multiple = nack.multiple();
81
82 if let Err(e) = self.channel_tx.send(ChannelCmd::PublishNack((tag, multiple))) {
83 error!("Failed to send NACK to connection manager: {}", e);
84 }
85
86 }
87 async fn publish_return(
88 &mut self,
89 _channel: &Channel,
90 _ret: Return,
91 _basic_properties: BasicProperties,
92 _content: Vec<u8>,
93 ) {
94 warn!(
95 "handle publish return {} on channel {}, content size: {}",
96 _ret,
97 _channel,
98 _content.len()
99 );
100 }
101}
102
103
104pub struct MyConnectionCallback{
105 pub sender: UnboundedSender<ConnectionCommand>,
106}
107
108
109#[async_trait]
110impl ConnectionCallback for MyConnectionCallback {
111
112 async fn close(&mut self, _connection: &Connection, _close: Close) -> AMQPResult<()> {
113 error!(
114 "handle close request for connection {}, cause: {}",
115 _connection, _close
116 );
117 let _ = self.sender.send(ConnectionCommand::CheckConnection{});
118 Ok(())
119 }
120
121 async fn blocked(&mut self, _connection: &Connection, _reason: String) {
122 debug!(
123 "handle blocked notification for connection {}, reason: {}",
124 _connection, _reason
125 );
126 }
127
128 async fn unblocked(&mut self, _connection: &Connection) {
129 debug!(
130 "handle unblocked notification for connection {}",
131 _connection
132 );
133 }
134
135 async fn secret_updated(&mut self, connection: &Connection){
136 debug!(
137 "handle secret updated notification for connection {}",
138 connection
139 );
140 let _ = self.sender.send(ConnectionCommand::CheckConnection{});
141 }
142}