rabbitmq_stream_client/
superstream_consumer.rs

1use crate::client::Client;
2use crate::consumer::{ConsumerUpdateListener, Delivery};
3use crate::error::{ConsumerCloseError, ConsumerDeliveryError};
4use crate::superstream::DefaultSuperStreamMetadata;
5use crate::{
6    error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration, MessageContext,
7};
8use futures::task::AtomicWaker;
9use futures::FutureExt;
10use futures::Stream;
11use futures::StreamExt;
12use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
13use std::collections::HashMap;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering::{Relaxed, SeqCst};
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Receiver};
21use tokio::task;
22
23/// API for consuming RabbitMQ stream messages
24pub struct SuperStreamConsumer {
25    internal: Arc<SuperStreamConsumerInternal>,
26    receiver: Receiver<Result<Delivery, ConsumerDeliveryError>>,
27}
28
29struct SuperStreamConsumerInternal {
30    closed: Arc<AtomicBool>,
31    handlers: Vec<ConsumerHandle>,
32    waker: AtomicWaker,
33    client: Client,
34}
35
36/// Builder for [`Consumer`]
37pub struct SuperStreamConsumerBuilder {
38    pub(crate) super_stream_consumer_name: Option<String>,
39    pub(crate) environment: Environment,
40    pub(crate) offset_specification: OffsetSpecification,
41    pub(crate) filter_configuration: Option<FilterConfiguration>,
42    pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
43    pub(crate) client_provided_name: String,
44    pub(crate) is_single_active_consumer: bool,
45    pub(crate) properties: HashMap<String, String>,
46}
47
48impl SuperStreamConsumerBuilder {
49    pub async fn build(
50        &mut self,
51        super_stream: &str,
52    ) -> Result<SuperStreamConsumer, ConsumerCreateError> {
53        // Connect to the user specified node first, then look for a random replica to connect to instead.
54        // This is recommended for load balancing purposes.
55        if (self.is_single_active_consumer
56            || self.properties.contains_key("single-active-consumer"))
57            && self.super_stream_consumer_name.is_none()
58        {
59            return Err(ConsumerCreateError::SingleActiveConsumerNotSupported);
60        }
61
62        let client = self.environment.create_client().await?;
63        let (tx, rx) = channel(10000);
64
65        let mut super_stream_metadata = DefaultSuperStreamMetadata {
66            super_stream: super_stream.to_string(),
67            client: client.clone(),
68            partitions: Vec::new(),
69            routes: HashMap::new(),
70        };
71        let partitions = super_stream_metadata.partitions().await;
72
73        if self.is_single_active_consumer {
74            self.properties
75                .insert("super-stream".to_string(), super_stream.to_string());
76        }
77
78        let mut handlers = Vec::<ConsumerHandle>::new();
79        for partition in partitions.into_iter() {
80            let tx_cloned = tx.clone();
81            let mut consumer = self
82                .environment
83                .consumer()
84                .name_optional(self.super_stream_consumer_name.clone())
85                .offset(self.offset_specification.clone())
86                .client_provided_name(self.client_provided_name.as_str())
87                .filter_input(self.filter_configuration.clone())
88                .consumer_update_arc(self.consumer_update_listener.clone())
89                .properties(self.properties.clone())
90                .enable_single_active_consumer(self.is_single_active_consumer)
91                .build(partition.as_str())
92                .await
93                .unwrap();
94
95            handlers.push(consumer.handle());
96
97            task::spawn(async move {
98                while let Some(d) = consumer.next().await {
99                    _ = tx_cloned.send(d).await;
100                }
101            });
102        }
103
104        let super_stream_consumer_internal = SuperStreamConsumerInternal {
105            closed: Arc::new(AtomicBool::new(false)),
106            handlers,
107            waker: AtomicWaker::new(),
108            client,
109        };
110
111        Ok(SuperStreamConsumer {
112            internal: Arc::new(super_stream_consumer_internal),
113            receiver: rx,
114        })
115    }
116
117    pub fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
118        self.offset_specification = offset_specification;
119        self
120    }
121
122    pub fn name(mut self, consumer_name: &str) -> Self {
123        self.super_stream_consumer_name = Some(String::from(consumer_name));
124        self
125    }
126
127    pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self {
128        self.is_single_active_consumer = is_single_active_consumer;
129        self
130    }
131
132    pub fn filter_input(mut self, filter_configuration: Option<FilterConfiguration>) -> Self {
133        self.filter_configuration = filter_configuration;
134        self
135    }
136    pub fn consumer_update<Fut>(
137        mut self,
138        consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static,
139    ) -> Self
140    where
141        Fut: Future<Output = OffsetSpecification> + Send + Sync + 'static,
142    {
143        let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed());
144        self.consumer_update_listener = Some(f);
145        self
146    }
147
148    pub fn client_provided_name(mut self, name: &str) -> Self {
149        self.client_provided_name = String::from(name);
150        self
151    }
152
153    pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
154        self.properties = properties;
155        self
156    }
157}
158
159impl Stream for SuperStreamConsumer {
160    type Item = Result<Delivery, ConsumerDeliveryError>;
161
162    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
163        self.internal.waker.register(cx.waker());
164        let poll = Pin::new(&mut self.receiver).poll_recv(cx);
165        match (self.is_closed(), poll.is_ready()) {
166            (true, false) => Poll::Ready(None),
167            _ => poll,
168        }
169    }
170}
171
172impl SuperStreamConsumer {
173    /// Check if the consumer is closed
174    pub fn is_closed(&self) -> bool {
175        self.internal.is_closed()
176    }
177
178    pub fn handle(&self) -> SuperStreamConsumerHandle {
179        SuperStreamConsumerHandle(self.internal.clone())
180    }
181
182    pub fn client(&self) -> Client {
183        self.internal.client.clone()
184    }
185}
186
187impl SuperStreamConsumerInternal {
188    fn is_closed(&self) -> bool {
189        self.closed.load(Relaxed)
190    }
191}
192
193pub struct SuperStreamConsumerHandle(Arc<SuperStreamConsumerInternal>);
194
195impl SuperStreamConsumerHandle {
196    /// Close the [`Consumer`] associated to this handle
197    pub async fn close(self) -> Result<(), ConsumerCloseError> {
198        self.0.waker.wake();
199        match self.0.closed.compare_exchange(false, true, SeqCst, SeqCst) {
200            Ok(false) => {
201                for handle in &self.0.handlers {
202                    handle.internal_close().await.unwrap();
203                }
204                self.0.client.close().await?;
205                Ok(())
206            }
207            _ => Err(ConsumerCloseError::AlreadyClosed),
208        }
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215
216    #[test]
217    fn test_send_sync() {
218        fn assert_send_sync<T: Send + Sync>() {}
219        assert_send_sync::<SuperStreamConsumer>();
220    }
221}