Skip to main content

amqp_client_rust/api/
eventbus.rs

1use crate::domain::config::QoSConfig;
2use crate::{
3    api::connection::AsyncConnection,
4    domain::config::Config,
5    errors::AppError,
6};
7use std::error::Error as StdError;
8use std::future::Future;
9use std::sync::Arc;
10use tokio::time::Duration;
11use std::pin::Pin;
12use crate::api::utils::{Confirmations, ContentEncoding, DeliveryMode, Message, QueueOptions};
13
14#[derive(Clone)]
15pub struct AsyncEventbusRabbitMQ {
16    config: Arc<Config>,
17    pub_connection: AsyncConnection,
18    sub_connection: AsyncConnection,
19    rpc_client_connection: AsyncConnection,
20    rpc_server_connection: AsyncConnection,
21}
22
23impl AsyncEventbusRabbitMQ {
24    pub fn new(config: Config, qos_config: QoSConfig) -> Self {
25        let config = Arc::new(config);
26        Self {
27            config: Arc::clone(&config),
28            pub_connection: AsyncConnection::new(Arc::clone(&config), if qos_config.pub_confirm { Confirmations::PublisherConfirms } else { Confirmations::Disables }, false, None),
29            sub_connection: AsyncConnection::new(Arc::clone(&config), Confirmations::Disables, qos_config.sub_auto_ack, qos_config.sub_prefetch),
30            rpc_client_connection: AsyncConnection::new(Arc::clone(&config), if qos_config.rpc_client_confirm { Confirmations::RPCClientPublisherConfirms } else { Confirmations::Disables }, qos_config.rpc_client_auto_ack, qos_config.rpc_client_prefetch),
31            rpc_server_connection: AsyncConnection::new(Arc::clone(&config), if qos_config.rpc_server_confirm { Confirmations::RPCServerPublisherConfirms } else { Confirmations::Disables }, qos_config.rpc_server_auto_ack, qos_config.rpc_server_prefetch),
32        }
33    }
34
35    pub async fn update_secret(&self, new_secret: &str, reason: &str, command_timeout: Option<Duration>) -> Result<(), AppError> {
36        tokio::try_join!(
37            self.pub_connection.update_secret(new_secret, reason, command_timeout),
38            self.sub_connection.update_secret(new_secret, reason, command_timeout),
39            self.rpc_client_connection.update_secret(new_secret, reason, command_timeout),
40            self.rpc_server_connection.update_secret(new_secret, reason, command_timeout)
41        )?;
42        Ok(())
43    }
44    pub async fn publish(
45        &self,
46        exchange_name: &str,
47        routing_key: &str,
48        body: impl Into<Vec<u8>>,
49        content_type: Option<&str>,
50        content_encoding: ContentEncoding,
51        command_timeout: Option<Duration>,
52        delivery_mode: Option<DeliveryMode>,
53        expiration: Option<u32>,
54    ) -> Result<(), AppError> {
55        let content_type = content_type.unwrap_or("application/json");
56        let delivery_mode = delivery_mode.unwrap_or(DeliveryMode::Transient);
57        let command_timeout = command_timeout.or(Some(Duration::from_secs(16)));
58
59        self.pub_connection.publish(
60            exchange_name, 
61            routing_key, 
62            body,
63            content_type,
64            content_encoding,
65            command_timeout,
66            delivery_mode,
67            expiration,
68        ).await
69    }
70
71    pub async fn subscribe<F, Fut>(
72        &self,
73        exchange_name: &str,
74        routing_key: &str,
75        handler: F,
76        process_timeout: Option<Duration>,
77        command_timeout: Option<Duration>,
78    ) -> Result<(), AppError>
79    where
80        F: Fn(Message) -> Fut + Send + Sync + 'static,
81        Fut: Future<Output = Result<(), Box<dyn StdError + Send + Sync>>> + Send + 'static,
82    {
83        let command_timeout = command_timeout.or(Some(Duration::from_secs(16)));
84        let queue_name = &self.config.options.queue_name;
85        let exchange_type = "topic";
86        
87        let handler = Arc::new(move |data| {
88            Box::pin(handler(data)) as Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send + Sync>>> + Send>>
89        });
90        let queue_options = QueueOptions::new()
91            .auto_delete(false)
92            .durable(true)
93            .exclusive(false)
94            .no_create(false);
95
96        self.sub_connection.subscribe(
97            handler,
98            routing_key,
99            exchange_name,
100            exchange_type,
101            queue_name,
102            process_timeout,
103            command_timeout,
104            queue_options
105        ).await
106    }
107
108    pub async fn rpc_client(
109        &self,
110        exchange_name: &str,
111        routing_key: &str,
112        body: impl Into<Vec<u8>>,
113        content_type: &str,
114        content_encoding: ContentEncoding,
115        response_timeout_millis: u32,
116        command_timeout: Option<Duration>,
117        delivery_mode: Option<DeliveryMode>,
118        expiration: Option<u32>
119    ) -> Result<Vec<u8>, AppError>
120    {
121        let command_timeout = command_timeout.or(Some(Duration::from_secs(32)));
122        let delivery_mode = delivery_mode.unwrap_or(DeliveryMode::Transient);
123    
124        self.rpc_client_connection.rpc_client(
125            exchange_name,
126            routing_key,
127            body,
128            content_type,
129            content_encoding,
130            response_timeout_millis,
131            command_timeout,
132            delivery_mode,
133            expiration,
134        ).await
135    }
136
137    pub async fn provide_resource<F, Fut>(
138        &self,
139        routing_key: &str,
140        handler: F,
141        process_timeout: Option<Duration>,
142        command_timeout: Option<Duration>,
143    ) -> Result<(), AppError>
144    where
145        F: Fn(Message) -> Fut + Send + Sync + 'static,
146        Fut: Future<Output = Result<Message, Box<dyn StdError + Send + Sync>>> + Send + 'static,
147    {
148        let command_timeout = command_timeout.or(Some(Duration::from_secs(16)));
149        let queue_name = &self.config.options.rpc_queue_name;
150        let exchange_name = &self.config.options.rpc_exchange_name;
151        let exchange_type = "topic";
152        
153        let handler = Arc::new(move |data| {
154            Box::pin(handler(data)) as Pin<Box<dyn Future<Output = Result<Message, Box<dyn StdError + Send + Sync>>> + Send>>
155        });
156
157        let queue_options = QueueOptions::new()
158            .auto_delete(false)
159            .durable(true)
160            .exclusive(false)
161            .no_create(false);
162
163        self.rpc_server_connection.rpc_server(
164            handler,
165            routing_key,
166            exchange_name,
167            exchange_type,
168            queue_name,
169            process_timeout,
170            command_timeout,
171            queue_options
172        ).await
173    }
174
175    pub async fn dispose(&self) -> Result<(), Box<dyn std::error::Error>> {
176        self.sub_connection.close().await?;
177        self.rpc_server_connection.close().await?;
178        self.pub_connection.close().await?;
179        self.rpc_client_connection.close().await?;
180        Ok(())
181    }
182}