Skip to main content

rabbitmq_stream_client/
environment.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use crate::types::OffsetSpecification;
5use crate::{client::TlsConfiguration, producer::NoDedup};
6use rand::prelude::IndexedRandom;
7use rand::rngs::StdRng;
8use std::collections::HashMap;
9
10use crate::{
11    client::{Client, ClientOptions, MetricsCollector},
12    consumer::ConsumerBuilder,
13    error::{ConsumerCreateError, ProducerCreateError, StreamDeleteError},
14    producer::ProducerBuilder,
15    stream_creator::StreamCreator,
16    superstream::RoutingStrategy,
17    superstream_consumer::SuperStreamConsumerBuilder,
18    superstream_producer::SuperStreamProducerBuilder,
19    RabbitMQStreamResult,
20};
21
22/// Main access point to a node
23#[derive(Clone)]
24pub struct Environment {
25    pub(crate) options: EnvironmentOptions,
26}
27
28impl Environment {
29    pub fn builder() -> EnvironmentBuilder {
30        EnvironmentBuilder(EnvironmentOptions::default())
31    }
32
33    /// Create environment instance from client options.
34    /// This allow to create an `Environment` instance from configuration.
35    ///
36    /// ```rust,no_run
37    /// # async fn doc_fn() -> Result<(), Box<dyn std::error::Error>> {
38    /// use rabbitmq_stream_client::Environment;
39    /// use rabbitmq_stream_client::ClientOptions;
40    ///
41    /// #[derive(serde::Deserialize)]
42    /// struct MyConfig {
43    ///     rabbitmq: ClientOptions
44    /// }
45    ///
46    /// let j = r#"
47    /// {
48    ///     "rabbitmq": {
49    ///         "host": "localhost",
50    ///         "tls": {
51    ///             "enabled": false
52    ///         }
53    ///     }
54    /// }
55    ///         "#;
56    ///  let my_config: MyConfig = serde_json::from_str(j).unwrap();
57    ///  let env = Environment::from_client_option(my_config.rabbitmq)
58    ///     .await
59    ///     .unwrap();
60    /// # Ok(())
61    /// # }
62    #[cfg(feature = "serde")]
63    pub async fn from_client_option(
64        client_options: impl Into<ClientOptions>,
65    ) -> RabbitMQStreamResult<Self> {
66        let env_option = EnvironmentOptions {
67            client_options: client_options.into(),
68        };
69        Self::boostrap(env_option).await
70    }
71
72    async fn boostrap(options: EnvironmentOptions) -> RabbitMQStreamResult<Self> {
73        // check connection
74        let client = Client::connect(options.client_options.clone()).await?;
75        client.close().await?;
76        Ok(Environment { options })
77    }
78
79    pub(crate) async fn create_producer_client(
80        self,
81        stream: &str,
82        client_provided_name: String,
83    ) -> Result<Client, ProducerCreateError> {
84        let mut opt_with_client_provided_name = self.options.client_options.clone();
85        opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
86
87        let mut client = self
88            .create_client_with_options(opt_with_client_provided_name.clone())
89            .await?;
90
91        if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
92            tracing::debug!(
93                "Connecting to leader node {:?} of stream {}",
94                metadata.leader,
95                stream
96            );
97            let load_balancer_mode = self.options.client_options.load_balancer_mode;
98            if load_balancer_mode {
99                // Producer must connect to leader node
100                let options: ClientOptions = self.options.client_options.clone();
101                loop {
102                    let temp_client = Client::connect(options.clone()).await?;
103                    let mapping = temp_client.connection_properties().await;
104                    if let Some(advertised_host) = mapping.get("advertised_host") {
105                        if *advertised_host == metadata.leader.host.clone() {
106                            client.close().await?;
107                            client = temp_client;
108                            break;
109                        }
110                    }
111                    temp_client.close().await?;
112                }
113            } else {
114                client.close().await?;
115                client = Client::connect(ClientOptions {
116                    host: metadata.leader.host.clone(),
117                    port: metadata.leader.port as u16,
118                    ..opt_with_client_provided_name.clone()
119                })
120                .await?
121            };
122        } else {
123            return Err(ProducerCreateError::StreamDoesNotExist {
124                stream: stream.into(),
125            });
126        }
127
128        Ok(client)
129    }
130
131    pub(crate) async fn create_consumer_client(
132        self,
133        stream: &str,
134        client_provided_name: String,
135    ) -> Result<Client, ConsumerCreateError> {
136        let mut opt_with_client_provided_name = self.options.client_options.clone();
137        opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
138
139        let mut client = self
140            .create_client_with_options(opt_with_client_provided_name.clone())
141            .await?;
142
143        if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
144            // If there are no replicas we do not reassign client, meaning we just keep reading from the leader.
145            // This is desired behavior in case there is only one node in the cluster.
146            let mut rng: StdRng = rand::make_rng();
147            if let Some(replica) = metadata.replicas.choose(&mut rng) {
148                tracing::debug!(
149                    "Picked replica {:?} out of possible candidates {:?} for stream {}",
150                    replica,
151                    metadata.replicas,
152                    stream
153                );
154                let load_balancer_mode = self.options.client_options.load_balancer_mode;
155                if load_balancer_mode {
156                    let options = self.options.client_options.clone();
157                    loop {
158                        let temp_client = Client::connect(options.clone()).await?;
159                        let mapping = temp_client.connection_properties().await;
160                        if let Some(advertised_host) = mapping.get("advertised_host") {
161                            if *advertised_host == replica.host.clone() {
162                                client.close().await?;
163                                client = temp_client;
164                                break;
165                            }
166                        }
167                        temp_client.close().await?;
168                    }
169                } else {
170                    client.close().await?;
171                    client = Client::connect(ClientOptions {
172                        host: replica.host.clone(),
173                        port: replica.port as u16,
174                        ..self.options.client_options
175                    })
176                    .await?;
177                }
178            }
179        } else {
180            return Err(ConsumerCreateError::StreamDoesNotExist {
181                stream: stream.into(),
182            });
183        }
184
185        Ok(client)
186    }
187
188    /// Returns a builder for creating a stream with a specific configuration
189    pub fn stream_creator(&self) -> StreamCreator {
190        StreamCreator::new(self.clone())
191    }
192
193    /// Returns a builder for creating a producer
194    pub fn producer(&self) -> ProducerBuilder<NoDedup> {
195        ProducerBuilder {
196            environment: self.clone(),
197            name: None,
198            batch_size: 100,
199            data: PhantomData,
200            filter_value_extractor: None,
201            client_provided_name: String::from("rust-stream-producer"),
202            on_closed: None,
203            overwrite_heartbeat: None,
204        }
205    }
206
207    pub fn super_stream_producer(
208        &self,
209        routing_strategy: RoutingStrategy,
210    ) -> SuperStreamProducerBuilder<NoDedup> {
211        SuperStreamProducerBuilder {
212            environment: self.clone(),
213            data: PhantomData,
214            filter_value_extractor: None,
215            route_strategy: routing_strategy,
216            client_provided_name: String::from("rust-super-stream-producer"),
217        }
218    }
219
220    /// Returns a builder for creating a consumer
221    pub fn consumer(&self) -> ConsumerBuilder {
222        ConsumerBuilder {
223            consumer_name: None,
224            environment: self.clone(),
225            offset_specification: OffsetSpecification::Next,
226            filter_configuration: None,
227            consumer_update_listener: None,
228            client_provided_name: String::from("rust-stream-consumer"),
229            properties: HashMap::new(),
230            is_single_active_consumer: false,
231        }
232    }
233
234    pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
235        SuperStreamConsumerBuilder {
236            super_stream_consumer_name: None,
237            environment: self.clone(),
238            offset_specification: OffsetSpecification::Next,
239            filter_configuration: None,
240            consumer_update_listener: None,
241            client_provided_name: String::from("rust-super-stream-consumer"),
242            properties: HashMap::new(),
243            is_single_active_consumer: false,
244        }
245    }
246
247    pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
248        Client::connect(self.options.client_options.clone()).await
249    }
250    pub(crate) async fn create_client_with_options(
251        &self,
252        opts: impl Into<ClientOptions>,
253    ) -> RabbitMQStreamResult<Client> {
254        Client::connect(opts).await
255    }
256
257    /// Delete a stream
258    pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
259        let client = self.create_client().await?;
260        let response = client.delete_stream(stream).await?;
261        client.close().await?;
262
263        if response.is_ok() {
264            Ok(())
265        } else {
266            Err(StreamDeleteError::Delete {
267                stream: stream.to_owned(),
268                status: response.code().clone(),
269            })
270        }
271    }
272
273    pub async fn delete_super_stream(&self, super_stream: &str) -> Result<(), StreamDeleteError> {
274        let client = self.create_client().await?;
275        let response = client.delete_super_stream(super_stream).await?;
276        client.close().await?;
277
278        if response.is_ok() {
279            Ok(())
280        } else {
281            Err(StreamDeleteError::Delete {
282                stream: super_stream.to_owned(),
283                status: response.code().clone(),
284            })
285        }
286    }
287}
288
289/// Builder for [`Environment`]
290pub struct EnvironmentBuilder(EnvironmentOptions);
291
292impl EnvironmentBuilder {
293    pub async fn build(self) -> RabbitMQStreamResult<Environment> {
294        Environment::boostrap(self.0).await
295    }
296
297    pub fn host(mut self, host: &str) -> EnvironmentBuilder {
298        self.0.client_options.host = host.to_owned();
299        self
300    }
301
302    pub fn username(mut self, username: &str) -> EnvironmentBuilder {
303        self.0.client_options.user = username.to_owned();
304        self
305    }
306
307    pub fn password(mut self, password: &str) -> EnvironmentBuilder {
308        self.0.client_options.password = password.to_owned();
309        self
310    }
311
312    pub fn virtual_host(mut self, virtual_host: &str) -> EnvironmentBuilder {
313        self.0.client_options.v_host = virtual_host.to_owned();
314        self
315    }
316    pub fn port(mut self, port: u16) -> EnvironmentBuilder {
317        self.0.client_options.port = port;
318        self
319    }
320
321    pub fn tls(mut self, tls_configuration: TlsConfiguration) -> EnvironmentBuilder {
322        self.0.client_options.tls = tls_configuration;
323
324        self
325    }
326
327    pub fn heartbeat(mut self, heartbeat: u32) -> EnvironmentBuilder {
328        self.0.client_options.heartbeat = heartbeat;
329        self
330    }
331
332    pub fn metrics_collector(
333        mut self,
334        collector: impl MetricsCollector + 'static,
335    ) -> EnvironmentBuilder {
336        self.0.client_options.collector = Arc::new(collector);
337        self
338    }
339
340    pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> EnvironmentBuilder {
341        self.0.client_options.load_balancer_mode = load_balancer_mode;
342        self
343    }
344
345    pub fn client_provided_name(mut self, name: &str) -> EnvironmentBuilder {
346        self.0.client_options.client_provided_name = name.to_owned();
347        self
348    }
349}
350#[derive(Clone, Default)]
351pub struct EnvironmentOptions {
352    pub(crate) client_options: ClientOptions,
353}