Skip to main content

amqp_client_rust/api/
callback.rs

1use amqprs::{
2    callbacks::{ChannelCallback, ConnectionCallback},
3    channel::Channel,
4    connection::Connection,
5    error::Error as AMQPError,
6    Ack, BasicProperties, Cancel, Close, CloseChannel, Nack, Return
7};
8use async_trait::async_trait;
9use tokio::sync::mpsc::UnboundedSender;
10use tracing::{info, debug, error, warn};
11use crate::api::{connection_manager::ConnectionCommand, utils::ChannelCmd};
12
13pub type AMQPResult<T> = std::result::Result<T, AMQPError>;
14pub struct MyChannelCallback{
15    pub channel_tx: UnboundedSender<ChannelCmd>,
16}
17
18#[async_trait]
19impl ChannelCallback for MyChannelCallback {
20    async fn close(&mut self, channel: &Channel, close: CloseChannel) -> AMQPResult<()> {
21        match close.reply_code() {
22            200 => {
23                info!("Channel closed normally: {}", close);
24            },
25            403_u16..=406_u16 => {
26                error!("Channel closed due to channel exception (Safe to Reopen): {}", close);
27                if let Err(e) = self.channel_tx.send(ChannelCmd::ReOpen(channel.channel_id())) {
28                    error!("Failed to send ReOpen command to connection manager: {}", e);
29                }
30            },
31            320_u16|500_u16..=599_u16 => {
32                error!("Channel closed due to fatal connection exception: {}", close);
33            },
34            _ => {
35                error!(
36                    "Unhandled close request for channel {}, cause: {}",
37                    channel, close
38                );
39            }
40        }
41        Ok(())
42    }
43    async fn cancel(&mut self, _channel: &Channel, _cancel: Cancel) -> AMQPResult<()> {
44        warn!(
45            "handle cancel request for consumer {} on channel {}",
46            _cancel.consumer_tag(),
47            _channel
48        );
49        Ok(())
50    }
51    async fn flow(&mut self, _channel: &Channel, _active: bool) -> AMQPResult<bool> {
52        info!(
53            "handle flow request active={} for channel {}",
54            _active, _channel
55        );
56        Ok(true)
57    }
58    async fn publish_ack(&mut self, _channel: &Channel, ack: Ack) {
59        info!(
60            "handle publish ack delivery_tag={} on channel {}",
61            ack.delivery_tag(),
62            _channel
63        );
64        let tag = ack.delivery_tag();
65        let multiple = ack.mutiple();
66        debug!("Received ACK: tag={}, multiple={}", tag, multiple);
67
68        if let Err(e) = self.channel_tx.send(ChannelCmd::PublishAck((tag, multiple))) {
69            error!("Failed to send ACK to connection manager: {}", e);
70        }
71        
72    }
73    async fn publish_nack(&mut self, _channel: &Channel, nack: Nack) {
74        warn!(
75            "handle publish nack delivery_tag={} on channel {}",
76            nack.delivery_tag(),
77            _channel
78        );
79        let tag = nack.delivery_tag();
80        let multiple = nack.multiple();
81
82        if let Err(e) = self.channel_tx.send(ChannelCmd::PublishNack((tag, multiple))) {
83            error!("Failed to send NACK to connection manager: {}", e);
84        }
85        
86    }
87    async fn publish_return(
88        &mut self,
89        _channel: &Channel,
90        _ret: Return,
91        _basic_properties: BasicProperties,
92        _content: Vec<u8>,
93    ) {
94        warn!(
95            "handle publish return {} on channel {}, content size: {}",
96            _ret,
97            _channel,
98            _content.len()
99        );
100    }
101}
102
103
104pub struct MyConnectionCallback{
105    pub sender: UnboundedSender<ConnectionCommand>,
106}
107
108
109#[async_trait]
110impl ConnectionCallback for MyConnectionCallback {
111
112    async fn close(&mut self, _connection: &Connection, _close: Close) -> AMQPResult<()> {
113        error!(
114            "handle close request for connection {}, cause: {}",
115            _connection, _close
116        );
117        let _ = self.sender.send(ConnectionCommand::CheckConnection{});
118        Ok(())
119    }
120
121    async fn blocked(&mut self, _connection: &Connection, _reason: String) {
122        debug!(
123            "handle blocked notification for connection {}, reason: {}",
124            _connection, _reason
125        );
126    }
127
128    async fn unblocked(&mut self, _connection: &Connection) {
129        debug!(
130            "handle unblocked notification for connection {}",
131            _connection
132        );
133    }
134    
135    async fn secret_updated(&mut self, connection: &Connection){
136        debug!(
137            "handle secret updated notification for connection {}",
138            connection
139        );
140        let _ = self.sender.send(ConnectionCommand::CheckConnection{});
141    }
142}