rabbitmq_stream_client/
superstream_consumer.rs1use crate::client::Client;
2use crate::consumer::{ConsumerUpdateListener, Delivery};
3use crate::error::{ConsumerCloseError, ConsumerDeliveryError};
4use crate::superstream::DefaultSuperStreamMetadata;
5use crate::{
6 error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration, MessageContext,
7};
8use futures::task::AtomicWaker;
9use futures::FutureExt;
10use futures::Stream;
11use futures::StreamExt;
12use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
13use std::collections::HashMap;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering::{Relaxed, SeqCst};
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Receiver};
21use tokio::task;
22
23pub struct SuperStreamConsumer {
25 internal: Arc<SuperStreamConsumerInternal>,
26 receiver: Receiver<Result<Delivery, ConsumerDeliveryError>>,
27}
28
29struct SuperStreamConsumerInternal {
30 closed: Arc<AtomicBool>,
31 handlers: Vec<ConsumerHandle>,
32 waker: AtomicWaker,
33 client: Client,
34}
35
36pub struct SuperStreamConsumerBuilder {
38 pub(crate) super_stream_consumer_name: Option<String>,
39 pub(crate) environment: Environment,
40 pub(crate) offset_specification: OffsetSpecification,
41 pub(crate) filter_configuration: Option<FilterConfiguration>,
42 pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
43 pub(crate) client_provided_name: String,
44 pub(crate) is_single_active_consumer: bool,
45 pub(crate) properties: HashMap<String, String>,
46}
47
48impl SuperStreamConsumerBuilder {
49 pub async fn build(
50 &mut self,
51 super_stream: &str,
52 ) -> Result<SuperStreamConsumer, ConsumerCreateError> {
53 if (self.is_single_active_consumer
56 || self.properties.contains_key("single-active-consumer"))
57 && self.super_stream_consumer_name.is_none()
58 {
59 return Err(ConsumerCreateError::SingleActiveConsumerNotSupported);
60 }
61
62 let client = self.environment.create_client().await?;
63 let (tx, rx) = channel(10000);
64
65 let mut super_stream_metadata = DefaultSuperStreamMetadata {
66 super_stream: super_stream.to_string(),
67 client: client.clone(),
68 partitions: Vec::new(),
69 routes: HashMap::new(),
70 };
71 let partitions = super_stream_metadata.partitions().await;
72
73 if self.is_single_active_consumer {
74 self.properties
75 .insert("super-stream".to_string(), super_stream.to_string());
76 }
77
78 let mut handlers = Vec::<ConsumerHandle>::new();
79 for partition in partitions.into_iter() {
80 let tx_cloned = tx.clone();
81 let mut consumer = self
82 .environment
83 .consumer()
84 .name_optional(self.super_stream_consumer_name.clone())
85 .offset(self.offset_specification.clone())
86 .client_provided_name(self.client_provided_name.as_str())
87 .filter_input(self.filter_configuration.clone())
88 .consumer_update_arc(self.consumer_update_listener.clone())
89 .properties(self.properties.clone())
90 .enable_single_active_consumer(self.is_single_active_consumer)
91 .build(partition.as_str())
92 .await
93 .unwrap();
94
95 handlers.push(consumer.handle());
96
97 task::spawn(async move {
98 while let Some(d) = consumer.next().await {
99 _ = tx_cloned.send(d).await;
100 }
101 });
102 }
103
104 let super_stream_consumer_internal = SuperStreamConsumerInternal {
105 closed: Arc::new(AtomicBool::new(false)),
106 handlers,
107 waker: AtomicWaker::new(),
108 client,
109 };
110
111 Ok(SuperStreamConsumer {
112 internal: Arc::new(super_stream_consumer_internal),
113 receiver: rx,
114 })
115 }
116
117 pub fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
118 self.offset_specification = offset_specification;
119 self
120 }
121
122 pub fn name(mut self, consumer_name: &str) -> Self {
123 self.super_stream_consumer_name = Some(String::from(consumer_name));
124 self
125 }
126
127 pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self {
128 self.is_single_active_consumer = is_single_active_consumer;
129 self
130 }
131
132 pub fn filter_input(mut self, filter_configuration: Option<FilterConfiguration>) -> Self {
133 self.filter_configuration = filter_configuration;
134 self
135 }
136 pub fn consumer_update<Fut>(
137 mut self,
138 consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static,
139 ) -> Self
140 where
141 Fut: Future<Output = OffsetSpecification> + Send + Sync + 'static,
142 {
143 let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed());
144 self.consumer_update_listener = Some(f);
145 self
146 }
147
148 pub fn client_provided_name(mut self, name: &str) -> Self {
149 self.client_provided_name = String::from(name);
150 self
151 }
152
153 pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
154 self.properties = properties;
155 self
156 }
157}
158
159impl Stream for SuperStreamConsumer {
160 type Item = Result<Delivery, ConsumerDeliveryError>;
161
162 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
163 self.internal.waker.register(cx.waker());
164 let poll = Pin::new(&mut self.receiver).poll_recv(cx);
165 match (self.is_closed(), poll.is_ready()) {
166 (true, false) => Poll::Ready(None),
167 _ => poll,
168 }
169 }
170}
171
172impl SuperStreamConsumer {
173 pub fn is_closed(&self) -> bool {
175 self.internal.is_closed()
176 }
177
178 pub fn handle(&self) -> SuperStreamConsumerHandle {
179 SuperStreamConsumerHandle(self.internal.clone())
180 }
181
182 pub fn client(&self) -> Client {
183 self.internal.client.clone()
184 }
185}
186
187impl SuperStreamConsumerInternal {
188 fn is_closed(&self) -> bool {
189 self.closed.load(Relaxed)
190 }
191}
192
193pub struct SuperStreamConsumerHandle(Arc<SuperStreamConsumerInternal>);
194
195impl SuperStreamConsumerHandle {
196 pub async fn close(self) -> Result<(), ConsumerCloseError> {
198 self.0.waker.wake();
199 match self.0.closed.compare_exchange(false, true, SeqCst, SeqCst) {
200 Ok(false) => {
201 for handle in &self.0.handlers {
202 handle.internal_close().await.unwrap();
203 }
204 self.0.client.close().await?;
205 Ok(())
206 }
207 _ => Err(ConsumerCloseError::AlreadyClosed),
208 }
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215
216 #[test]
217 fn test_send_sync() {
218 fn assert_send_sync<T: Send + Sync>() {}
219 assert_send_sync::<SuperStreamConsumer>();
220 }
221}