rabbitmq_stream_client/
stream_creator.rs

1use std::{collections::HashMap, time::Duration};
2
3use crate::{byte_capacity::ByteCapacity, environment::Environment, error::StreamCreateError};
4
5/// Builder for creating a RabbitMQ stream
6pub struct StreamCreator {
7    pub(crate) env: Environment,
8    pub options: HashMap<String, String>,
9}
10
11impl StreamCreator {
12    pub fn new(env: Environment) -> Self {
13        let creator = Self {
14            env,
15            options: HashMap::new(),
16        };
17
18        creator.leader_locator(LeaderLocator::LeastLeaders)
19    }
20
21    /// Create a stream with name and options
22    pub async fn create(self, stream: &str) -> Result<(), StreamCreateError> {
23        let client = self.env.create_client().await?;
24        let response = client.create_stream(stream, self.options).await?;
25        client.close().await?;
26
27        if response.is_ok() {
28            Ok(())
29        } else {
30            Err(StreamCreateError::Create {
31                stream: stream.to_owned(),
32                status: response.code().clone(),
33            })
34        }
35    }
36
37    pub async fn create_super_stream(
38        self,
39        super_stream: &str,
40        number_of_partitions: usize,
41        binding_keys: Option<Vec<String>>,
42    ) -> Result<(), StreamCreateError> {
43        let mut partitions_names = Vec::with_capacity(number_of_partitions);
44        let mut new_binding_keys: Vec<String> = Vec::with_capacity(number_of_partitions);
45
46        if binding_keys.is_none() {
47            for i in 0..number_of_partitions {
48                new_binding_keys.push(i.to_string());
49                partitions_names.push(super_stream.to_owned() + "-" + i.to_string().as_str())
50            }
51        } else {
52            new_binding_keys = binding_keys.unwrap();
53            for binding_key in new_binding_keys.iter() {
54                partitions_names.push(super_stream.to_owned() + "-" + binding_key)
55            }
56        }
57
58        let client = self.env.create_client().await?;
59        let response = client
60            .create_super_stream(
61                super_stream,
62                partitions_names,
63                new_binding_keys,
64                self.options,
65            )
66            .await?;
67        client.close().await?;
68
69        if response.is_ok() {
70            Ok(())
71        } else {
72            Err(StreamCreateError::Create {
73                stream: super_stream.to_owned(),
74                status: response.code().clone(),
75            })
76        }
77    }
78
79    pub fn max_age(mut self, max_age: Duration) -> Self {
80        self.options
81            .insert("max-age".to_owned(), format!("{}s", max_age.as_secs()));
82        self
83    }
84    pub fn leader_locator(mut self, leader_locator: LeaderLocator) -> Self {
85        self.options.insert(
86            "queue-leader-locator".to_owned(),
87            leader_locator.as_ref().to_string(),
88        );
89        self
90    }
91    pub fn max_length(mut self, byte_capacity: ByteCapacity) -> Self {
92        self.options.insert(
93            "max-length-bytes".to_owned(),
94            byte_capacity.bytes().to_string(),
95        );
96        self
97    }
98    pub fn max_segment_size(mut self, byte_capacity: ByteCapacity) -> Self {
99        self.options.insert(
100            "stream-max-segment-size-bytes".to_owned(),
101            byte_capacity.bytes().to_string(),
102        );
103        self
104    }
105}
106
107pub enum LeaderLocator {
108    ClientLocal,
109    Random,
110    LeastLeaders,
111}
112
113impl AsRef<str> for LeaderLocator {
114    fn as_ref(&self) -> &str {
115        match self {
116            LeaderLocator::ClientLocal => "client-local",
117            LeaderLocator::Random => "random",
118            LeaderLocator::LeastLeaders => "least-leaders",
119        }
120    }
121}