paladin/queue/
in_memory.rs

1//! An in-memory implementation of [`Connection`] and its associated types.
2//!
3//! This implementation is useful for testing and debugging purposes, as it
4//! provides a simple, in-memory queue that can be used to emulate a real queue.
5//! It uses asynchronous synchronization primitives to faithfully emulate the
6//! behavior of a real queue, and is well suited for a multi-threaded and/or
7//! asynchronous environment.
8//!
9//! The [`InMemoryConnection`] is cloneable can be used to simulate a connection
10//! pool to a real queue. Each clone of the connection will maintain references
11//! to same underlying queues.
12use std::{
13    collections::HashMap,
14    pin::Pin,
15    sync::{
16        atomic::{AtomicU64, Ordering},
17        Arc,
18    },
19    task::{Context, Poll},
20};
21
22use anyhow::Result;
23use async_trait::async_trait;
24use bytes::Bytes;
25use crossbeam::queue::SegQueue;
26use dashmap::{mapref::entry::Entry, DashMap};
27use futures::{ready, Stream};
28use tokio::sync::{OwnedSemaphorePermit, Semaphore};
29use tokio_util::sync::PollSemaphore;
30
31use super::{Connection, DeliveryMode, Publisher, QueueHandle, QueueOptions, SyndicationMode};
32use crate::{
33    acker::NoopAcker,
34    serializer::{Serializable, Serializer},
35};
36
37/// An in-memory implementation of [`Connection`].
38///
39/// This implementation maintains a stable set of queues, and is cloneable. Each
40/// clone of the connection will maintain references to the same underlying
41/// queues.
42///
43/// ```
44/// use paladin::queue::{Connection, QueueOptions, in_memory::InMemoryConnection};
45/// use paladin::serializer::Serializer;
46/// use anyhow::Result;
47///
48/// #[tokio::main]
49/// async fn main() -> Result<()> {
50///     let connection = InMemoryConnection::new(Serializer::default());
51///     // Declare a queue
52///     let handle = connection.declare_queue("my_queue", QueueOptions::default()).await?;
53///
54///     // ...
55///     // Delete a queue
56///     connection.delete_queue("my_queue").await?;
57///     
58///     Ok(())
59/// }
60/// ```
61#[derive(Clone)]
62pub struct InMemoryConnection {
63    /// The queues managed by this connection.
64    ///
65    /// Queues are indexed by their name  and stored in an atomically reference
66    /// counted pointer to allow for multiple clones of the connection to
67    /// maintain references to the same queues.
68    queues: Arc<DashMap<String, InMemoryQueueHandle>>,
69    /// The serializer to use for serializing and deserializing messages.
70    serializer: Serializer,
71}
72
73impl InMemoryConnection {
74    pub fn new(serializer: Serializer) -> Self {
75        Self {
76            queues: Default::default(),
77            serializer,
78        }
79    }
80}
81
82#[async_trait]
83impl Connection for InMemoryConnection {
84    type QueueHandle = InMemoryQueueHandle;
85
86    async fn close(&self) -> Result<()> {
87        Ok(())
88    }
89
90    async fn declare_queue(&self, name: &str, options: QueueOptions) -> Result<Self::QueueHandle> {
91        match self.queues.entry(name.to_string()) {
92            Entry::Occupied(entry) => Ok(entry.get().clone()),
93            Entry::Vacant(entry) => {
94                let queue = InMemoryQueueHandle::new(self.serializer, options);
95                entry.insert(queue.clone());
96                Ok(queue)
97            }
98        }
99    }
100
101    async fn delete_queue(&self, name: &str) -> Result<()> {
102        self.queues.remove(name);
103
104        Ok(())
105    }
106}
107
108/// Queue implementation for [`SyndicationMode::ExactlyOnce`].
109///
110/// Exactly once queues must guarantee that each message is delivered to exactly
111/// one consumer. As such, this implementation only has a single `messages`
112/// queue, from which, all consumers will pop messages.
113#[derive(Clone)]
114struct ExactlyOnceQueue {
115    /// The messages in the queue.
116    messages: Arc<SegQueue<Bytes>>,
117    /// The number of messages in the queue.
118    num_messages: PollSemaphore,
119    /// The queue options.
120    _options: QueueOptions,
121    /// The serializer to use for serializing and deserializing messages.
122    serializer: Serializer,
123}
124
125impl Default for ExactlyOnceQueue {
126    fn default() -> Self {
127        Self {
128            messages: Default::default(),
129            num_messages: PollSemaphore::new(Arc::new(Semaphore::new(0))),
130            _options: Default::default(),
131            serializer: Default::default(),
132        }
133    }
134}
135
136impl ExactlyOnceQueue {
137    fn new(options: QueueOptions, serializer: Serializer) -> Self {
138        Self {
139            messages: Default::default(),
140            num_messages: PollSemaphore::new(Arc::new(Semaphore::new(0))),
141            _options: options,
142            serializer,
143        }
144    }
145
146    fn publish<PayloadTarget: Serializable>(&self, payload: &PayloadTarget) -> Result<()> {
147        let bytes = self.serializer.to_bytes(payload)?;
148        self.messages.push(bytes.clone());
149        self.num_messages.add_permits(1);
150        Ok(())
151    }
152
153    fn declare_consumer<PayloadTarget: Serializable>(
154        &self,
155        _consumer_name: &str,
156    ) -> Result<InMemoryConsumer<PayloadTarget>> {
157        Ok(InMemoryConsumer {
158            messages: self.messages.clone(),
159            num_messages: self.num_messages.clone(),
160            serializer: self.serializer,
161            permit: None,
162            _marker: std::marker::PhantomData,
163        })
164    }
165}
166
167/// A per-consumer broadcast queue for [`InMemoryQueueHandle`].
168///
169/// This queue is used to synchronize messages between the broadcast handle and
170/// its consumers.
171///
172/// This differs from the implementation of [`ExactlyOnceQueue`] in that each
173/// consumer has its own queue of messages. This allows for each consumer to
174/// have its own queue of messages, which is required for broadcast queues.
175#[derive(Clone)]
176struct BroadcastConsumer {
177    /// The messages in the queue.
178    messages: Arc<SegQueue<Bytes>>,
179    /// The number of messages in the queue.
180    num_messages: PollSemaphore,
181    /// The seen message IDs.
182    seen: Arc<DashMap<u64, ()>>,
183}
184
185impl Default for BroadcastConsumer {
186    fn default() -> Self {
187        Self {
188            messages: Default::default(),
189            num_messages: PollSemaphore::new(Arc::new(Semaphore::new(0))),
190            seen: Default::default(),
191        }
192    }
193}
194
195/// Queue implementation for [`SyndicationMode::Broadcast`].
196///
197/// Broadcast queues must guarantee that each message is delivered to every
198/// consumer. As such, this implementation maintains a set of consumers, each
199/// with their own queue of messages.
200///
201/// # Design
202/// This implementation maintains a map of consumers, each with their own queue
203/// of messages. When a message is published, it is pushed to each consumer's
204/// queue.
205///
206/// A message history is maintained to support queues declared with the
207/// [`DeliveryMode::Persistent`] option. If this is enabled, messages will be
208/// pushed to the history queue when there are no consumers. When a new consumer
209/// is declared, the history queue will be drained into the new consumer's
210/// queue.
211///
212/// We maintain a global message counter to assign a unique ID to each message.
213/// This allows us to avoid sending messages to consumers that have already seen
214/// the message, especially in a heavily concurrent environment.
215#[derive(Clone, Default)]
216struct BroadcastQueue {
217    /// The consumers subscribed to the broadcast queue.
218    consumers: Arc<DashMap<String, BroadcastConsumer>>,
219    /// All messages.
220    ///
221    /// This can be used to replay messages to new consumers with delivery mode
222    /// set to [`DeliveryMode::Persistent`].
223    history: Arc<SegQueue<(u64, Bytes)>>,
224    /// A global message counter.
225    message_counter: Arc<AtomicU64>,
226    /// The queue options.
227    options: QueueOptions,
228    /// The serializer to use for serializing and deserializing messages.
229    serializer: Serializer,
230}
231
232impl BroadcastQueue {
233    fn new(options: QueueOptions, serializer: Serializer) -> Self {
234        Self {
235            options,
236            serializer,
237            ..Default::default()
238        }
239    }
240
241    fn publish<PayloadTarget: Serializable>(&self, payload: &PayloadTarget) -> Result<()> {
242        let bytes = self.serializer.to_bytes(payload)?;
243        // Assign a unique message ID to the message such that consumers can
244        // track which messages they've seen.
245        let message_id = self.message_counter.fetch_add(1, Ordering::Relaxed);
246
247        // If the delivery mode is persistent and there are no consumers, push
248        // the message to the history queue such that new consumers can replay
249        // the message.
250        if DeliveryMode::Persistent == self.options.delivery_mode && self.consumers.is_empty() {
251            self.history.push((message_id, bytes.clone()));
252        }
253
254        for consumer in self.consumers.iter() {
255            // Newly added consumers may _concurrently_ see messages from history while
256            // propagating the new message. In other words, a new consumer may come online
257            // _while_ new messages are being published. To prevent this, we check if the
258            // consumer has already seen the message.
259            if DeliveryMode::Persistent == self.options.delivery_mode {
260                match consumer.seen.entry(message_id) {
261                    Entry::Occupied(_) => continue,
262                    Entry::Vacant(entry) => {
263                        entry.insert(());
264                    }
265                }
266            }
267
268            consumer.messages.push(bytes.clone());
269            consumer.num_messages.add_permits(1);
270        }
271
272        Ok(())
273    }
274
275    fn declare_consumer<PayloadTarget: Serializable>(
276        &self,
277        consumer_name: &str,
278    ) -> Result<InMemoryConsumer<PayloadTarget>> {
279        match self.consumers.entry(consumer_name.to_string()) {
280            Entry::Occupied(entry) => {
281                let consumer = entry.get().clone();
282                Ok(InMemoryConsumer {
283                    messages: consumer.messages.clone(),
284                    num_messages: consumer.num_messages.clone(),
285                    serializer: self.serializer,
286                    permit: None,
287                    _marker: std::marker::PhantomData,
288                })
289            }
290            Entry::Vacant(entry) => {
291                // If the delivery mode is persistent and there are messages in
292                // the history queue, there are unseen messages that need to be
293                // replayed.
294                let (messages, seen) = if DeliveryMode::Persistent == self.options.delivery_mode
295                    && !self.history.is_empty()
296                {
297                    let messages = SegQueue::new();
298                    let mut seen = HashMap::new();
299
300                    // Populate the new consumer's queue with messages from the
301                    // history queue.
302                    while let Some((message_id, message)) = self.history.pop() {
303                        // Ensure that during a concurrent publish the message is not replayed.
304                        match seen.entry(message_id) {
305                            std::collections::hash_map::Entry::Occupied(_) => continue,
306                            std::collections::hash_map::Entry::Vacant(entry) => {
307                                entry.insert(());
308                            }
309                        }
310
311                        messages.push(message);
312                    }
313                    (messages, seen)
314                } else {
315                    (Default::default(), Default::default())
316                };
317
318                let consumer = BroadcastConsumer {
319                    num_messages: PollSemaphore::new(Arc::new(Semaphore::new(seen.len()))),
320                    messages: Arc::new(messages),
321                    seen: Arc::new(seen.into_iter().collect()),
322                };
323
324                entry.insert(consumer.clone());
325                Ok(InMemoryConsumer {
326                    messages: consumer.messages.clone(),
327                    num_messages: consumer.num_messages.clone(),
328                    serializer: self.serializer,
329                    permit: None,
330                    _marker: std::marker::PhantomData,
331                })
332            }
333        }
334    }
335}
336
337/// An in-memory implementation of [`QueueHandle`].
338///
339/// # Example
340///
341/// ## Exactly Once Mode
342/// ```
343/// # use paladin::{
344/// #     serializer::Serializer,
345/// #     acker::Acker,
346/// #     queue::{
347/// #         Connection,
348/// #         SyndicationMode,
349/// #         DeliveryMode,
350/// #         QueueOptions,
351/// #         QueueHandle,
352/// #         Publisher,
353/// #         in_memory::InMemoryConnection,
354/// #     }
355/// # };
356/// # use serde::{Serialize, Deserialize};
357/// # use anyhow::Result;
358/// # use futures::StreamExt;
359/// #
360/// #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
361/// struct MyStruct {
362///     field: String,
363/// }
364///
365/// #[tokio::main]
366/// async fn main() -> Result<()> {
367///     let connection = InMemoryConnection::new(Serializer::default());
368///
369///     // Declare a queue
370///     let handle = connection.declare_queue("my_queue", QueueOptions {
371///         syndication_mode: SyndicationMode::ExactlyOnce,
372///         delivery_mode: DeliveryMode::Persistent,
373///         ..Default::default()
374///     }).await?;
375///
376///     // Publish a message
377///     handle.publish(&MyStruct { field: "Hello, World!".to_string() }).await?;
378///
379///     // Consume the message
380///     let mut consumer = handle.declare_consumer::<MyStruct>("my_consumer").await?;
381///     if let Some((message, acker)) = consumer.next().await {
382///         acker.ack().await?;
383///         assert_eq!(message, MyStruct { field: "Hello, World!".to_string() });
384///     }
385/// #    
386/// #    Ok(())
387/// }
388/// ```
389///
390/// ## Broadcast Mode
391/// ```
392/// # use paladin::{
393/// #     serializer::Serializer,
394/// #     acker::Acker,
395/// #         queue::{Connection,
396/// #         QueueOptions,
397/// #         SyndicationMode,
398/// #         DeliveryMode,
399/// #         QueueHandle,
400/// #         Publisher,
401/// #         in_memory::InMemoryConnection
402/// #     }
403/// # };
404/// # use serde::{Serialize, Deserialize};
405/// # use anyhow::Result;
406/// # use futures::StreamExt;
407/// #
408/// #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
409/// struct MyStruct {
410///     field: String,
411/// }
412///
413/// #[tokio::main]
414/// async fn main() -> Result<()> {
415///     let connection = InMemoryConnection::new(Serializer::default());
416///
417///     // Declare a queue
418///     let handle = connection.declare_queue("my_queue", QueueOptions {
419///         syndication_mode: SyndicationMode::Broadcast,
420///         ..Default::default()
421///     }).await?;
422///
423///     let mut consumer_1 = handle.declare_consumer::<MyStruct>("consumer_1").await?;
424///     let mut consumer_2 = handle.declare_consumer::<MyStruct>("consumer_2").await?;
425///
426///     // Consume the message in the first consumer
427///     let consumer_task_1 = tokio::spawn(async move {
428///         let (message, acker) = consumer_1.next().await.unwrap();
429///         acker.ack().await.unwrap();
430///         message
431///     });
432///
433///     // Consume the message in the second consumer
434///     let consumer_task_2 = tokio::spawn(async move {
435///         let (message, acker) = consumer_2.next().await.unwrap();
436///         acker.ack().await.unwrap();
437///         message
438///     });
439///
440///     // Publish a message
441///     handle.publish(&MyStruct { field: "Hello, World!".to_string() }).await?;
442///
443///     assert_eq!(consumer_task_1.await.unwrap(), MyStruct { field: "Hello, World!".to_string() });
444///     assert_eq!(consumer_task_2.await.unwrap(), MyStruct { field: "Hello, World!".to_string() });
445/// #
446/// #    Ok(())
447/// }
448/// ```
449#[derive(Clone, Default)]
450pub struct InMemoryQueueHandle {
451    /// The broadcast queue instance.
452    broadcast_queue: BroadcastQueue,
453    /// The exactly once queue instance.
454    exactly_once_queue: ExactlyOnceQueue,
455    /// The queue options.
456    options: QueueOptions,
457}
458
459impl InMemoryQueueHandle {
460    pub fn new(serializer: Serializer, options: QueueOptions) -> Self {
461        Self {
462            options,
463            broadcast_queue: BroadcastQueue::new(options, serializer),
464            exactly_once_queue: ExactlyOnceQueue::new(options, serializer),
465        }
466    }
467}
468
469pub struct InMemoryPublisher<T> {
470    queue_handle: InMemoryQueueHandle,
471    _marker: std::marker::PhantomData<T>,
472}
473
474impl<T> InMemoryPublisher<T> {
475    pub fn new(queue_handle: InMemoryQueueHandle) -> Self {
476        Self {
477            queue_handle,
478            _marker: std::marker::PhantomData,
479        }
480    }
481}
482
483#[async_trait]
484impl<T: Serializable> Publisher<T> for InMemoryPublisher<T> {
485    async fn publish(&self, payload: &T) -> Result<()> {
486        match self.queue_handle.options.syndication_mode {
487            SyndicationMode::ExactlyOnce => self.queue_handle.exactly_once_queue.publish(payload),
488            SyndicationMode::Broadcast => self.queue_handle.broadcast_queue.publish(payload),
489        }
490    }
491
492    async fn close(&self) -> Result<()> {
493        Ok(())
494    }
495}
496
497#[async_trait]
498impl QueueHandle for InMemoryQueueHandle {
499    type Acker = NoopAcker;
500    type Consumer<PayloadTarget: Serializable> = InMemoryConsumer<PayloadTarget>;
501    type Publisher<PayloadTarget: Serializable> = InMemoryPublisher<PayloadTarget>;
502
503    fn publisher<PayloadTarget: Serializable>(&self) -> Self::Publisher<PayloadTarget> {
504        InMemoryPublisher::new(self.clone())
505    }
506
507    async fn declare_consumer<PayloadTarget: Serializable>(
508        &self,
509        consumer_name: &str,
510    ) -> Result<Self::Consumer<PayloadTarget>> {
511        match self.options.syndication_mode {
512            SyndicationMode::ExactlyOnce => self.exactly_once_queue.declare_consumer(consumer_name),
513            SyndicationMode::Broadcast => self.broadcast_queue.declare_consumer(consumer_name),
514        }
515    }
516}
517
518/// A [`Stream`] implementation for [`InMemoryConsumer`].
519///
520/// # Design
521/// This stream will poll the semaphore to acquire a permit, which will return
522/// pending if no messages are available. Once a permit is acquired, the stream
523/// will attempt to acquire a lock on the queue's messages. Once the lock is
524/// acquired, the stream will pop a message from the queue and release the
525/// permit, signaling to the queue that a message has been consumed.
526pub struct InMemoryConsumer<T> {
527    _marker: std::marker::PhantomData<T>,
528    messages: Arc<SegQueue<Bytes>>,
529    permit: Option<OwnedSemaphorePermit>,
530    serializer: Serializer,
531    num_messages: PollSemaphore,
532}
533
534impl<T: Serializable> Stream for InMemoryConsumer<T> {
535    type Item = (T, NoopAcker);
536
537    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
538        let mut this = self.as_mut();
539
540        // If we have a lock future, poll it
541        match this.permit.take() {
542            Some(permit) => {
543                // We have a lock, pop the message
544                let item = this.messages.pop();
545                // Release the permit, signaling that we've consumed a message
546                permit.forget();
547                // Clear the lock future
548                this.permit = None;
549
550                match item {
551                    Some(item) => {
552                        let item = this
553                            .serializer
554                            .from_bytes(&item)
555                            .expect("failed to deserialize");
556
557                        Poll::Ready(Some((item, NoopAcker::new())))
558                    }
559                    None => {
560                        // Should never happen given that permits should correspond 1:1 to
561                        // messages. Error out so we can debug the logic error.
562                        unreachable!("permit was acquired, but no message was available")
563                    }
564                }
565            }
566
567            // Otherwise, wait for a message
568            None => {
569                let permit = ready!(this.num_messages.poll_acquire(cx));
570                match permit {
571                    // If we have a permit, a message should be available
572                    Some(permit) => {
573                        // Create a lock future and poll ourselves
574                        this.permit = Some(permit);
575                        self.poll_next(cx)
576                    }
577                    None => Poll::Pending,
578                }
579            }
580        }
581    }
582}
583
584#[cfg(test)]
585mod helpers {
586    use std::time::Duration;
587
588    use futures::{Future, StreamExt};
589    use serde::{Deserialize, Serialize};
590    use tokio::{
591        task::{JoinError, JoinHandle},
592        try_join,
593    };
594
595    use super::*;
596    use crate::acker::Acker;
597
598    #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
599    pub(super) struct Payload {
600        pub(super) field: i64,
601    }
602
603    pub(super) fn new_payload(field: i64) -> Payload {
604        Payload { field }
605    }
606
607    pub(super) async fn with_timeout<O, F: Future<Output = Result<O, JoinError>>>(
608        fut: F,
609    ) -> Option<O> {
610        let timeout = tokio::time::sleep(Duration::from_millis(10));
611
612        tokio::select! {
613            result = fut => {
614                Some(result.unwrap())
615            }
616            _ = timeout => {
617                None
618            }
619        }
620    }
621
622    pub(super) fn consume_next(mut consumer: InMemoryConsumer<Payload>) -> JoinHandle<Payload> {
623        tokio::spawn(async move {
624            let (payload, acker) = consumer.next().await.unwrap();
625            acker.ack().await.unwrap();
626            payload
627        })
628    }
629
630    pub(super) fn consume_n(
631        consumer: InMemoryConsumer<Payload>,
632        n: usize,
633    ) -> JoinHandle<Vec<Payload>> {
634        tokio::spawn(async move {
635            consumer
636                .then(|(payload, acker)| async move {
637                    acker.ack().await.unwrap();
638                    payload
639                })
640                .take(n)
641                .collect::<Vec<_>>()
642                .await
643        })
644    }
645
646    pub(super) fn consume_n_select(
647        c1: InMemoryConsumer<Payload>,
648        c2: InMemoryConsumer<Payload>,
649        n: usize,
650    ) -> JoinHandle<Vec<Payload>> {
651        tokio::spawn(async move {
652            futures::stream::select(c1, c2)
653                .then(|(payload, acker)| async move {
654                    acker.ack().await.unwrap();
655                    payload
656                })
657                .take(n)
658                .collect::<Vec<_>>()
659                .await
660        })
661    }
662
663    pub(super) async fn consumers<P: Serializable, H: QueueHandle>(
664        queue: &H,
665    ) -> (H::Consumer<P>, H::Consumer<P>) {
666        try_join!(queue.declare_consumer("1"), queue.declare_consumer("2")).unwrap()
667    }
668
669    pub(super) fn publish<H: QueueHandle + Send + Sync + 'static>(
670        queue: &H,
671        payload: &Payload,
672    ) -> JoinHandle<Result<()>>
673    where
674        <H as QueueHandle>::Publisher<Payload>: Send,
675    {
676        let payload = payload.clone();
677        let queue = queue.clone();
678        let publisher = queue.publisher();
679        tokio::spawn(async move { publisher.publish(&payload).await })
680    }
681
682    pub(super) fn publish_multi<H: QueueHandle + Send + Sync + 'static>(
683        queue: &H,
684        payload: &[Payload],
685    ) -> Vec<JoinHandle<Result<()>>>
686    where
687        <H as QueueHandle>::Publisher<Payload>: Send,
688    {
689        payload.iter().map(|p| publish(queue, p)).collect()
690    }
691}
692
693#[cfg(test)]
694mod exactly_once {
695
696    use tokio::{join, try_join};
697
698    use super::helpers::*;
699    use super::*;
700    use crate::queue::*;
701
702    async fn queue_handle() -> InMemoryQueueHandle {
703        let connection = InMemoryConnection::new(Serializer::default());
704        connection
705            .declare_queue(
706                "my_queue",
707                QueueOptions {
708                    delivery_mode: DeliveryMode::Persistent,
709                    syndication_mode: SyndicationMode::ExactlyOnce,
710                    durability: QueueDurability::NonDurable,
711                },
712            )
713            .await
714            .unwrap()
715    }
716
717    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
718    async fn single_message_delivers_once_publish_first() {
719        let queue = queue_handle().await;
720        let clone = queue.clone();
721        publish(&clone, &new_payload(1));
722
723        let (c1, c2) = consumers(&queue).await;
724        let (r1, r2) = (consume_next(c1), consume_next(c2));
725        let (r1, r2) = join!(with_timeout(r1), with_timeout(r2));
726
727        assert!([r1.clone(), r2.clone()].iter().any(|r| r.is_none()));
728        assert!([r1.clone(), r2.clone()]
729            .into_iter()
730            .any(|r| r == Some(new_payload(1))));
731    }
732
733    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
734    async fn single_message_delivers_once_publish_last() {
735        let queue = queue_handle().await;
736
737        let (c1, c2) = consumers(&queue).await;
738        let (r1, r2) = (consume_next(c1), consume_next(c2));
739
740        publish(&queue, &new_payload(1));
741
742        let (r1, r2) = join!(with_timeout(r1), with_timeout(r2));
743
744        assert!([r1.clone(), r2.clone()].iter().any(|p| p.is_none()));
745        assert!([r1.clone(), r2.clone()]
746            .into_iter()
747            .any(|p| p == Some(new_payload(1))));
748    }
749
750    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
751    async fn double_message_delivers_once_publish_first() {
752        let queue = queue_handle().await;
753        publish(&queue, &new_payload(1));
754        publish(&queue, &new_payload(2));
755        let (c1, c2) = consumers(&queue).await;
756        let (r1, r2) = (consume_next(c1), consume_next(c2));
757        let (r1, r2) = try_join!(r1, r2).unwrap();
758
759        assert_ne!(r1, r2)
760    }
761
762    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
763    async fn double_message_delivers_once_publish_last() {
764        let queue = queue_handle().await;
765
766        let (c1, c2) = consumers(&queue).await;
767        let (r1, r2) = (consume_next(c1), consume_next(c2));
768        publish(&queue, &new_payload(1));
769        publish(&queue, &new_payload(2));
770        let (r1, r2) = try_join!(r1, r2).unwrap();
771
772        assert_ne!(r1, r2)
773    }
774
775    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
776    async fn many_messages_single_consumer() {
777        let queue = queue_handle().await;
778        let payloads = (0..100).map(new_payload).collect::<Vec<_>>();
779        publish_multi(&queue, &payloads);
780
781        let c = queue.declare_consumer("1").await.unwrap();
782        let mut results = consume_n(c, payloads.len()).await.unwrap();
783        results.sort_by_key(|a| a.field);
784        assert_eq!(payloads, results)
785    }
786
787    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
788    async fn many_messages_two_consumers() {
789        let queue = queue_handle().await;
790        let payloads = (0..100).map(new_payload).collect::<Vec<_>>();
791        publish_multi(&queue, &payloads);
792
793        let (c1, c2) = consumers(&queue).await;
794        let mut results = consume_n_select(c1, c2, payloads.len()).await.unwrap();
795        results.sort_by_key(|a| a.field);
796        assert_eq!(payloads, results)
797    }
798}
799
800#[cfg(test)]
801mod broadcast {
802    use tokio::{join, try_join};
803
804    use super::helpers::*;
805    use super::*;
806    use crate::queue::*;
807
808    async fn broadcast_handle() -> InMemoryQueueHandle {
809        let connection = InMemoryConnection::new(Default::default());
810        connection
811            .declare_queue(
812                "my_broadcast_queue",
813                QueueOptions {
814                    delivery_mode: DeliveryMode::Persistent,
815                    syndication_mode: SyndicationMode::Broadcast,
816                    durability: QueueDurability::NonDurable,
817                },
818            )
819            .await
820            .unwrap()
821    }
822
823    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
824    async fn single_message_delivers_to_all_publish_last() {
825        let queue = broadcast_handle().await;
826        let expected = new_payload(1);
827
828        let (c1, c2) = consumers(&queue).await;
829        publish(&queue, &expected);
830        let (r1, r2) = try_join!(consume_next(c1), consume_next(c2)).unwrap();
831
832        assert_eq!(expected, r1);
833        assert_eq!(r1, r2)
834    }
835
836    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
837    async fn single_message_delivers_to_at_least_one_publish_first() {
838        let queue = broadcast_handle().await;
839
840        publish(&queue, &new_payload(1));
841
842        let (c1, c2) = consumers(&queue).await;
843        let (r1, r2) = (consume_next(c1), consume_next(c2));
844        let (r1, r2) = join!(with_timeout(r1), with_timeout(r2));
845
846        assert!([r1, r2].into_iter().any(|r| r == Some(new_payload(1))));
847    }
848
849    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
850    async fn many_messages_single_consumer_publish_first() {
851        let queue = broadcast_handle().await;
852        let payloads = (0..5).map(new_payload).collect::<Vec<_>>();
853        publish_multi(&queue, &payloads);
854        let c = queue.declare_consumer("1").await.unwrap();
855
856        let mut results = consume_n(c, payloads.len()).await.unwrap();
857        results.sort_by_key(|a| a.field);
858
859        assert_eq!(payloads, results)
860    }
861
862    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
863    async fn many_messages_single_consumer_publish_last() {
864        let queue = broadcast_handle().await;
865        let payloads = (0..5).map(new_payload).collect::<Vec<_>>();
866        let c = queue.declare_consumer("1").await.unwrap();
867        publish_multi(&queue, &payloads);
868
869        let mut results = consume_n(c, payloads.len()).await.unwrap();
870        results.sort_by_key(|a| a.field);
871
872        assert_eq!(payloads, results)
873    }
874
875    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
876    async fn many_messages_multi_consumer_publish_first() {
877        let queue = broadcast_handle().await;
878        let payloads = (0..5).map(new_payload).collect::<Vec<_>>();
879        publish_multi(&queue, &payloads);
880        let (c1, c2) = consumers(&queue).await;
881
882        let (mut r1, mut r2) = join!(
883            with_timeout(consume_n(c1, payloads.len())),
884            with_timeout(consume_n(c2, payloads.len()))
885        );
886        if let Some(v) = r1.as_mut() {
887            v.sort_by_key(|a| a.field);
888        }
889        if let Some(v) = r2.as_mut() {
890            v.sort_by_key(|a| a.field);
891        }
892        let expected = Some(payloads);
893
894        assert!([r1, r2].into_iter().any(|r| r == expected));
895    }
896
897    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
898    async fn many_messages_multi_consumer_publish_last() {
899        let queue = broadcast_handle().await;
900        let payloads = (0..5).map(new_payload).collect::<Vec<_>>();
901        let (c1, c2) = consumers(&queue).await;
902        publish_multi(&queue, &payloads);
903
904        let (mut r1, mut r2) = join!(
905            with_timeout(consume_n(c1, payloads.len())),
906            with_timeout(consume_n(c2, payloads.len()))
907        );
908        if let Some(v) = r1.as_mut() {
909            v.sort_by_key(|a| a.field);
910        }
911        if let Some(v) = r2.as_mut() {
912            v.sort_by_key(|a| a.field);
913        }
914        let expected = Some(payloads);
915
916        assert!([r1, r2].into_iter().any(|r| r == expected));
917    }
918}