rabbitmq_stream_client/
stream_creator.rs1use std::{collections::HashMap, time::Duration};
2
3use crate::{byte_capacity::ByteCapacity, environment::Environment, error::StreamCreateError};
4
5pub struct StreamCreator {
7 pub(crate) env: Environment,
8 pub options: HashMap<String, String>,
9}
10
11impl StreamCreator {
12 pub fn new(env: Environment) -> Self {
13 let creator = Self {
14 env,
15 options: HashMap::new(),
16 };
17
18 creator.leader_locator(LeaderLocator::LeastLeaders)
19 }
20
21 pub async fn create(self, stream: &str) -> Result<(), StreamCreateError> {
23 let client = self.env.create_client().await?;
24 let response = client.create_stream(stream, self.options).await?;
25 client.close().await?;
26
27 if response.is_ok() {
28 Ok(())
29 } else {
30 Err(StreamCreateError::Create {
31 stream: stream.to_owned(),
32 status: response.code().clone(),
33 })
34 }
35 }
36
37 pub async fn create_super_stream(
38 self,
39 super_stream: &str,
40 number_of_partitions: usize,
41 binding_keys: Option<Vec<String>>,
42 ) -> Result<(), StreamCreateError> {
43 let mut partitions_names = Vec::with_capacity(number_of_partitions);
44 let mut new_binding_keys: Vec<String> = Vec::with_capacity(number_of_partitions);
45
46 if binding_keys.is_none() {
47 for i in 0..number_of_partitions {
48 new_binding_keys.push(i.to_string());
49 partitions_names.push(super_stream.to_owned() + "-" + i.to_string().as_str())
50 }
51 } else {
52 new_binding_keys = binding_keys.unwrap();
53 for binding_key in new_binding_keys.iter() {
54 partitions_names.push(super_stream.to_owned() + "-" + binding_key)
55 }
56 }
57
58 let client = self.env.create_client().await?;
59 let response = client
60 .create_super_stream(
61 super_stream,
62 partitions_names,
63 new_binding_keys,
64 self.options,
65 )
66 .await?;
67 client.close().await?;
68
69 if response.is_ok() {
70 Ok(())
71 } else {
72 Err(StreamCreateError::Create {
73 stream: super_stream.to_owned(),
74 status: response.code().clone(),
75 })
76 }
77 }
78
79 pub fn max_age(mut self, max_age: Duration) -> Self {
80 self.options
81 .insert("max-age".to_owned(), format!("{}s", max_age.as_secs()));
82 self
83 }
84 pub fn leader_locator(mut self, leader_locator: LeaderLocator) -> Self {
85 self.options.insert(
86 "queue-leader-locator".to_owned(),
87 leader_locator.as_ref().to_string(),
88 );
89 self
90 }
91 pub fn max_length(mut self, byte_capacity: ByteCapacity) -> Self {
92 self.options.insert(
93 "max-length-bytes".to_owned(),
94 byte_capacity.bytes().to_string(),
95 );
96 self
97 }
98 pub fn max_segment_size(mut self, byte_capacity: ByteCapacity) -> Self {
99 self.options.insert(
100 "stream-max-segment-size-bytes".to_owned(),
101 byte_capacity.bytes().to_string(),
102 );
103 self
104 }
105}
106
107pub enum LeaderLocator {
108 ClientLocal,
109 Random,
110 LeastLeaders,
111}
112
113impl AsRef<str> for LeaderLocator {
114 fn as_ref(&self) -> &str {
115 match self {
116 LeaderLocator::ClientLocal => "client-local",
117 LeaderLocator::Random => "random",
118 LeaderLocator::LeastLeaders => "least-leaders",
119 }
120 }
121}