rabbitmq-stream-client 0.11.0

A Rust client for RabbitMQ Stream
Documentation
use crate::client::Client;

use murmur3::murmur3_32;
use rabbitmq_stream_protocol::message::Message;
use std::collections::HashMap;
use std::io::Cursor;

#[derive(Clone)]
pub struct DefaultSuperStreamMetadata {
    pub super_stream: String,
    pub client: Client,
    pub partitions: Vec<String>,
    pub routes: HashMap<String, Vec<String>>,
}

impl DefaultSuperStreamMetadata {
    pub async fn partitions(&mut self) -> Vec<String> {
        if self.partitions.is_empty() {
            let response = self.client.partitions(self.super_stream.clone()).await;

            self.partitions = response.unwrap().streams;
        }
        self.partitions.clone()
    }
    pub async fn routes(&mut self, routing_key: String) -> Vec<String> {
        if !self.routes.contains_key(&routing_key) {
            let response = self
                .client
                .route(routing_key.clone(), self.super_stream.clone())
                .await;

            self.routes
                .insert(routing_key.clone(), response.unwrap().streams);
        }

        self.routes.get(routing_key.as_str()).unwrap().clone()
    }
}

type RoutingExtractor = dyn Fn(&Message) -> String + 'static + Sync + Send;

#[derive(Clone)]
pub struct RoutingKeyRoutingStrategy {
    pub routing_extractor: &'static RoutingExtractor,
}

impl RoutingKeyRoutingStrategy {
    pub async fn routes(
        &self,
        message: &Message,
        metadata: &mut DefaultSuperStreamMetadata,
    ) -> Vec<String> {
        let key = (self.routing_extractor)(message);

        metadata.routes(key).await
    }
}

#[derive(Clone)]
pub struct HashRoutingMurmurStrategy {
    pub routing_extractor: &'static RoutingExtractor,
}

impl HashRoutingMurmurStrategy {
    pub async fn routes(
        &self,
        message: &Message,
        metadata: &mut DefaultSuperStreamMetadata,
    ) -> Vec<String> {
        let mut streams: Vec<String> = Vec::new();

        let key = (self.routing_extractor)(message);
        let hash_result = murmur3_32(&mut Cursor::new(key), 104729);

        let partitions = metadata.partitions().await;
        let number_of_partitions = partitions.len();
        let route = hash_result.unwrap() % number_of_partitions as u32;

        let stream = partitions.into_iter().nth(route as usize).unwrap();
        streams.push(stream);

        streams
    }
}

#[derive(Clone)]
pub enum RoutingStrategy {
    HashRoutingStrategy(HashRoutingMurmurStrategy),
    RoutingKeyStrategy(RoutingKeyRoutingStrategy),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<RoutingStrategy>();
        assert_send_sync::<HashRoutingMurmurStrategy>();
        assert_send_sync::<RoutingKeyRoutingStrategy>();
    }
}