1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use crate::types::OffsetSpecification;
5use crate::{client::TlsConfiguration, producer::NoDedup};
6use rand::prelude::IndexedRandom;
7use rand::rngs::StdRng;
8use std::collections::HashMap;
9
10use crate::{
11 client::{Client, ClientOptions, MetricsCollector},
12 consumer::ConsumerBuilder,
13 error::{ConsumerCreateError, ProducerCreateError, StreamDeleteError},
14 producer::ProducerBuilder,
15 stream_creator::StreamCreator,
16 superstream::RoutingStrategy,
17 superstream_consumer::SuperStreamConsumerBuilder,
18 superstream_producer::SuperStreamProducerBuilder,
19 RabbitMQStreamResult,
20};
21
22#[derive(Clone)]
24pub struct Environment {
25 pub(crate) options: EnvironmentOptions,
26}
27
28impl Environment {
29 pub fn builder() -> EnvironmentBuilder {
30 EnvironmentBuilder(EnvironmentOptions::default())
31 }
32
33 #[cfg(feature = "serde")]
63 pub async fn from_client_option(
64 client_options: impl Into<ClientOptions>,
65 ) -> RabbitMQStreamResult<Self> {
66 let env_option = EnvironmentOptions {
67 client_options: client_options.into(),
68 };
69 Self::boostrap(env_option).await
70 }
71
72 async fn boostrap(options: EnvironmentOptions) -> RabbitMQStreamResult<Self> {
73 let client = Client::connect(options.client_options.clone()).await?;
75 client.close().await?;
76 Ok(Environment { options })
77 }
78
79 pub(crate) async fn create_producer_client(
80 self,
81 stream: &str,
82 client_provided_name: String,
83 ) -> Result<Client, ProducerCreateError> {
84 let mut opt_with_client_provided_name = self.options.client_options.clone();
85 opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
86
87 let mut client = self
88 .create_client_with_options(opt_with_client_provided_name.clone())
89 .await?;
90
91 if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
92 tracing::debug!(
93 "Connecting to leader node {:?} of stream {}",
94 metadata.leader,
95 stream
96 );
97 let load_balancer_mode = self.options.client_options.load_balancer_mode;
98 if load_balancer_mode {
99 let options: ClientOptions = self.options.client_options.clone();
101 loop {
102 let temp_client = Client::connect(options.clone()).await?;
103 let mapping = temp_client.connection_properties().await;
104 if let Some(advertised_host) = mapping.get("advertised_host") {
105 if *advertised_host == metadata.leader.host.clone() {
106 client.close().await?;
107 client = temp_client;
108 break;
109 }
110 }
111 temp_client.close().await?;
112 }
113 } else {
114 client.close().await?;
115 client = Client::connect(ClientOptions {
116 host: metadata.leader.host.clone(),
117 port: metadata.leader.port as u16,
118 ..opt_with_client_provided_name.clone()
119 })
120 .await?
121 };
122 } else {
123 return Err(ProducerCreateError::StreamDoesNotExist {
124 stream: stream.into(),
125 });
126 }
127
128 Ok(client)
129 }
130
131 pub(crate) async fn create_consumer_client(
132 self,
133 stream: &str,
134 client_provided_name: String,
135 ) -> Result<Client, ConsumerCreateError> {
136 let mut opt_with_client_provided_name = self.options.client_options.clone();
137 opt_with_client_provided_name.client_provided_name = client_provided_name.clone();
138
139 let mut client = self
140 .create_client_with_options(opt_with_client_provided_name.clone())
141 .await?;
142
143 if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
144 let mut rng: StdRng = rand::make_rng();
147 if let Some(replica) = metadata.replicas.choose(&mut rng) {
148 tracing::debug!(
149 "Picked replica {:?} out of possible candidates {:?} for stream {}",
150 replica,
151 metadata.replicas,
152 stream
153 );
154 let load_balancer_mode = self.options.client_options.load_balancer_mode;
155 if load_balancer_mode {
156 let options = self.options.client_options.clone();
157 loop {
158 let temp_client = Client::connect(options.clone()).await?;
159 let mapping = temp_client.connection_properties().await;
160 if let Some(advertised_host) = mapping.get("advertised_host") {
161 if *advertised_host == replica.host.clone() {
162 client.close().await?;
163 client = temp_client;
164 break;
165 }
166 }
167 temp_client.close().await?;
168 }
169 } else {
170 client.close().await?;
171 client = Client::connect(ClientOptions {
172 host: replica.host.clone(),
173 port: replica.port as u16,
174 ..self.options.client_options
175 })
176 .await?;
177 }
178 }
179 } else {
180 return Err(ConsumerCreateError::StreamDoesNotExist {
181 stream: stream.into(),
182 });
183 }
184
185 Ok(client)
186 }
187
188 pub fn stream_creator(&self) -> StreamCreator {
190 StreamCreator::new(self.clone())
191 }
192
193 pub fn producer(&self) -> ProducerBuilder<NoDedup> {
195 ProducerBuilder {
196 environment: self.clone(),
197 name: None,
198 batch_size: 100,
199 data: PhantomData,
200 filter_value_extractor: None,
201 client_provided_name: String::from("rust-stream-producer"),
202 on_closed: None,
203 overwrite_heartbeat: None,
204 }
205 }
206
207 pub fn super_stream_producer(
208 &self,
209 routing_strategy: RoutingStrategy,
210 ) -> SuperStreamProducerBuilder<NoDedup> {
211 SuperStreamProducerBuilder {
212 environment: self.clone(),
213 data: PhantomData,
214 filter_value_extractor: None,
215 route_strategy: routing_strategy,
216 client_provided_name: String::from("rust-super-stream-producer"),
217 }
218 }
219
220 pub fn consumer(&self) -> ConsumerBuilder {
222 ConsumerBuilder {
223 consumer_name: None,
224 environment: self.clone(),
225 offset_specification: OffsetSpecification::Next,
226 filter_configuration: None,
227 consumer_update_listener: None,
228 client_provided_name: String::from("rust-stream-consumer"),
229 properties: HashMap::new(),
230 is_single_active_consumer: false,
231 }
232 }
233
234 pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
235 SuperStreamConsumerBuilder {
236 super_stream_consumer_name: None,
237 environment: self.clone(),
238 offset_specification: OffsetSpecification::Next,
239 filter_configuration: None,
240 consumer_update_listener: None,
241 client_provided_name: String::from("rust-super-stream-consumer"),
242 properties: HashMap::new(),
243 is_single_active_consumer: false,
244 }
245 }
246
247 pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
248 Client::connect(self.options.client_options.clone()).await
249 }
250 pub(crate) async fn create_client_with_options(
251 &self,
252 opts: impl Into<ClientOptions>,
253 ) -> RabbitMQStreamResult<Client> {
254 Client::connect(opts).await
255 }
256
257 pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
259 let client = self.create_client().await?;
260 let response = client.delete_stream(stream).await?;
261 client.close().await?;
262
263 if response.is_ok() {
264 Ok(())
265 } else {
266 Err(StreamDeleteError::Delete {
267 stream: stream.to_owned(),
268 status: response.code().clone(),
269 })
270 }
271 }
272
273 pub async fn delete_super_stream(&self, super_stream: &str) -> Result<(), StreamDeleteError> {
274 let client = self.create_client().await?;
275 let response = client.delete_super_stream(super_stream).await?;
276 client.close().await?;
277
278 if response.is_ok() {
279 Ok(())
280 } else {
281 Err(StreamDeleteError::Delete {
282 stream: super_stream.to_owned(),
283 status: response.code().clone(),
284 })
285 }
286 }
287}
288
289pub struct EnvironmentBuilder(EnvironmentOptions);
291
292impl EnvironmentBuilder {
293 pub async fn build(self) -> RabbitMQStreamResult<Environment> {
294 Environment::boostrap(self.0).await
295 }
296
297 pub fn host(mut self, host: &str) -> EnvironmentBuilder {
298 self.0.client_options.host = host.to_owned();
299 self
300 }
301
302 pub fn username(mut self, username: &str) -> EnvironmentBuilder {
303 self.0.client_options.user = username.to_owned();
304 self
305 }
306
307 pub fn password(mut self, password: &str) -> EnvironmentBuilder {
308 self.0.client_options.password = password.to_owned();
309 self
310 }
311
312 pub fn virtual_host(mut self, virtual_host: &str) -> EnvironmentBuilder {
313 self.0.client_options.v_host = virtual_host.to_owned();
314 self
315 }
316 pub fn port(mut self, port: u16) -> EnvironmentBuilder {
317 self.0.client_options.port = port;
318 self
319 }
320
321 pub fn tls(mut self, tls_configuration: TlsConfiguration) -> EnvironmentBuilder {
322 self.0.client_options.tls = tls_configuration;
323
324 self
325 }
326
327 pub fn heartbeat(mut self, heartbeat: u32) -> EnvironmentBuilder {
328 self.0.client_options.heartbeat = heartbeat;
329 self
330 }
331
332 pub fn metrics_collector(
333 mut self,
334 collector: impl MetricsCollector + 'static,
335 ) -> EnvironmentBuilder {
336 self.0.client_options.collector = Arc::new(collector);
337 self
338 }
339
340 pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> EnvironmentBuilder {
341 self.0.client_options.load_balancer_mode = load_balancer_mode;
342 self
343 }
344
345 pub fn client_provided_name(mut self, name: &str) -> EnvironmentBuilder {
346 self.0.client_options.client_provided_name = name.to_owned();
347 self
348 }
349}
350#[derive(Clone, Default)]
351pub struct EnvironmentOptions {
352 pub(crate) client_options: ClientOptions,
353}