commonware_storage/queue/
shared.rs1use super::{Config, Error, Queue};
10use crate::{Context, Persistable};
11use commonware_codec::CodecShared;
12use commonware_utils::{channel::mpsc, sync::AsyncMutex};
13use std::{ops::Range, sync::Arc};
14use tracing::debug;
15
16pub struct Writer<E: Context, V: CodecShared> {
21 queue: Arc<AsyncMutex<Queue<E, V>>>,
22 notify: mpsc::Sender<()>,
23}
24
25impl<E: Context, V: CodecShared> Clone for Writer<E, V> {
26 fn clone(&self) -> Self {
27 Self {
28 queue: self.queue.clone(),
29 notify: self.notify.clone(),
30 }
31 }
32}
33
34impl<E: Context, V: CodecShared> Writer<E, V> {
35 pub async fn enqueue(&self, item: V) -> Result<u64, Error> {
42 let pos = self.queue.lock().await.enqueue(item).await?;
43
44 let _ = self.notify.try_send(());
48
49 debug!(position = pos, "writer: enqueued item");
50 Ok(pos)
51 }
52
53 pub async fn enqueue_bulk(
61 &self,
62 items: impl IntoIterator<Item = V>,
63 ) -> Result<Range<u64>, Error> {
64 let mut queue = self.queue.lock().await;
65 let start = queue.size().await;
66 for item in items {
67 queue.append(item).await?;
68 }
69 let end = queue.size().await;
70 if end > start {
71 queue.commit().await?;
72 }
73 drop(queue);
74
75 if start < end {
76 let _ = self.notify.try_send(());
77 }
78 debug!(start, end, "writer: enqueued bulk");
79 Ok(start..end)
80 }
81
82 pub async fn append(&self, item: V) -> Result<u64, Error> {
92 let pos = self.queue.lock().await.append(item).await?;
93 let _ = self.notify.try_send(());
94 debug!(position = pos, "writer: appended item");
95 Ok(pos)
96 }
97
98 pub async fn commit(&self) -> Result<(), Error> {
100 self.queue.lock().await.commit().await
101 }
102
103 pub async fn sync(&self) -> Result<(), Error> {
105 self.queue.lock().await.sync().await
106 }
107
108 pub async fn size(&self) -> u64 {
110 self.queue.lock().await.size().await
111 }
112}
113
114pub struct Reader<E: Context, V: CodecShared> {
118 queue: Arc<AsyncMutex<Queue<E, V>>>,
119 notify: mpsc::Receiver<()>,
120}
121
122impl<E: Context, V: CodecShared> Reader<E, V> {
123 pub async fn recv(&mut self) -> Result<Option<(u64, V)>, Error> {
134 loop {
135 if let Some(item) = self.queue.lock().await.dequeue().await? {
137 return Ok(Some(item));
138 }
139
140 if self.notify.recv().await.is_none() {
143 return self.queue.lock().await.dequeue().await;
145 }
146 }
147 }
148
149 pub async fn try_recv(&mut self) -> Result<Option<(u64, V)>, Error> {
157 let _ = self.notify.try_recv();
159
160 self.queue.lock().await.dequeue().await
161 }
162
163 pub async fn ack(&self, position: u64) -> Result<(), Error> {
169 self.queue.lock().await.ack(position).await
170 }
171
172 pub async fn ack_up_to(&self, up_to: u64) -> Result<(), Error> {
178 self.queue.lock().await.ack_up_to(up_to).await
179 }
180
181 pub async fn ack_floor(&self) -> u64 {
183 self.queue.lock().await.ack_floor()
184 }
185
186 pub async fn read_position(&self) -> u64 {
188 self.queue.lock().await.read_position()
189 }
190
191 pub async fn is_empty(&self) -> bool {
193 self.queue.lock().await.is_empty().await
194 }
195
196 pub async fn reset(&self) {
198 self.queue.lock().await.reset();
199 }
200}
201
202pub async fn init<E: Context, V: CodecShared>(
227 context: E,
228 cfg: Config<V::Cfg>,
229) -> Result<(Writer<E, V>, Reader<E, V>), Error> {
230 let queue = Arc::new(AsyncMutex::new(Queue::init(context, cfg).await?));
231 let (notify_tx, notify_rx) = mpsc::channel(1);
232
233 let writer = Writer {
234 queue: queue.clone(),
235 notify: notify_tx,
236 };
237
238 let reader = Reader {
239 queue,
240 notify: notify_rx,
241 };
242
243 Ok((writer, reader))
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::RangeCfg;
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Clock, Metrics, Runner, Spawner,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::{NonZeroU16, NonZeroUsize};

    /// Page size handed to the page cache in every test configuration.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    /// Page cache capacity handed to the page cache in every test configuration.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds a queue configuration for `Vec<u8>` items stored under `partition`.
    fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        Config {
            partition: partition.into(),
            items_per_section: NZU64!(10),
            compression: None,
            codec_config: ((0..).into(), ()),
            page_cache,
            write_buffer: NZUsize!(4096),
        }
    }

    /// One item: enqueue, receive, ack, and the queue ends up empty.
    #[test_traced]
    fn test_shared_basic() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_basic", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // The first item lands at position 0.
            let written = writer.enqueue(b"hello".to_vec()).await.unwrap();
            assert_eq!(written, 0);

            let (read, payload) = reader.recv().await.unwrap().unwrap();
            assert_eq!(read, 0);
            assert_eq!(payload, b"hello".to_vec());

            reader.ack(read).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// Appended items can be received before and after an explicit commit.
    #[test_traced]
    fn test_shared_append_commit() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_append_commit", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Append five items without committing; positions are sequential.
            for i in 0..5u8 {
                let position = writer.append(vec![i]).await.unwrap();
                assert_eq!(position, i as u64);
            }

            // The first item is already readable although nothing was committed.
            let (first_pos, first_item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(first_pos, 0);
            assert_eq!(first_item, vec![0]);

            writer.commit().await.unwrap();

            // Drain and ack the remaining four items in order.
            for i in 1..5 {
                let (position, payload) = reader.recv().await.unwrap().unwrap();
                assert_eq!(position, i);
                assert_eq!(payload, vec![i as u8]);
                reader.ack(position).await.unwrap();
            }

            // Ack the first item last; afterwards nothing is outstanding.
            reader.ack(0).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// A bulk enqueue returns the occupied range and yields items in order.
    #[test_traced]
    fn test_shared_enqueue_bulk() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_bulk", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            let batch = (0..5u8).map(|i| vec![i]);
            let positions = writer.enqueue_bulk(batch).await.unwrap();
            assert_eq!(positions, 0..5);

            for i in 0..5 {
                let (position, payload) = reader.recv().await.unwrap().unwrap();
                assert_eq!(position, i);
                assert_eq!(payload, vec![i as u8]);
                reader.ack(position).await.unwrap();
            }
            assert!(reader.is_empty().await);
        });
    }

    /// A spawned writer task and the reader run concurrently; order is kept.
    #[test_traced]
    fn test_shared_concurrent() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_concurrent", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // The task hands the writer back through its join handle instead of
            // dropping it when the loop finishes.
            let producer = context.with_label("writer").spawn(|_ctx| async move {
                for i in 0..10u8 {
                    writer.enqueue(vec![i]).await.unwrap();
                }
                writer
            });

            // Receive and ack all ten items.
            let mut seen = Vec::new();
            for _ in 0..10 {
                let (position, payload) = reader.recv().await.unwrap().unwrap();
                seen.push((position, payload));
                reader.ack(position).await.unwrap();
            }

            // A single producer means positions and payloads arrive in order.
            for (i, (position, payload)) in seen.iter().enumerate() {
                assert_eq!(*position, i as u64);
                assert_eq!(*payload, vec![i as u8]);
            }

            let _ = producer.await.unwrap();
        });
    }

    /// `recv` can be used as a `select!` branch alongside a timeout.
    #[test_traced]
    fn test_shared_select() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_select", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            writer.enqueue(b"test".to_vec()).await.unwrap();

            // An item is already queued, so the timeout branch must not win.
            let outcome = select! {
                item = reader.recv() => item,
                _ = context.sleep(std::time::Duration::from_secs(1)) => {
                    panic!("timeout")
                },
            };

            let (position, payload) = outcome.unwrap().unwrap();
            assert_eq!(position, 0);
            assert_eq!(payload, b"test".to_vec());

            reader.ack(position).await.unwrap();
        });
    }

    /// After the only writer is dropped, `recv` drains what is left and then
    /// returns `None` instead of waiting forever.
    #[test_traced]
    fn test_shared_writer_dropped() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_writer_dropped", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            writer.enqueue(b"item1".to_vec()).await.unwrap();
            writer.enqueue(b"item2".to_vec()).await.unwrap();

            // Keep our own reference to the queue, then drop the writer handle.
            let queue = Arc::clone(&writer.queue);
            drop(writer);

            // Both previously enqueued items are still delivered.
            let (first, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(first).await.unwrap();

            let (second, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(second).await.unwrap();

            // Queue drained and no writer left: `recv` reports `None`.
            let after_close = reader.recv().await.unwrap();
            assert!(after_close.is_none());

            // With the reader gone too, our reference must be the last one.
            drop(reader);
            let _ = Arc::try_unwrap(queue)
                .unwrap_or_else(|_| panic!("queue should have a single reference"))
                .into_inner();
        });
    }

    /// `try_recv` returns `None` on an empty queue and an item once one exists.
    #[test_traced]
    fn test_shared_try_recv() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_try_recv", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Nothing has been written yet.
            let empty = reader.try_recv().await.unwrap();
            assert!(empty.is_none());

            writer.enqueue(b"item".to_vec()).await.unwrap();
            let (position, payload) = reader.try_recv().await.unwrap().unwrap();
            assert_eq!(position, 0);
            assert_eq!(payload, b"item".to_vec());

            reader.ack(position).await.unwrap();
        });
    }

    /// Two cloned writers feed one reader; every item arrives exactly once.
    #[test_traced]
    fn test_shared_multiple_writers() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_multi_writer", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            let writer2 = writer.clone();

            // The first task returns its writer through the join handle; the
            // second task drops its clone when it finishes.
            let first = context.with_label("writer1").spawn(|_ctx| async move {
                for i in 0..5u8 {
                    writer.enqueue(vec![i]).await.unwrap();
                }
                writer
            });

            let second = context.with_label("writer2").spawn(|_ctx| async move {
                for i in 5..10u8 {
                    writer2.enqueue(vec![i]).await.unwrap();
                }
            });

            // Collect the single payload byte of each of the ten items.
            let mut seen = Vec::new();
            for _ in 0..10 {
                let (position, payload) = reader.recv().await.unwrap().unwrap();
                seen.push(payload[0]);
                reader.ack(position).await.unwrap();
            }

            // Interleaving between the two writers is unspecified, so compare
            // the sorted payloads against the full expected set.
            seen.sort_unstable();
            assert_eq!(seen, (0..10u8).collect::<Vec<_>>());

            let _ = first.await.unwrap();
            second.await.unwrap();
        });
    }
}