rabbitmq_stream_client/
stream_creator.rs

1use std::{collections::HashMap, time::Duration};
2
3use crate::{byte_capacity::ByteCapacity, environment::Environment, error::StreamCreateError};
4
5/// Builder for creating a RabbitMQ stream
6pub struct StreamCreator {
7    pub(crate) env: Environment,
8    pub options: HashMap<String, String>,
9}
10
11impl StreamCreator {
12    pub fn new(env: Environment) -> Self {
13        let creator = Self {
14            env,
15            options: HashMap::new(),
16        };
17
18        creator.leader_locator(LeaderLocator::LeastLeaders)
19    }
20
21    /// Create a stream with name and options
22    pub async fn create(self, stream: &str) -> Result<(), StreamCreateError> {
23        let client = self.env.create_client().await?;
24        let response = client.create_stream(stream, self.options).await?;
25        client.close().await?;
26
27        if response.is_ok() {
28            Ok(())
29        } else {
30            Err(StreamCreateError::Create {
31                stream: stream.to_owned(),
32                status: response.code().clone(),
33            })
34        }
35    }
36
37    pub async fn create_super_stream(
38        self,
39        super_stream: &str,
40        number_of_partitions: usize,
41        binding_keys: Option<Vec<String>>,
42    ) -> Result<(), StreamCreateError> {
43        let binding_keys = binding_keys
44            .unwrap_or_else(|| (0..number_of_partitions).map(|i| i.to_string()).collect());
45        let partition_names = binding_keys
46            .iter()
47            .map(|binding_key| format!("{super_stream}-{binding_key}"))
48            .collect();
49
50        let client = self.env.create_client().await?;
51        let response = client
52            .create_super_stream(super_stream, partition_names, binding_keys, self.options)
53            .await?;
54        client.close().await?;
55
56        if response.is_ok() {
57            Ok(())
58        } else {
59            Err(StreamCreateError::Create {
60                stream: super_stream.to_owned(),
61                status: response.code().clone(),
62            })
63        }
64    }
65
66    pub fn max_age(mut self, max_age: Duration) -> Self {
67        self.options
68            .insert("max-age".to_owned(), format!("{}s", max_age.as_secs()));
69        self
70    }
71    pub fn leader_locator(mut self, leader_locator: LeaderLocator) -> Self {
72        self.options.insert(
73            "queue-leader-locator".to_owned(),
74            leader_locator.as_ref().to_string(),
75        );
76        self
77    }
78    pub fn max_length(mut self, byte_capacity: ByteCapacity) -> Self {
79        self.options.insert(
80            "max-length-bytes".to_owned(),
81            byte_capacity.bytes().to_string(),
82        );
83        self
84    }
85    pub fn max_segment_size(mut self, byte_capacity: ByteCapacity) -> Self {
86        self.options.insert(
87            "stream-max-segment-size-bytes".to_owned(),
88            byte_capacity.bytes().to_string(),
89        );
90        self
91    }
92}
93
94pub enum LeaderLocator {
95    ClientLocal,
96    Random,
97    LeastLeaders,
98}
99
100impl AsRef<str> for LeaderLocator {
101    fn as_ref(&self) -> &str {
102        match self {
103            LeaderLocator::ClientLocal => "client-local",
104            LeaderLocator::Random => "random",
105            LeaderLocator::LeastLeaders => "least-leaders",
106        }
107    }
108}