1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
//! An in-memory implementation of [`Queue`] and its associated types.
//!
//! This implementation is useful for testing and debugging purposes, as it
//! provides a simple, in-memory queue that can be used to emulate a real queue.
//! It uses asynchronous synchronization primitives to faithfully emulate the
//! behavior of a real queue, and is well suited for a multi-threaded and/or
//! asynchronous environment.
//!
//! The [`InMemoryConnection`] is cloneable can be used to simulate a connection
//! pool to a real queue. Each clone of the connection will maintain references
//! to same underlying queues.
use std::{
    collections::{HashMap, VecDeque},
    pin::{pin, Pin},
    sync::Arc,
    task::{Context, Poll},
};

use anyhow::Result;
use async_trait::async_trait;
use futures::{
    lock::{Mutex, OwnedMutexLockFuture},
    ready, Future, Stream,
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;

use super::{Connection, Consumer, Queue, QueueHandle};
use crate::{
    acker::NoopAcker,
    serializer::{Serializable, Serializer},
};

/// An in-memory implementation of [`Queue`].
///
/// ```
/// use paladin::queue::{Queue, in_memory::InMemoryQueue};
/// use paladin::serializer::Serializer;
/// use anyhow::Result;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let queue = InMemoryQueue::new(Serializer::Cbor);
///     let connection = queue.get_connection().await?;
///     
///     Ok(())
/// }
/// ```
pub struct InMemoryQueue {
    /// The connection to the queue.
    connection: InMemoryConnection,
}

impl InMemoryQueue {
    /// Create a new in-memory queue.
    pub fn new(serializer: Serializer) -> Self {
        Self {
            connection: InMemoryConnection::new(serializer),
        }
    }
}

#[async_trait]
impl Queue for InMemoryQueue {
    type Connection = InMemoryConnection;

    async fn get_connection(&self) -> Result<Self::Connection> {
        Ok(self.connection.clone())
    }
}

/// An in-memory implementation of [`Connection`].
///
/// This implementation maintains a stable set of queues, and is cloneable. Each
/// clone of the connection will maintain references to the same underlying
/// queues.
///
/// ```
/// use paladin::queue::{Queue, Connection, in_memory::InMemoryQueue};
/// use paladin::serializer::Serializer;
/// use anyhow::Result;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let queue = InMemoryQueue::new(Serializer::Cbor);
///     let connection = queue.get_connection().await?;
///     // Declare a queue
///     let handle = connection.declare_queue("my_queue").await?;
///
///     // ...
///     // Delete a queue
///     connection.delete_queue("my_queue").await?;
///     
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct InMemoryConnection {
    /// The queues managed by this connection. Queues are indexed by their name
    /// and stored in an atomically reference counted pointer to allow for
    /// multiple clones of the connection to maintain references to the same
    /// queues.
    queues: Arc<Mutex<HashMap<String, InMemoryQueueHandle>>>,
    /// The serializer to use for serializing and deserializing messages.
    serializer: Serializer,
}

impl InMemoryConnection {
    pub fn new(serializer: Serializer) -> Self {
        Self {
            queues: Default::default(),
            serializer,
        }
    }
}

#[async_trait]
impl Connection for InMemoryConnection {
    type QueueHandle = InMemoryQueueHandle;

    async fn declare_queue(&self, name: &str) -> Result<Self::QueueHandle> {
        let mut lock = self.queues.lock().await;
        match lock.get(name) {
            Some(queue) => Ok(queue.clone()),
            None => {
                let queue = InMemoryQueueHandle::new(self.serializer);
                lock.insert(name.to_string(), queue.clone());
                Ok(queue)
            }
        }
    }

    async fn delete_queue(&self, name: &str) -> Result<()> {
        let mut lock = self.queues.lock().await;
        if lock.get(name).is_some() {
            lock.remove(name);
        }
        Ok(())
    }
}

/// An in-memory implementation of [`QueueHandle`].
///
/// Cloning this handle will create a new handle that points to the same set of
/// messages and synchronization state.
///
/// ```
/// use paladin::{
///     serializer::Serializer,
///     acker::Acker,
///     queue::{Queue, Connection, QueueHandle, Consumer, in_memory::InMemoryQueue}
/// };
/// use serde::{Serialize, Deserialize};
/// use anyhow::Result;
/// use futures::StreamExt;
///
/// #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
/// struct MyStruct {
///     field: String,
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let queue = InMemoryQueue::new(Serializer::Cbor);
///     let connection = queue.get_connection().await?;
///     // Declare a queue
///     let handle = connection.declare_queue("my_queue").await?;
///
///     // Publish a message
///     handle.publish(&MyStruct { field: "Hello, World!".to_string() }).await?;
///
///     // Consume the message
///     let consumer = handle.declare_consumer("my_consumer").await?;
///     if let Some((message, acker)) = consumer.stream::<MyStruct>().await?.next().await {
///         acker.ack().await?;
///         assert_eq!(message, MyStruct { field: "Hello, World!".to_string() });
///     }
///     
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct InMemoryQueueHandle {
    /// The messages in the queue. They're stored as raw bytes to simulate a
    /// real queue where serialization and deserialization is required.
    messages: Arc<Mutex<VecDeque<Vec<u8>>>>,
    /// The number of messages in the queue.
    num_messages: PollSemaphore,
    /// The serializer to use for serializing and deserializing messages.
    serializer: Serializer,
}

impl InMemoryQueueHandle {
    pub fn new(serializer: Serializer) -> Self {
        Self {
            messages: Default::default(),
            num_messages: PollSemaphore::new(Arc::new(Semaphore::new(0))),
            serializer,
        }
    }
}

#[async_trait]
impl QueueHandle for InMemoryQueueHandle {
    type Consumer = InMemoryConsumer;

    async fn publish<PayloadTarget: Serializable>(&self, payload: &PayloadTarget) -> Result<()> {
        let mut lock = self.messages.lock().await;
        lock.push_back(self.serializer.to_bytes(payload)?);
        self.num_messages.add_permits(1);
        drop(lock);

        Ok(())
    }

    async fn declare_consumer(&self, _consumer_name: &str) -> Result<Self::Consumer> {
        Ok(InMemoryConsumer {
            messages: self.messages.clone(),
            num_messages: self.num_messages.clone(),
            serializer: self.serializer,
        })
    }
}

/// An in-memory implementation of [`Consumer`].
///
/// ```
/// use paladin::{
///     serializer::Serializer,
///     acker::Acker,
///     queue::{Queue, Connection, QueueHandle, Consumer, in_memory::InMemoryQueue}
/// };
/// use serde::{Serialize, Deserialize};
/// use anyhow::Result;
/// use futures::StreamExt;
///
/// #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
/// struct MyStruct {
///     field: String,
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let queue = InMemoryQueue::new(Serializer::Cbor);
///     let connection = queue.get_connection().await?;
///     // Declare a queue
///     let handle = connection.declare_queue("my_queue").await?;
///
///     // Publish a message
///     handle.publish(&MyStruct { field: "Hello, World!".to_string() }).await?;
///
///     // Consume the message
///     let consumer = handle.declare_consumer("my_consumer").await?;
///     if let Some((message, acker)) = consumer.stream::<MyStruct>().await?.next().await {
///         acker.ack().await?;
///         assert_eq!(message, MyStruct { field: "Hello, World!".to_string() });
///     }
///     
///     Ok(())
/// }
/// ```
///
/// # Design
/// The consumer holds a reference to its owning queue's messages and a
/// [`PollSemaphore`] that is used to synchronize with the queue's message
/// availability. When the consumer is polled, it will acquire a
/// permit from the semaphore, which will return pending if no messages are
/// available. This effectively simulates a message push.
/// Once a permit is acquired, the consumer will pop a message from the queue
/// and release the permit, signaling to the queue that a message has been
/// consumed.
#[derive(Clone)]
pub struct InMemoryConsumer {
    messages: Arc<Mutex<VecDeque<Vec<u8>>>>,
    serializer: Serializer,
    num_messages: PollSemaphore,
}

/// A [`Stream`] implementation for [`InMemoryConsumer`].
///
/// # Design
/// This stream will poll the semaphore to acquire a permit, which will return
/// pending if no messages are available. Once a permit is acquired, the stream
/// will attempt to acquire a lock on the queue's messages. Once the lock is
/// acquired, the stream will pop a message from the queue and release the
/// permit, signaling to the queue that a message has been consumed.
pub struct ConsumerStream<T: Serializable> {
    _marker: std::marker::PhantomData<T>,
    messages: Arc<Mutex<VecDeque<Vec<u8>>>>,
    lock_fut: Option<(
        OwnedMutexLockFuture<VecDeque<Vec<u8>>>,
        OwnedSemaphorePermit,
    )>,
    serializer: Serializer,
    num_messages: PollSemaphore,
}

impl<T: Serializable> Stream for ConsumerStream<T> {
    type Item = (T, NoopAcker);

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut();

        // If we have a lock future, poll it
        match this.lock_fut.take() {
            Some((mut fut, permit)) => {
                match pin!(&mut fut).poll(cx) {
                    Poll::Ready(mut lock) => {
                        // We have a lock, pop the message
                        let item = lock.pop_front();
                        // Release the permit, signaling that we've consumed a message
                        permit.forget();
                        // Release the lock
                        drop(lock);
                        // Clear the lock future
                        this.lock_fut = None;

                        match item {
                            Some(item) => {
                                let item = this
                                    .serializer
                                    .from_bytes(&item)
                                    .expect("failed to deserialize");

                                Poll::Ready(Some((item, NoopAcker::new())))
                            }
                            None => {
                                // Should never happen given that permits should correspond 1:1 to
                                // messages. Error out so we can debug the logic error.
                                unreachable!("permit was acquired, but no message was available")
                            }
                        }
                    }
                    // Lock future is not ready
                    Poll::Pending => {
                        this.lock_fut = Some((fut, permit));
                        Poll::Pending
                    }
                }
            }
            // Otherwise, wait for a message
            None => {
                let permit = ready!(this.num_messages.poll_acquire(cx));
                match permit {
                    // If we have a permit, a message should be available
                    Some(permit) => {
                        // Create a lock future and poll ourselves
                        this.lock_fut = Some((this.messages.clone().lock_owned(), permit));
                        self.poll_next(cx)
                    }
                    None => Poll::Pending,
                }
            }
        }
    }
}

#[async_trait]
impl Consumer for InMemoryConsumer {
    type Acker = NoopAcker;
    type Stream<T: Serializable> = ConsumerStream<T>;

    async fn stream<T: Serializable>(self) -> Result<Self::Stream<T>> {
        Ok(ConsumerStream {
            messages: self.messages,
            serializer: self.serializer,
            num_messages: self.num_messages,
            lock_fut: None,
            _marker: std::marker::PhantomData,
        })
    }
}