commonware_storage/queue/
shared.rs1use super::{Config, Error, Queue};
10use crate::Persistable;
11use commonware_codec::CodecShared;
12use commonware_runtime::{Clock, Metrics, Storage};
13use commonware_utils::{channel::mpsc, sync::AsyncMutex};
14use std::{ops::Range, sync::Arc};
15use tracing::debug;
16
17pub struct Writer<E: Clock + Storage + Metrics, V: CodecShared> {
22 queue: Arc<AsyncMutex<Queue<E, V>>>,
23 notify: mpsc::Sender<()>,
24}
25
26impl<E: Clock + Storage + Metrics, V: CodecShared> Clone for Writer<E, V> {
27 fn clone(&self) -> Self {
28 Self {
29 queue: self.queue.clone(),
30 notify: self.notify.clone(),
31 }
32 }
33}
34
35impl<E: Clock + Storage + Metrics, V: CodecShared> Writer<E, V> {
36 pub async fn enqueue(&self, item: V) -> Result<u64, Error> {
43 let pos = self.queue.lock().await.enqueue(item).await?;
44
45 let _ = self.notify.try_send(());
49
50 debug!(position = pos, "writer: enqueued item");
51 Ok(pos)
52 }
53
54 pub async fn enqueue_bulk(
62 &self,
63 items: impl IntoIterator<Item = V>,
64 ) -> Result<Range<u64>, Error> {
65 let mut queue = self.queue.lock().await;
66 let start = queue.size().await;
67 for item in items {
68 queue.append(item).await?;
69 }
70 let end = queue.size().await;
71 if end > start {
72 queue.commit().await?;
73 }
74 drop(queue);
75
76 if start < end {
77 let _ = self.notify.try_send(());
78 }
79 debug!(start, end, "writer: enqueued bulk");
80 Ok(start..end)
81 }
82
83 pub async fn append(&self, item: V) -> Result<u64, Error> {
93 let pos = self.queue.lock().await.append(item).await?;
94 let _ = self.notify.try_send(());
95 debug!(position = pos, "writer: appended item");
96 Ok(pos)
97 }
98
99 pub async fn commit(&self) -> Result<(), Error> {
101 self.queue.lock().await.commit().await
102 }
103
104 pub async fn sync(&self) -> Result<(), Error> {
106 self.queue.lock().await.sync().await
107 }
108
109 pub async fn size(&self) -> u64 {
111 self.queue.lock().await.size().await
112 }
113}
114
115pub struct Reader<E: Clock + Storage + Metrics, V: CodecShared> {
119 queue: Arc<AsyncMutex<Queue<E, V>>>,
120 notify: mpsc::Receiver<()>,
121}
122
123impl<E: Clock + Storage + Metrics, V: CodecShared> Reader<E, V> {
124 pub async fn recv(&mut self) -> Result<Option<(u64, V)>, Error> {
135 loop {
136 if let Some(item) = self.queue.lock().await.dequeue().await? {
138 return Ok(Some(item));
139 }
140
141 if self.notify.recv().await.is_none() {
144 return self.queue.lock().await.dequeue().await;
146 }
147 }
148 }
149
150 pub async fn try_recv(&mut self) -> Result<Option<(u64, V)>, Error> {
158 let _ = self.notify.try_recv();
160
161 self.queue.lock().await.dequeue().await
162 }
163
164 pub async fn ack(&self, position: u64) -> Result<(), Error> {
170 self.queue.lock().await.ack(position).await
171 }
172
173 pub async fn ack_up_to(&self, up_to: u64) -> Result<(), Error> {
179 self.queue.lock().await.ack_up_to(up_to).await
180 }
181
182 pub async fn ack_floor(&self) -> u64 {
184 self.queue.lock().await.ack_floor()
185 }
186
187 pub async fn read_position(&self) -> u64 {
189 self.queue.lock().await.read_position()
190 }
191
192 pub async fn is_empty(&self) -> bool {
194 self.queue.lock().await.is_empty().await
195 }
196
197 pub async fn reset(&self) {
199 self.queue.lock().await.reset();
200 }
201}
202
203pub async fn init<E: Clock + Storage + Metrics, V: CodecShared>(
228 context: E,
229 cfg: Config<V::Cfg>,
230) -> Result<(Writer<E, V>, Reader<E, V>), Error> {
231 let queue = Arc::new(AsyncMutex::new(Queue::init(context, cfg).await?));
232 let (notify_tx, notify_rx) = mpsc::channel(1);
233
234 let writer = Writer {
235 queue: queue.clone(),
236 notify: notify_tx,
237 };
238
239 let reader = Reader {
240 queue,
241 notify: notify_rx,
242 };
243
244 Ok((writer, reader))
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250 use commonware_codec::RangeCfg;
251 use commonware_macros::{select, test_traced};
252 use commonware_runtime::{
253 buffer::paged::CacheRef, deterministic, BufferPooler, Runner, Spawner,
254 };
255 use commonware_utils::{NZUsize, NZU16, NZU64};
256 use std::num::{NonZeroU16, NonZeroUsize};
257
258 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
259 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
260
261 fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
262 Config {
263 partition: partition.into(),
264 items_per_section: NZU64!(10),
265 compression: None,
266 codec_config: ((0..).into(), ()),
267 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
268 write_buffer: NZUsize!(4096),
269 }
270 }
271
272 #[test_traced]
273 fn test_shared_basic() {
274 let executor = deterministic::Runner::default();
275 executor.start(|context| async move {
276 let cfg = test_config("test_shared_basic", &context);
277 let (writer, mut reader) = init(context, cfg).await.unwrap();
278
279 let pos = writer.enqueue(b"hello".to_vec()).await.unwrap();
281 assert_eq!(pos, 0);
282
283 let (recv_pos, item) = reader.recv().await.unwrap().unwrap();
285 assert_eq!(recv_pos, 0);
286 assert_eq!(item, b"hello".to_vec());
287
288 reader.ack(recv_pos).await.unwrap();
290 assert!(reader.is_empty().await);
291 });
292 }
293
294 #[test_traced]
295 fn test_shared_append_commit() {
296 let executor = deterministic::Runner::default();
297 executor.start(|context| async move {
298 let cfg = test_config("test_shared_append_commit", &context);
299 let (writer, mut reader) = init(context, cfg).await.unwrap();
300
301 for i in 0..5u8 {
303 let pos = writer.append(vec![i]).await.unwrap();
304 assert_eq!(pos, i as u64);
305 }
306
307 let (pos, item) = reader.recv().await.unwrap().unwrap();
309 assert_eq!(pos, 0);
310 assert_eq!(item, vec![0]);
311
312 writer.commit().await.unwrap();
314
315 for i in 1..5 {
317 let (pos, item) = reader.recv().await.unwrap().unwrap();
318 assert_eq!(pos, i);
319 assert_eq!(item, vec![i as u8]);
320 reader.ack(pos).await.unwrap();
321 }
322
323 reader.ack(0).await.unwrap();
324 assert!(reader.is_empty().await);
325 });
326 }
327
328 #[test_traced]
329 fn test_shared_enqueue_bulk() {
330 let executor = deterministic::Runner::default();
331 executor.start(|context| async move {
332 let cfg = test_config("test_shared_bulk", &context);
333 let (writer, mut reader) = init(context, cfg).await.unwrap();
334
335 let range = writer
336 .enqueue_bulk((0..5u8).map(|i| vec![i]))
337 .await
338 .unwrap();
339 assert_eq!(range, 0..5);
340
341 for i in 0..5 {
342 let (pos, item) = reader.recv().await.unwrap().unwrap();
343 assert_eq!(pos, i);
344 assert_eq!(item, vec![i as u8]);
345 reader.ack(pos).await.unwrap();
346 }
347 assert!(reader.is_empty().await);
348 });
349 }
350
351 #[test_traced]
352 fn test_shared_concurrent() {
353 let executor = deterministic::Runner::default();
354 executor.start(|context| async move {
355 let cfg = test_config("test_shared_concurrent", &context);
356 let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();
357
358 let writer_handle = context.with_label("writer").spawn(|_ctx| async move {
360 for i in 0..10u8 {
361 writer.enqueue(vec![i]).await.unwrap();
362 }
363 writer
364 });
365
366 let mut received = Vec::new();
368 for _ in 0..10 {
369 let (pos, item) = reader.recv().await.unwrap().unwrap();
370 received.push((pos, item.clone()));
371 reader.ack(pos).await.unwrap();
372 }
373
374 for (i, (pos, item)) in received.iter().enumerate() {
376 assert_eq!(*pos, i as u64);
377 assert_eq!(*item, vec![i as u8]);
378 }
379
380 let _ = writer_handle.await.unwrap();
381 });
382 }
383
384 #[test_traced]
385 fn test_shared_select() {
386 let executor = deterministic::Runner::default();
387 executor.start(|context| async move {
388 let cfg = test_config("test_shared_select", &context);
389 let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();
390
391 writer.enqueue(b"test".to_vec()).await.unwrap();
393
394 let result = select! {
396 item = reader.recv() => item,
397 _ = context.sleep(std::time::Duration::from_secs(1)) => {
398 panic!("timeout")
399 },
400 };
401
402 let (pos, item) = result.unwrap().unwrap();
403 assert_eq!(pos, 0);
404 assert_eq!(item, b"test".to_vec());
405
406 reader.ack(pos).await.unwrap();
407 });
408 }
409
410 #[test_traced]
411 fn test_shared_writer_dropped() {
412 let executor = deterministic::Runner::default();
413 executor.start(|context| async move {
414 let cfg = test_config("test_shared_writer_dropped", &context);
415 let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();
416
417 writer.enqueue(b"item1".to_vec()).await.unwrap();
419 writer.enqueue(b"item2".to_vec()).await.unwrap();
420
421 let queue = writer.queue.clone();
423 drop(writer);
424
425 let (pos1, _) = reader.recv().await.unwrap().unwrap();
427 reader.ack(pos1).await.unwrap();
428
429 let (pos2, _) = reader.recv().await.unwrap().unwrap();
430 reader.ack(pos2).await.unwrap();
431
432 let result = reader.recv().await.unwrap();
434 assert!(result.is_none());
435
436 drop(reader);
437 let _ = Arc::try_unwrap(queue)
438 .unwrap_or_else(|_| panic!("queue should have a single reference"))
439 .into_inner();
440 });
441 }
442
443 #[test_traced]
444 fn test_shared_try_recv() {
445 let executor = deterministic::Runner::default();
446 executor.start(|context| async move {
447 let cfg = test_config("test_shared_try_recv", &context);
448 let (writer, mut reader) = init(context, cfg).await.unwrap();
449
450 let result = reader.try_recv().await.unwrap();
452 assert!(result.is_none());
453
454 writer.enqueue(b"item".to_vec()).await.unwrap();
456 let (pos, item) = reader.try_recv().await.unwrap().unwrap();
457 assert_eq!(pos, 0);
458 assert_eq!(item, b"item".to_vec());
459
460 reader.ack(pos).await.unwrap();
461 });
462 }
463
464 #[test_traced]
465 fn test_shared_multiple_writers() {
466 let executor = deterministic::Runner::default();
467 executor.start(|context| async move {
468 let cfg = test_config("test_shared_multi_writer", &context);
469 let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();
470
471 let writer2 = writer.clone();
473
474 let handle1 = context.with_label("writer1").spawn(|_ctx| async move {
476 for i in 0..5u8 {
477 writer.enqueue(vec![i]).await.unwrap();
478 }
479 writer
480 });
481
482 let handle2 = context.with_label("writer2").spawn(|_ctx| async move {
483 for i in 5..10u8 {
484 writer2.enqueue(vec![i]).await.unwrap();
485 }
486 });
487
488 let mut received = Vec::new();
490 for _ in 0..10 {
491 let (pos, item) = reader.recv().await.unwrap().unwrap();
492 received.push(item[0]);
493 reader.ack(pos).await.unwrap();
494 }
495
496 received.sort();
498 assert_eq!(received, (0..10u8).collect::<Vec<_>>());
499
500 let _ = handle1.await.unwrap();
501 handle2.await.unwrap();
502 });
503 }
504}