rabbitmq_stream_client/
superstream.rs1use crate::client::Client;
2
3use murmur3::murmur3_32;
4use rabbitmq_stream_protocol::message::Message;
5use std::collections::HashMap;
6use std::io::Cursor;
7
8#[derive(Clone)]
9pub struct DefaultSuperStreamMetadata {
10 pub super_stream: String,
11 pub client: Client,
12 pub partitions: Vec<String>,
13 pub routes: HashMap<String, Vec<String>>,
14}
15
16impl DefaultSuperStreamMetadata {
17 pub async fn partitions(&mut self) -> Vec<String> {
18 if self.partitions.is_empty() {
19 let response = self.client.partitions(self.super_stream.clone()).await;
20
21 self.partitions = response.unwrap().streams;
22 }
23 self.partitions.clone()
24 }
25 pub async fn routes(&mut self, routing_key: String) -> Vec<String> {
26 if !self.routes.contains_key(&routing_key) {
27 let response = self
28 .client
29 .route(routing_key.clone(), self.super_stream.clone())
30 .await;
31
32 self.routes
33 .insert(routing_key.clone(), response.unwrap().streams);
34 }
35
36 self.routes.get(routing_key.as_str()).unwrap().clone()
37 }
38}
39
40type RoutingExtractor = dyn Fn(&Message) -> String + 'static + Sync + Send;
41
42#[derive(Clone)]
43pub struct RoutingKeyRoutingStrategy {
44 pub routing_extractor: &'static RoutingExtractor,
45}
46
47impl RoutingKeyRoutingStrategy {
48 pub async fn routes(
49 &self,
50 message: &Message,
51 metadata: &mut DefaultSuperStreamMetadata,
52 ) -> Vec<String> {
53 let key = (self.routing_extractor)(message);
54
55 metadata.routes(key).await
56 }
57}
58
59#[derive(Clone)]
60pub struct HashRoutingMurmurStrategy {
61 pub routing_extractor: &'static RoutingExtractor,
62}
63
64impl HashRoutingMurmurStrategy {
65 pub async fn routes(
66 &self,
67 message: &Message,
68 metadata: &mut DefaultSuperStreamMetadata,
69 ) -> Vec<String> {
70 let mut streams: Vec<String> = Vec::new();
71
72 let key = (self.routing_extractor)(message);
73 let hash_result = murmur3_32(&mut Cursor::new(key), 104729);
74
75 let partitions = metadata.partitions().await;
76 let number_of_partitions = partitions.len();
77 let route = hash_result.unwrap() % number_of_partitions as u32;
78
79 let stream = partitions.into_iter().nth(route as usize).unwrap();
80 streams.push(stream);
81
82 streams
83 }
84}
85
86#[derive(Clone)]
87pub enum RoutingStrategy {
88 HashRoutingStrategy(HashRoutingMurmurStrategy),
89 RoutingKeyStrategy(RoutingKeyRoutingStrategy),
90}
91
92#[cfg(test)]
93mod tests {
94 use super::*;
95
96 #[test]
97 fn test_send_sync() {
98 fn assert_send_sync<T: Send + Sync>() {}
99 assert_send_sync::<RoutingStrategy>();
100 assert_send_sync::<HashRoutingMurmurStrategy>();
101 assert_send_sync::<RoutingKeyRoutingStrategy>();
102 }
103}