1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use crate::types::OffsetSpecification;
5use crate::{client::TlsConfiguration, producer::NoDedup};
6use rand::prelude::SliceRandom;
7use rand::rngs::StdRng;
8use rand::SeedableRng;
9use std::collections::HashMap;
10
11use crate::{
12 client::{Client, ClientOptions, MetricsCollector},
13 consumer::ConsumerBuilder,
14 error::{ConsumerCreateError, ProducerCreateError, StreamDeleteError},
15 producer::ProducerBuilder,
16 stream_creator::StreamCreator,
17 superstream::RoutingStrategy,
18 superstream_consumer::SuperStreamConsumerBuilder,
19 superstream_producer::SuperStreamProducerBuilder,
20 RabbitMQStreamResult,
21};
22
23#[derive(Clone)]
25pub struct Environment {
26 pub(crate) options: EnvironmentOptions,
27}
28
29impl Environment {
30 pub fn builder() -> EnvironmentBuilder {
31 EnvironmentBuilder(EnvironmentOptions::default())
32 }
33
34 #[cfg(feature = "serde")]
64 pub async fn from_client_option(
65 client_options: impl Into<ClientOptions>,
66 ) -> RabbitMQStreamResult<Self> {
67 let env_option = EnvironmentOptions {
68 client_options: client_options.into(),
69 };
70 Self::boostrap(env_option).await
71 }
72
73 async fn boostrap(options: EnvironmentOptions) -> RabbitMQStreamResult<Self> {
74 let client = Client::connect(options.client_options.clone()).await?;
76 client.close().await?;
77 Ok(Environment { options })
78 }
79
80 pub(crate) async fn create_producer_client(
81 self,
82 stream: &str,
83 client_provided_name: String,
84 ) -> Result<Client, ProducerCreateError> {
85 let mut opt_with_client_provided_name = self.options.client_options.clone();
86 opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
87
88 let mut client = self
89 .create_client_with_options(opt_with_client_provided_name.clone())
90 .await?;
91
92 if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
93 tracing::debug!(
94 "Connecting to leader node {:?} of stream {}",
95 metadata.leader,
96 stream
97 );
98 let load_balancer_mode = self.options.client_options.load_balancer_mode;
99 if load_balancer_mode {
100 let options: ClientOptions = self.options.client_options.clone();
102 loop {
103 let temp_client = Client::connect(options.clone()).await?;
104 let mapping = temp_client.connection_properties().await;
105 if let Some(advertised_host) = mapping.get("advertised_host") {
106 if *advertised_host == metadata.leader.host.clone() {
107 client.close().await?;
108 client = temp_client;
109 break;
110 }
111 }
112 temp_client.close().await?;
113 }
114 } else {
115 client.close().await?;
116 client = Client::connect(ClientOptions {
117 host: metadata.leader.host.clone(),
118 port: metadata.leader.port as u16,
119 ..opt_with_client_provided_name.clone()
120 })
121 .await?
122 };
123 } else {
124 return Err(ProducerCreateError::StreamDoesNotExist {
125 stream: stream.into(),
126 });
127 }
128
129 Ok(client)
130 }
131
132 pub(crate) async fn create_consumer_client(
133 self,
134 stream: &str,
135 client_provided_name: String,
136 ) -> Result<Client, ConsumerCreateError> {
137 let mut opt_with_client_provided_name = self.options.client_options.clone();
138 opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
139
140 let mut client = self
141 .create_client_with_options(opt_with_client_provided_name.clone())
142 .await?;
143
144 if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
145 if let Some(replica) = metadata.replicas.choose(&mut StdRng::from_entropy()) {
148 tracing::debug!(
149 "Picked replica {:?} out of possible candidates {:?} for stream {}",
150 replica,
151 metadata.replicas,
152 stream
153 );
154 let load_balancer_mode = self.options.client_options.load_balancer_mode;
155 if load_balancer_mode {
156 let options = self.options.client_options.clone();
157 loop {
158 let temp_client = Client::connect(options.clone()).await?;
159 let mapping = temp_client.connection_properties().await;
160 if let Some(advertised_host) = mapping.get("advertised_host") {
161 if *advertised_host == replica.host.clone() {
162 client.close().await?;
163 client = temp_client;
164 break;
165 }
166 }
167 temp_client.close().await?;
168 }
169 } else {
170 client.close().await?;
171 client = Client::connect(ClientOptions {
172 host: replica.host.clone(),
173 port: replica.port as u16,
174 ..self.options.client_options
175 })
176 .await?;
177 }
178 }
179 } else {
180 return Err(ConsumerCreateError::StreamDoesNotExist {
181 stream: stream.into(),
182 });
183 }
184
185 Ok(client)
186 }
187
188 pub fn stream_creator(&self) -> StreamCreator {
190 StreamCreator::new(self.clone())
191 }
192
193 pub fn producer(&self) -> ProducerBuilder<NoDedup> {
195 ProducerBuilder {
196 environment: self.clone(),
197 name: None,
198 batch_size: 100,
199 data: PhantomData,
200 filter_value_extractor: None,
201 client_provided_name: String::from("rust-stream-producer"),
202 on_closed: None,
203 overwrite_heartbeat: None,
204 }
205 }
206
207 pub fn super_stream_producer(
208 &self,
209 routing_strategy: RoutingStrategy,
210 ) -> SuperStreamProducerBuilder<NoDedup> {
211 SuperStreamProducerBuilder {
212 environment: self.clone(),
213 data: PhantomData,
214 filter_value_extractor: None,
215 route_strategy: routing_strategy,
216 client_provided_name: String::from("rust-super-stream-producer"),
217 }
218 }
219
220 pub fn consumer(&self) -> ConsumerBuilder {
222 ConsumerBuilder {
223 consumer_name: None,
224 environment: self.clone(),
225 offset_specification: OffsetSpecification::Next,
226 filter_configuration: None,
227 consumer_update_listener: None,
228 client_provided_name: String::from("rust-stream-consumer"),
229 properties: HashMap::new(),
230 is_single_active_consumer: false,
231 }
232 }
233
234 pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
235 SuperStreamConsumerBuilder {
236 super_stream_consumer_name: None,
237 environment: self.clone(),
238 offset_specification: OffsetSpecification::Next,
239 filter_configuration: None,
240 consumer_update_listener: None,
241 client_provided_name: String::from("rust-super-stream-consumer"),
242 properties: HashMap::new(),
243 is_single_active_consumer: false,
244 }
245 }
246
247 pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
248 Client::connect(self.options.client_options.clone()).await
249 }
250 pub(crate) async fn create_client_with_options(
251 &self,
252 opts: impl Into<ClientOptions>,
253 ) -> RabbitMQStreamResult<Client> {
254 Client::connect(opts).await
255 }
256
257 pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
259 let client = self.create_client().await?;
260 let response = client.delete_stream(stream).await?;
261 client.close().await?;
262
263 if response.is_ok() {
264 Ok(())
265 } else {
266 Err(StreamDeleteError::Delete {
267 stream: stream.to_owned(),
268 status: response.code().clone(),
269 })
270 }
271 }
272
273 pub async fn delete_super_stream(&self, super_stream: &str) -> Result<(), StreamDeleteError> {
274 let client = self.create_client().await?;
275 let response = client.delete_super_stream(super_stream).await?;
276 client.close().await?;
277
278 if response.is_ok() {
279 Ok(())
280 } else {
281 Err(StreamDeleteError::Delete {
282 stream: super_stream.to_owned(),
283 status: response.code().clone(),
284 })
285 }
286 }
287}
288
289pub struct EnvironmentBuilder(EnvironmentOptions);
291
292impl EnvironmentBuilder {
293 pub async fn build(self) -> RabbitMQStreamResult<Environment> {
294 Environment::boostrap(self.0).await
295 }
296
297 pub fn host(mut self, host: &str) -> EnvironmentBuilder {
298 self.0.client_options.host = host.to_owned();
299 self
300 }
301
302 pub fn username(mut self, username: &str) -> EnvironmentBuilder {
303 self.0.client_options.user = username.to_owned();
304 self
305 }
306
307 pub fn password(mut self, password: &str) -> EnvironmentBuilder {
308 self.0.client_options.password = password.to_owned();
309 self
310 }
311
312 pub fn virtual_host(mut self, virtual_host: &str) -> EnvironmentBuilder {
313 self.0.client_options.v_host = virtual_host.to_owned();
314 self
315 }
316 pub fn port(mut self, port: u16) -> EnvironmentBuilder {
317 self.0.client_options.port = port;
318 self
319 }
320
321 pub fn tls(mut self, tls_configuration: TlsConfiguration) -> EnvironmentBuilder {
322 self.0.client_options.tls = tls_configuration;
323
324 self
325 }
326
327 pub fn heartbeat(mut self, heartbeat: u32) -> EnvironmentBuilder {
328 self.0.client_options.heartbeat = heartbeat;
329 self
330 }
331
332 pub fn metrics_collector(
333 mut self,
334 collector: impl MetricsCollector + 'static,
335 ) -> EnvironmentBuilder {
336 self.0.client_options.collector = Arc::new(collector);
337 self
338 }
339
340 pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> EnvironmentBuilder {
341 self.0.client_options.load_balancer_mode = load_balancer_mode;
342 self
343 }
344
345 pub fn client_provided_name(mut self, name: &str) -> EnvironmentBuilder {
346 self.0.client_options.client_provided_name = name.to_owned();
347 self
348 }
349}
350#[derive(Clone, Default)]
351pub struct EnvironmentOptions {
352 pub(crate) client_options: ClientOptions,
353}