rabbitmq_stream_client/
superstream.rs

1use crate::client::Client;
2
3use murmur3::murmur3_32;
4use rabbitmq_stream_protocol::message::Message;
5use std::collections::HashMap;
6use std::io::Cursor;
7
8#[derive(Clone)]
9pub struct DefaultSuperStreamMetadata {
10    pub super_stream: String,
11    pub client: Client,
12    pub partitions: Vec<String>,
13    pub routes: HashMap<String, Vec<String>>,
14}
15
16impl DefaultSuperStreamMetadata {
17    pub async fn partitions(&mut self) -> Vec<String> {
18        if self.partitions.is_empty() {
19            let response = self.client.partitions(self.super_stream.clone()).await;
20
21            self.partitions = response.unwrap().streams;
22        }
23        self.partitions.clone()
24    }
25    pub async fn routes(&mut self, routing_key: String) -> Vec<String> {
26        if !self.routes.contains_key(&routing_key) {
27            let response = self
28                .client
29                .route(routing_key.clone(), self.super_stream.clone())
30                .await;
31
32            self.routes
33                .insert(routing_key.clone(), response.unwrap().streams);
34        }
35
36        self.routes.get(routing_key.as_str()).unwrap().clone()
37    }
38}
39
40type RoutingExtractor = dyn Fn(&Message) -> String + 'static + Sync + Send;
41
42#[derive(Clone)]
43pub struct RoutingKeyRoutingStrategy {
44    pub routing_extractor: &'static RoutingExtractor,
45}
46
47impl RoutingKeyRoutingStrategy {
48    pub async fn routes(
49        &self,
50        message: &Message,
51        metadata: &mut DefaultSuperStreamMetadata,
52    ) -> Vec<String> {
53        let key = (self.routing_extractor)(message);
54
55        metadata.routes(key).await
56    }
57}
58
59#[derive(Clone)]
60pub struct HashRoutingMurmurStrategy {
61    pub routing_extractor: &'static RoutingExtractor,
62}
63
64impl HashRoutingMurmurStrategy {
65    pub async fn routes(
66        &self,
67        message: &Message,
68        metadata: &mut DefaultSuperStreamMetadata,
69    ) -> Vec<String> {
70        let mut streams: Vec<String> = Vec::new();
71
72        let key = (self.routing_extractor)(message);
73        let hash_result = murmur3_32(&mut Cursor::new(key), 104729);
74
75        let partitions = metadata.partitions().await;
76        let number_of_partitions = partitions.len();
77        let route = hash_result.unwrap() % number_of_partitions as u32;
78
79        let stream = partitions.into_iter().nth(route as usize).unwrap();
80        streams.push(stream);
81
82        streams
83    }
84}
85
86#[derive(Clone)]
87pub enum RoutingStrategy {
88    HashRoutingStrategy(HashRoutingMurmurStrategy),
89    RoutingKeyStrategy(RoutingKeyRoutingStrategy),
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95
96    #[test]
97    fn test_send_sync() {
98        fn assert_send_sync<T: Send + Sync>() {}
99        assert_send_sync::<RoutingStrategy>();
100        assert_send_sync::<HashRoutingMurmurStrategy>();
101        assert_send_sync::<RoutingKeyRoutingStrategy>();
102    }
103}