rabbitmq_stream_client/
superstream_producer.rs

1use crate::error::ProducerCloseError;
2use crate::{
3    client::Client,
4    environment::Environment,
5    error::{ProducerCreateError, ProducerPublishError, SuperStreamProducerPublishError},
6    producer::{ConfirmationStatus, NoDedup, Producer},
7    superstream::{DefaultSuperStreamMetadata, RoutingStrategy},
8};
9use rabbitmq_stream_protocol::message::Message;
10use std::collections::HashMap;
11use std::future::Future;
12use std::marker::PhantomData;
13use std::sync::Arc;
14
15type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
16
17#[derive(Clone)]
18pub struct SuperStreamProducer<T>(
19    Arc<SuperStreamProducerInternal>,
20    HashMap<String, Producer<T>>,
21    DefaultSuperStreamMetadata,
22    PhantomData<T>,
23    RoutingStrategy,
24);
25
26/// Builder for [`SuperStreamProducer`]
27pub struct SuperStreamProducerBuilder<T> {
28    pub(crate) environment: Environment,
29    pub filter_value_extractor: Option<FilterValueExtractor>,
30    pub route_strategy: RoutingStrategy,
31    pub(crate) data: PhantomData<T>,
32    pub(crate) client_provided_name: String,
33}
34
35pub struct SuperStreamProducerInternal {
36    pub(crate) environment: Environment,
37    client: Client,
38    // TODO: implement filtering for superstream
39    filter_value_extractor: Option<FilterValueExtractor>,
40    client_provided_name: String,
41}
42
43impl SuperStreamProducer<NoDedup> {
44    pub async fn send<Fut>(
45        &mut self,
46        message: Message,
47        cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut
48            + Send
49            + Sync
50            + 'static
51            + Clone,
52    ) -> Result<(), SuperStreamProducerPublishError>
53    where
54        Fut: Future<Output = ()> + Send + Sync + 'static,
55    {
56        let routes = match &self.4 {
57            RoutingStrategy::HashRoutingStrategy(routing_strategy) => {
58                routing_strategy.routes(&message, &mut self.2).await
59            }
60            RoutingStrategy::RoutingKeyStrategy(routing_strategy) => {
61                routing_strategy.routes(&message, &mut self.2).await
62            }
63        };
64
65        if routes.is_empty() {
66            return Err(crate::error::SuperStreamProducerPublishError::ProducerCreateError());
67        }
68
69        for route in routes.into_iter() {
70            if !self.1.contains_key(route.as_str()) {
71                let producer = self
72                    .0
73                    .environment
74                    .producer()
75                    .client_provided_name(self.0.client_provided_name.as_str())
76                    .filter_value_extractor_arc(self.0.filter_value_extractor.clone())
77                    .build(route.as_str())
78                    .await?;
79                self.1.insert(route.clone(), producer);
80            }
81
82            let producer = self.1.get(&route).unwrap();
83            producer.send(message.clone(), cb.clone()).await?;
84        }
85        Ok(())
86    }
87
88    pub async fn close(self) -> Result<(), ProducerCloseError> {
89        self.0.client.close().await?;
90
91        let mut err: Option<ProducerCloseError> = None;
92        let mut is_error = false;
93        for (_, producer) in self.1.into_iter() {
94            let close = producer.close().await;
95            if let Err(e) = close {
96                is_error = true;
97                err = Some(e);
98            }
99        }
100
101        if !is_error {
102            Ok(())
103        } else {
104            Err(err.unwrap())
105        }
106    }
107}
108
109impl<T> SuperStreamProducerBuilder<T> {
110    pub async fn build(
111        self,
112        super_stream: &str,
113    ) -> Result<SuperStreamProducer<T>, ProducerCreateError> {
114        // Connect to the user specified node first, then look for the stream leader.
115        // The leader is the recommended node for writing, because writing to a replica will redundantly pass these messages
116        // to the leader anyway - it is the only one capable of writing.
117        let client = self.environment.create_client().await?;
118
119        let producers = HashMap::new();
120
121        let super_stream_metadata = DefaultSuperStreamMetadata {
122            super_stream: super_stream.to_string(),
123            client: client.clone(),
124            partitions: Vec::new(),
125            routes: HashMap::new(),
126        };
127
128        let super_stream_producer = SuperStreamProducerInternal {
129            environment: self.environment.clone(),
130            client,
131            filter_value_extractor: self.filter_value_extractor,
132            client_provided_name: self.client_provided_name,
133        };
134
135        let internal_producer = Arc::new(super_stream_producer);
136        let super_stream_producer = SuperStreamProducer(
137            internal_producer.clone(),
138            producers,
139            super_stream_metadata,
140            PhantomData,
141            self.route_strategy,
142        );
143
144        Ok(super_stream_producer)
145    }
146
147    pub fn filter_value_extractor(
148        mut self,
149        filter_value_extractor: impl Fn(&Message) -> String + Send + Sync + 'static,
150    ) -> Self {
151        let f = Arc::new(filter_value_extractor);
152        self.filter_value_extractor = Some(f);
153        self
154    }
155
156    pub fn client_provided_name(mut self, name: &str) -> Self {
157        self.client_provided_name = String::from(name);
158        self
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Dedup;

    /// Compile-time check that the producer is `Send + Sync` for both type parameters.
    #[test]
    fn test_send_sync() {
        // Instantiating this helper only type-checks when `P` meets both bounds.
        fn require_send_sync<P: Send + Sync>() {}

        require_send_sync::<SuperStreamProducer<NoDedup>>();
        require_send_sync::<SuperStreamProducer<Dedup>>();
    }
}