rabbitmq_stream_client/
stream_creator.rs1use std::{collections::HashMap, time::Duration};
2
3use crate::{byte_capacity::ByteCapacity, environment::Environment, error::StreamCreateError};
4
5pub struct StreamCreator {
7 pub(crate) env: Environment,
8 pub options: HashMap<String, String>,
9}
10
11impl StreamCreator {
12 pub fn new(env: Environment) -> Self {
13 let creator = Self {
14 env,
15 options: HashMap::new(),
16 };
17
18 creator.leader_locator(LeaderLocator::LeastLeaders)
19 }
20
21 pub async fn create(self, stream: &str) -> Result<(), StreamCreateError> {
23 let client = self.env.create_client().await?;
24 let response = client.create_stream(stream, self.options).await?;
25 client.close().await?;
26
27 if response.is_ok() {
28 Ok(())
29 } else {
30 Err(StreamCreateError::Create {
31 stream: stream.to_owned(),
32 status: response.code().clone(),
33 })
34 }
35 }
36
37 pub async fn create_super_stream(
38 self,
39 super_stream: &str,
40 number_of_partitions: usize,
41 binding_keys: Option<Vec<String>>,
42 ) -> Result<(), StreamCreateError> {
43 let binding_keys = binding_keys
44 .unwrap_or_else(|| (0..number_of_partitions).map(|i| i.to_string()).collect());
45 let partition_names = binding_keys
46 .iter()
47 .map(|binding_key| format!("{super_stream}-{binding_key}"))
48 .collect();
49
50 let client = self.env.create_client().await?;
51 let response = client
52 .create_super_stream(super_stream, partition_names, binding_keys, self.options)
53 .await?;
54 client.close().await?;
55
56 if response.is_ok() {
57 Ok(())
58 } else {
59 Err(StreamCreateError::Create {
60 stream: super_stream.to_owned(),
61 status: response.code().clone(),
62 })
63 }
64 }
65
66 pub fn max_age(mut self, max_age: Duration) -> Self {
67 self.options
68 .insert("max-age".to_owned(), format!("{}s", max_age.as_secs()));
69 self
70 }
71 pub fn leader_locator(mut self, leader_locator: LeaderLocator) -> Self {
72 self.options.insert(
73 "queue-leader-locator".to_owned(),
74 leader_locator.as_ref().to_string(),
75 );
76 self
77 }
78 pub fn max_length(mut self, byte_capacity: ByteCapacity) -> Self {
79 self.options.insert(
80 "max-length-bytes".to_owned(),
81 byte_capacity.bytes().to_string(),
82 );
83 self
84 }
85 pub fn max_segment_size(mut self, byte_capacity: ByteCapacity) -> Self {
86 self.options.insert(
87 "stream-max-segment-size-bytes".to_owned(),
88 byte_capacity.bytes().to_string(),
89 );
90 self
91 }
92}
93
/// Value for the `queue-leader-locator` stream option.
///
/// Each variant maps to the string produced by its `AsRef<str>`
/// implementation (`client-local`, `random`, `least-leaders`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaderLocator {
    /// Sent as `client-local`.
    ClientLocal,
    /// Sent as `random`.
    Random,
    /// Sent as `least-leaders`.
    LeastLeaders,
}
99
100impl AsRef<str> for LeaderLocator {
101 fn as_ref(&self) -> &str {
102 match self {
103 LeaderLocator::ClientLocal => "client-local",
104 LeaderLocator::Random => "random",
105 LeaderLocator::LeastLeaders => "least-leaders",
106 }
107 }
108}