Skip to main content

amqp_client_rust/api/
callback.rs

1use amqprs::{
2    callbacks::{ChannelCallback, ConnectionCallback},
3    channel::Channel,
4    connection::Connection,
5    error::Error as AMQPError,
6    Ack, BasicProperties, Cancel, Close, CloseChannel, Nack, Return
7};
8use async_trait::async_trait;
9use tokio::sync::mpsc::UnboundedSender;
10use tracing::{info, debug, error, warn};
11use crate::api::{connection::ConnectionCommand, utils::PendingCmd};
12
13pub type AMQPResult<T> = std::result::Result<T, AMQPError>;
14pub struct MyChannelCallback{
15    pub sender_pending: UnboundedSender<PendingCmd>,
16}
17
18#[async_trait]
19impl ChannelCallback for MyChannelCallback {
20    async fn close(&mut self, _channel: &Channel, _close: CloseChannel) -> AMQPResult<()> {
21        error!(
22            "handle close request for channel {}, cause: {}",
23            _channel, _close
24        );
25        Ok(())
26    }
27    async fn cancel(&mut self, _channel: &Channel, _cancel: Cancel) -> AMQPResult<()> {
28        warn!(
29            "handle cancel request for consumer {} on channel {}",
30            _cancel.consumer_tag(),
31            _channel
32        );
33        Ok(())
34    }
35    async fn flow(&mut self, _channel: &Channel, _active: bool) -> AMQPResult<bool> {
36        info!(
37            "handle flow request active={} for channel {}",
38            _active, _channel
39        );
40        Ok(true)
41    }
42    async fn publish_ack(&mut self, _channel: &Channel, ack: Ack) {
43        info!(
44            "handle publish ack delivery_tag={} on channel {}",
45            ack.delivery_tag(),
46            _channel
47        );
48        let tag = ack.delivery_tag();
49        let multiple = ack.mutiple();
50        debug!("Received ACK: tag={}, multiple={}", tag, multiple);
51
52        if let Err(e) = self.sender_pending.send(PendingCmd::Ack((tag, multiple))) {
53            error!("Failed to send ACK to connection manager: {}", e);
54        }
55        
56    }
57    async fn publish_nack(&mut self, _channel: &Channel, nack: Nack) {
58        warn!(
59            "handle publish nack delivery_tag={} on channel {}",
60            nack.delivery_tag(),
61            _channel
62        );
63        let tag = nack.delivery_tag();
64        let multiple = nack.multiple();
65
66        if let Err(e) = self.sender_pending.send(PendingCmd::Nack((tag, multiple))) {
67            error!("Failed to send NACK to connection manager: {}", e);
68        }
69        
70    }
71    async fn publish_return(
72        &mut self,
73        _channel: &Channel,
74        _ret: Return,
75        _basic_properties: BasicProperties,
76        _content: Vec<u8>,
77    ) {
78        warn!(
79            "handle publish return {} on channel {}, content size: {}",
80            _ret,
81            _channel,
82            _content.len()
83        );
84    }
85}
86
87
88pub struct MyConnectionCallback{
89    pub sender: UnboundedSender<ConnectionCommand>,
90}
91
92
93#[async_trait]
94impl ConnectionCallback for MyConnectionCallback {
95
96    async fn close(&mut self, _connection: &Connection, _close: Close) -> AMQPResult<()> {
97        error!(
98            "handle close request for connection {}, cause: {}",
99            _connection, _close
100        );
101        let _ = self.sender.send(ConnectionCommand::CheckConnection{});
102        Ok(())
103    }
104
105    async fn blocked(&mut self, _connection: &Connection, _reason: String) {
106        debug!(
107            "handle blocked notification for connection {}, reason: {}",
108            _connection, _reason
109        );
110    }
111
112    async fn unblocked(&mut self, _connection: &Connection) {
113        debug!(
114            "handle unblocked notification for connection {}",
115            _connection
116        );
117    }
118    
119    async fn secret_updated(&mut self, connection: &Connection){
120        debug!(
121            "handle secret updated notification for connection {}",
122            connection
123        );
124        let _ = self.sender.send(ConnectionCommand::CheckConnection{});
125    }
126}