rabbitmq_stream_client/
superstream_producer.rs1use crate::error::ProducerCloseError;
2use crate::{
3 client::Client,
4 environment::Environment,
5 error::{ProducerCreateError, ProducerPublishError, SuperStreamProducerPublishError},
6 producer::{ConfirmationStatus, NoDedup, Producer},
7 superstream::{DefaultSuperStreamMetadata, RoutingStrategy},
8};
9use rabbitmq_stream_protocol::message::Message;
10use std::collections::HashMap;
11use std::future::Future;
12use std::marker::PhantomData;
13use std::sync::Arc;
14
15type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
16
17#[derive(Clone)]
18pub struct SuperStreamProducer<T>(
19 Arc<SuperStreamProducerInternal>,
20 HashMap<String, Producer<T>>,
21 DefaultSuperStreamMetadata,
22 PhantomData<T>,
23 RoutingStrategy,
24);
25
26pub struct SuperStreamProducerBuilder<T> {
28 pub(crate) environment: Environment,
29 pub filter_value_extractor: Option<FilterValueExtractor>,
30 pub route_strategy: RoutingStrategy,
31 pub(crate) data: PhantomData<T>,
32 pub(crate) client_provided_name: String,
33}
34
35pub struct SuperStreamProducerInternal {
36 pub(crate) environment: Environment,
37 client: Client,
38 filter_value_extractor: Option<FilterValueExtractor>,
40 client_provided_name: String,
41}
42
43impl SuperStreamProducer<NoDedup> {
44 pub async fn send<Fut>(
45 &mut self,
46 message: Message,
47 cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut
48 + Send
49 + Sync
50 + 'static
51 + Clone,
52 ) -> Result<(), SuperStreamProducerPublishError>
53 where
54 Fut: Future<Output = ()> + Send + Sync + 'static,
55 {
56 let routes = match &self.4 {
57 RoutingStrategy::HashRoutingStrategy(routing_strategy) => {
58 routing_strategy.routes(&message, &mut self.2).await
59 }
60 RoutingStrategy::RoutingKeyStrategy(routing_strategy) => {
61 routing_strategy.routes(&message, &mut self.2).await
62 }
63 };
64
65 if routes.is_empty() {
66 return Err(crate::error::SuperStreamProducerPublishError::ProducerCreateError());
67 }
68
69 for route in routes.into_iter() {
70 if !self.1.contains_key(route.as_str()) {
71 let producer = self
72 .0
73 .environment
74 .producer()
75 .client_provided_name(self.0.client_provided_name.as_str())
76 .filter_value_extractor_arc(self.0.filter_value_extractor.clone())
77 .build(route.as_str())
78 .await?;
79 self.1.insert(route.clone(), producer);
80 }
81
82 let producer = self.1.get(&route).unwrap();
83 producer.send(message.clone(), cb.clone()).await?;
84 }
85 Ok(())
86 }
87
88 pub async fn close(self) -> Result<(), ProducerCloseError> {
89 self.0.client.close().await?;
90
91 let mut err: Option<ProducerCloseError> = None;
92 let mut is_error = false;
93 for (_, producer) in self.1.into_iter() {
94 let close = producer.close().await;
95 if let Err(e) = close {
96 is_error = true;
97 err = Some(e);
98 }
99 }
100
101 if !is_error {
102 Ok(())
103 } else {
104 Err(err.unwrap())
105 }
106 }
107}
108
109impl<T> SuperStreamProducerBuilder<T> {
110 pub async fn build(
111 self,
112 super_stream: &str,
113 ) -> Result<SuperStreamProducer<T>, ProducerCreateError> {
114 let client = self.environment.create_client().await?;
118
119 let producers = HashMap::new();
120
121 let super_stream_metadata = DefaultSuperStreamMetadata {
122 super_stream: super_stream.to_string(),
123 client: client.clone(),
124 partitions: Vec::new(),
125 routes: HashMap::new(),
126 };
127
128 let super_stream_producer = SuperStreamProducerInternal {
129 environment: self.environment.clone(),
130 client,
131 filter_value_extractor: self.filter_value_extractor,
132 client_provided_name: self.client_provided_name,
133 };
134
135 let internal_producer = Arc::new(super_stream_producer);
136 let super_stream_producer = SuperStreamProducer(
137 internal_producer.clone(),
138 producers,
139 super_stream_metadata,
140 PhantomData,
141 self.route_strategy,
142 );
143
144 Ok(super_stream_producer)
145 }
146
147 pub fn filter_value_extractor(
148 mut self,
149 filter_value_extractor: impl Fn(&Message) -> String + Send + Sync + 'static,
150 ) -> Self {
151 let f = Arc::new(filter_value_extractor);
152 self.filter_value_extractor = Some(f);
153 self
154 }
155
156 pub fn client_provided_name(mut self, name: &str) -> Self {
157 self.client_provided_name = String::from(name);
158 self
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Dedup;

    /// Compile-time check that the producer is `Send + Sync` for both
    /// deduplication modes.
    #[test]
    fn test_send_sync() {
        fn require_send_sync<P>()
        where
            P: Send + Sync,
        {
        }

        require_send_sync::<SuperStreamProducer<NoDedup>>();
        require_send_sync::<SuperStreamProducer<Dedup>>();
    }
}