rabbitmq_stream_client/
environment.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use crate::types::OffsetSpecification;
5use crate::{client::TlsConfiguration, producer::NoDedup};
6use rand::prelude::SliceRandom;
7use rand::rngs::StdRng;
8use rand::SeedableRng;
9use std::collections::HashMap;
10
11use crate::{
12    client::{Client, ClientOptions, MetricsCollector},
13    consumer::ConsumerBuilder,
14    error::{ConsumerCreateError, ProducerCreateError, StreamDeleteError},
15    producer::ProducerBuilder,
16    stream_creator::StreamCreator,
17    superstream::RoutingStrategy,
18    superstream_consumer::SuperStreamConsumerBuilder,
19    superstream_producer::SuperStreamProducerBuilder,
20    RabbitMQStreamResult,
21};
22
23/// Main access point to a node
24#[derive(Clone)]
25pub struct Environment {
26    pub(crate) options: EnvironmentOptions,
27}
28
29impl Environment {
30    pub fn builder() -> EnvironmentBuilder {
31        EnvironmentBuilder(EnvironmentOptions::default())
32    }
33
34    /// Create environment instance from client options.
35    /// This allow to create an `Environment` instance from configuration.
36    ///
37    /// ```rust,no_run
38    /// # async fn doc_fn() -> Result<(), Box<dyn std::error::Error>> {
39    /// use rabbitmq_stream_client::Environment;
40    /// use rabbitmq_stream_client::ClientOptions;
41    ///
42    /// #[derive(serde::Deserialize)]
43    /// struct MyConfig {
44    ///     rabbitmq: ClientOptions
45    /// }
46    ///
47    /// let j = r#"
48    /// {
49    ///     "rabbitmq": {
50    ///         "host": "localhost",
51    ///         "tls": {
52    ///             "enabled": false
53    ///         }
54    ///     }
55    /// }
56    ///         "#;
57    ///  let my_config: MyConfig = serde_json::from_str(j).unwrap();
58    ///  let env = Environment::from_client_option(my_config.rabbitmq)
59    ///     .await
60    ///     .unwrap();
61    /// # Ok(())
62    /// # }
63    #[cfg(feature = "serde")]
64    pub async fn from_client_option(
65        client_options: impl Into<ClientOptions>,
66    ) -> RabbitMQStreamResult<Self> {
67        let env_option = EnvironmentOptions {
68            client_options: client_options.into(),
69        };
70        Self::boostrap(env_option).await
71    }
72
73    async fn boostrap(options: EnvironmentOptions) -> RabbitMQStreamResult<Self> {
74        // check connection
75        let client = Client::connect(options.client_options.clone()).await?;
76        client.close().await?;
77        Ok(Environment { options })
78    }
79
80    pub(crate) async fn create_producer_client(
81        self,
82        stream: &str,
83        client_provided_name: String,
84    ) -> Result<Client, ProducerCreateError> {
85        let mut opt_with_client_provided_name = self.options.client_options.clone();
86        opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
87
88        let mut client = self
89            .create_client_with_options(opt_with_client_provided_name.clone())
90            .await?;
91
92        if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
93            tracing::debug!(
94                "Connecting to leader node {:?} of stream {}",
95                metadata.leader,
96                stream
97            );
98            let load_balancer_mode = self.options.client_options.load_balancer_mode;
99            if load_balancer_mode {
100                // Producer must connect to leader node
101                let options: ClientOptions = self.options.client_options.clone();
102                loop {
103                    let temp_client = Client::connect(options.clone()).await?;
104                    let mapping = temp_client.connection_properties().await;
105                    if let Some(advertised_host) = mapping.get("advertised_host") {
106                        if *advertised_host == metadata.leader.host.clone() {
107                            client.close().await?;
108                            client = temp_client;
109                            break;
110                        }
111                    }
112                    temp_client.close().await?;
113                }
114            } else {
115                client.close().await?;
116                client = Client::connect(ClientOptions {
117                    host: metadata.leader.host.clone(),
118                    port: metadata.leader.port as u16,
119                    ..opt_with_client_provided_name.clone()
120                })
121                .await?
122            };
123        } else {
124            return Err(ProducerCreateError::StreamDoesNotExist {
125                stream: stream.into(),
126            });
127        }
128
129        Ok(client)
130    }
131
132    pub(crate) async fn create_consumer_client(
133        self,
134        stream: &str,
135        client_provided_name: String,
136    ) -> Result<Client, ConsumerCreateError> {
137        let mut opt_with_client_provided_name = self.options.client_options.clone();
138        opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
139
140        let mut client = self
141            .create_client_with_options(opt_with_client_provided_name.clone())
142            .await?;
143
144        if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
145            // If there are no replicas we do not reassign client, meaning we just keep reading from the leader.
146            // This is desired behavior in case there is only one node in the cluster.
147            if let Some(replica) = metadata.replicas.choose(&mut StdRng::from_entropy()) {
148                tracing::debug!(
149                    "Picked replica {:?} out of possible candidates {:?} for stream {}",
150                    replica,
151                    metadata.replicas,
152                    stream
153                );
154                let load_balancer_mode = self.options.client_options.load_balancer_mode;
155                if load_balancer_mode {
156                    let options = self.options.client_options.clone();
157                    loop {
158                        let temp_client = Client::connect(options.clone()).await?;
159                        let mapping = temp_client.connection_properties().await;
160                        if let Some(advertised_host) = mapping.get("advertised_host") {
161                            if *advertised_host == replica.host.clone() {
162                                client.close().await?;
163                                client = temp_client;
164                                break;
165                            }
166                        }
167                        temp_client.close().await?;
168                    }
169                } else {
170                    client.close().await?;
171                    client = Client::connect(ClientOptions {
172                        host: replica.host.clone(),
173                        port: replica.port as u16,
174                        ..self.options.client_options
175                    })
176                    .await?;
177                }
178            }
179        } else {
180            return Err(ConsumerCreateError::StreamDoesNotExist {
181                stream: stream.into(),
182            });
183        }
184
185        Ok(client)
186    }
187
188    /// Returns a builder for creating a stream with a specific configuration
189    pub fn stream_creator(&self) -> StreamCreator {
190        StreamCreator::new(self.clone())
191    }
192
193    /// Returns a builder for creating a producer
194    pub fn producer(&self) -> ProducerBuilder<NoDedup> {
195        ProducerBuilder {
196            environment: self.clone(),
197            name: None,
198            batch_size: 100,
199            data: PhantomData,
200            filter_value_extractor: None,
201            client_provided_name: String::from("rust-stream-producer"),
202            on_closed: None,
203            overwrite_heartbeat: None,
204        }
205    }
206
207    pub fn super_stream_producer(
208        &self,
209        routing_strategy: RoutingStrategy,
210    ) -> SuperStreamProducerBuilder<NoDedup> {
211        SuperStreamProducerBuilder {
212            environment: self.clone(),
213            data: PhantomData,
214            filter_value_extractor: None,
215            route_strategy: routing_strategy,
216            client_provided_name: String::from("rust-super-stream-producer"),
217        }
218    }
219
220    /// Returns a builder for creating a consumer
221    pub fn consumer(&self) -> ConsumerBuilder {
222        ConsumerBuilder {
223            consumer_name: None,
224            environment: self.clone(),
225            offset_specification: OffsetSpecification::Next,
226            filter_configuration: None,
227            consumer_update_listener: None,
228            client_provided_name: String::from("rust-stream-consumer"),
229            properties: HashMap::new(),
230            is_single_active_consumer: false,
231        }
232    }
233
234    pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
235        SuperStreamConsumerBuilder {
236            super_stream_consumer_name: None,
237            environment: self.clone(),
238            offset_specification: OffsetSpecification::Next,
239            filter_configuration: None,
240            consumer_update_listener: None,
241            client_provided_name: String::from("rust-super-stream-consumer"),
242            properties: HashMap::new(),
243            is_single_active_consumer: false,
244        }
245    }
246
247    pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
248        Client::connect(self.options.client_options.clone()).await
249    }
250    pub(crate) async fn create_client_with_options(
251        &self,
252        opts: impl Into<ClientOptions>,
253    ) -> RabbitMQStreamResult<Client> {
254        Client::connect(opts).await
255    }
256
257    /// Delete a stream
258    pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
259        let client = self.create_client().await?;
260        let response = client.delete_stream(stream).await?;
261        client.close().await?;
262
263        if response.is_ok() {
264            Ok(())
265        } else {
266            Err(StreamDeleteError::Delete {
267                stream: stream.to_owned(),
268                status: response.code().clone(),
269            })
270        }
271    }
272
273    pub async fn delete_super_stream(&self, super_stream: &str) -> Result<(), StreamDeleteError> {
274        let client = self.create_client().await?;
275        let response = client.delete_super_stream(super_stream).await?;
276        client.close().await?;
277
278        if response.is_ok() {
279            Ok(())
280        } else {
281            Err(StreamDeleteError::Delete {
282                stream: super_stream.to_owned(),
283                status: response.code().clone(),
284            })
285        }
286    }
287}
288
289/// Builder for [`Environment`]
290pub struct EnvironmentBuilder(EnvironmentOptions);
291
292impl EnvironmentBuilder {
293    pub async fn build(self) -> RabbitMQStreamResult<Environment> {
294        Environment::boostrap(self.0).await
295    }
296
297    pub fn host(mut self, host: &str) -> EnvironmentBuilder {
298        self.0.client_options.host = host.to_owned();
299        self
300    }
301
302    pub fn username(mut self, username: &str) -> EnvironmentBuilder {
303        self.0.client_options.user = username.to_owned();
304        self
305    }
306
307    pub fn password(mut self, password: &str) -> EnvironmentBuilder {
308        self.0.client_options.password = password.to_owned();
309        self
310    }
311
312    pub fn virtual_host(mut self, virtual_host: &str) -> EnvironmentBuilder {
313        self.0.client_options.v_host = virtual_host.to_owned();
314        self
315    }
316    pub fn port(mut self, port: u16) -> EnvironmentBuilder {
317        self.0.client_options.port = port;
318        self
319    }
320
321    pub fn tls(mut self, tls_configuration: TlsConfiguration) -> EnvironmentBuilder {
322        self.0.client_options.tls = tls_configuration;
323
324        self
325    }
326
327    pub fn heartbeat(mut self, heartbeat: u32) -> EnvironmentBuilder {
328        self.0.client_options.heartbeat = heartbeat;
329        self
330    }
331
332    pub fn metrics_collector(
333        mut self,
334        collector: impl MetricsCollector + 'static,
335    ) -> EnvironmentBuilder {
336        self.0.client_options.collector = Arc::new(collector);
337        self
338    }
339
340    pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> EnvironmentBuilder {
341        self.0.client_options.load_balancer_mode = load_balancer_mode;
342        self
343    }
344
345    pub fn client_provided_name(mut self, name: &str) -> EnvironmentBuilder {
346        self.0.client_options.client_provided_name = name.to_owned();
347        self
348    }
349}
350#[derive(Clone, Default)]
351pub struct EnvironmentOptions {
352    pub(crate) client_options: ClientOptions,
353}