commonware_storage/queue/
shared.rs1use super::{Config, Error, Queue};
10use crate::{Context, Persistable};
11use commonware_codec::CodecShared;
12use commonware_utils::{channel::mpsc, sync::AsyncMutex};
13use std::{ops::Range, sync::Arc};
14use tracing::debug;
15
16pub struct Writer<E: Context, V: CodecShared> {
21 queue: Arc<AsyncMutex<Queue<E, V>>>,
22 notify: mpsc::Sender<()>,
23}
24
25impl<E: Context, V: CodecShared> Clone for Writer<E, V> {
26 fn clone(&self) -> Self {
27 Self {
28 queue: self.queue.clone(),
29 notify: self.notify.clone(),
30 }
31 }
32}
33
34impl<E: Context, V: CodecShared> Writer<E, V> {
35 pub async fn enqueue(&self, item: V) -> Result<u64, Error> {
42 let pos = self.queue.lock().await.enqueue(item).await?;
43
44 let _ = self.notify.try_send(());
48
49 debug!(position = pos, "writer: enqueued item");
50 Ok(pos)
51 }
52
53 pub async fn enqueue_bulk(
61 &self,
62 items: impl IntoIterator<Item = V>,
63 ) -> Result<Range<u64>, Error> {
64 let mut queue = self.queue.lock().await;
65 let start = queue.size().await;
66 for item in items {
67 queue.append(item).await?;
68 }
69 let end = queue.size().await;
70 if end > start {
71 queue.commit().await?;
72 }
73 drop(queue);
74
75 if start < end {
76 let _ = self.notify.try_send(());
77 }
78 debug!(start, end, "writer: enqueued bulk");
79 Ok(start..end)
80 }
81
82 pub async fn append(&self, item: V) -> Result<u64, Error> {
92 let pos = self.queue.lock().await.append(item).await?;
93 let _ = self.notify.try_send(());
94 debug!(position = pos, "writer: appended item");
95 Ok(pos)
96 }
97
98 pub async fn commit(&self) -> Result<(), Error> {
100 self.queue.lock().await.commit().await
101 }
102
103 pub async fn sync(&self) -> Result<(), Error> {
105 self.queue.lock().await.sync().await
106 }
107
108 pub async fn size(&self) -> u64 {
110 self.queue.lock().await.size().await
111 }
112}
113
114pub struct Reader<E: Context, V: CodecShared> {
118 queue: Arc<AsyncMutex<Queue<E, V>>>,
119 notify: mpsc::Receiver<()>,
120}
121
122impl<E: Context, V: CodecShared> Reader<E, V> {
123 pub async fn recv(&mut self) -> Result<Option<(u64, V)>, Error> {
134 loop {
135 if let Some(item) = self.queue.lock().await.dequeue().await? {
137 return Ok(Some(item));
138 }
139
140 if self.notify.recv().await.is_none() {
143 return self.queue.lock().await.dequeue().await;
145 }
146 }
147 }
148
149 pub async fn try_recv(&mut self) -> Result<Option<(u64, V)>, Error> {
157 let _ = self.notify.try_recv();
159
160 self.queue.lock().await.dequeue().await
161 }
162
163 pub async fn ack(&self, position: u64) -> Result<(), Error> {
169 self.queue.lock().await.ack(position).await
170 }
171
172 pub async fn ack_up_to(&self, up_to: u64) -> Result<(), Error> {
178 self.queue.lock().await.ack_up_to(up_to).await
179 }
180
181 pub async fn ack_floor(&self) -> u64 {
183 self.queue.lock().await.ack_floor()
184 }
185
186 pub async fn read_position(&self) -> u64 {
188 self.queue.lock().await.read_position()
189 }
190
191 pub async fn is_empty(&self) -> bool {
193 self.queue.lock().await.is_empty().await
194 }
195
196 pub async fn reset(&self) {
198 self.queue.lock().await.reset();
199 }
200}
201
202pub async fn init<E: Context, V: CodecShared>(
227 context: E,
228 cfg: Config<V::Cfg>,
229) -> Result<(Writer<E, V>, Reader<E, V>), Error> {
230 let queue = Arc::new(AsyncMutex::new(Queue::init(context, cfg).await?));
231 let (notify_tx, notify_rx) = mpsc::channel(1);
232
233 let writer = Writer {
234 queue: queue.clone(),
235 notify: notify_tx,
236 };
237
238 let reader = Reader {
239 queue,
240 notify: notify_rx,
241 };
242
243 Ok((writer, reader))
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::RangeCfg;
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Clock, Runner, Spawner,
        Supervisor as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::{NonZeroU16, NonZeroUsize};

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds the queue configuration shared by every test, varying only the
    /// storage partition name.
    fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        Config {
            partition: partition.into(),
            items_per_section: NZU64!(10),
            compression: None,
            codec_config: ((0..).into(), ()),
            page_cache,
            write_buffer: NZUsize!(4096),
        }
    }

    /// One item written, received, and acknowledged leaves the queue empty.
    #[test_traced]
    fn test_shared_basic() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_basic", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // The first item lands at position 0.
            let written_at = writer.enqueue(b"hello".to_vec()).await.unwrap();
            assert_eq!(written_at, 0);

            let (read_at, payload) = reader.recv().await.unwrap().unwrap();
            assert_eq!(read_at, 0);
            assert_eq!(payload, b"hello".to_vec());

            reader.ack(read_at).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// Items appended without a commit can be received both before and after
    /// the commit happens.
    #[test_traced]
    fn test_shared_append_commit() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_append_commit", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Append five items; none of them is committed yet.
            for byte in 0..5u8 {
                let written_at = writer.append(vec![byte]).await.unwrap();
                assert_eq!(written_at, byte as u64);
            }

            // The first item is already readable at this point.
            let (first_pos, first_item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(first_pos, 0);
            assert_eq!(first_item, vec![0]);

            writer.commit().await.unwrap();

            // The remaining items are still delivered in order.
            for expected in 1..5 {
                let (read_at, payload) = reader.recv().await.unwrap().unwrap();
                assert_eq!(read_at, expected);
                assert_eq!(payload, vec![expected as u8]);
                reader.ack(read_at).await.unwrap();
            }

            // Position 0 was received above but only acknowledged now.
            reader.ack(0).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// A bulk enqueue reports the covered range and delivers items in order.
    #[test_traced]
    fn test_shared_enqueue_bulk() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_bulk", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            let batch = (0..5u8).map(|byte| vec![byte]);
            let written = writer.enqueue_bulk(batch).await.unwrap();
            assert_eq!(written, 0..5);

            for expected in 0..5 {
                let (read_at, payload) = reader.recv().await.unwrap().unwrap();
                assert_eq!(read_at, expected);
                assert_eq!(payload, vec![expected as u8]);
                reader.ack(read_at).await.unwrap();
            }
            assert!(reader.is_empty().await);
        });
    }

    /// A writer running in its own task feeds a reader on the main task.
    #[test_traced]
    fn test_shared_concurrent() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_concurrent", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            // The task hands the writer back so that it outlives the loop.
            let producer = context.child("writer").spawn(|_ctx| async move {
                for byte in 0..10u8 {
                    writer.enqueue(vec![byte]).await.unwrap();
                }
                writer
            });

            let mut received = Vec::new();
            for _ in 0..10 {
                let (read_at, payload) = reader.recv().await.unwrap().unwrap();
                received.push((read_at, payload));
                reader.ack(read_at).await.unwrap();
            }

            // Items must arrive in write order at consecutive positions.
            let expected: Vec<(u64, Vec<u8>)> =
                (0..10u8).map(|byte| (byte as u64, vec![byte])).collect();
            assert_eq!(received, expected);

            let _ = producer.await.unwrap();
        });
    }

    /// `recv` can be raced against another future inside `select!`.
    #[test_traced]
    fn test_shared_select() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_select", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            writer.enqueue(b"test".to_vec()).await.unwrap();

            // An item is already queued, so `recv` must win the race.
            let outcome = select! {
                item = reader.recv() => item,
                _ = context.sleep(std::time::Duration::from_secs(1)) => {
                    panic!("timeout")
                },
            };

            let (read_at, payload) = outcome.unwrap().unwrap();
            assert_eq!(read_at, 0);
            assert_eq!(payload, b"test".to_vec());

            reader.ack(read_at).await.unwrap();
        });
    }

    /// After the only writer is dropped, queued items are still delivered and
    /// `recv` then returns `None`.
    #[test_traced]
    fn test_shared_writer_dropped() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_writer_dropped", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            writer.enqueue(b"item1".to_vec()).await.unwrap();
            writer.enqueue(b"item2".to_vec()).await.unwrap();

            // Keep a handle to the queue for the reference-count check below.
            let shared_queue = writer.queue.clone();
            drop(writer);

            let (first_pos, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(first_pos).await.unwrap();

            let (second_pos, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(second_pos).await.unwrap();

            // Nothing left and no writer alive: `recv` must not wait forever.
            let after_close = reader.recv().await.unwrap();
            assert!(after_close.is_none());

            // With both handles gone, ours must be the last reference.
            drop(reader);
            let _ = Arc::try_unwrap(shared_queue)
                .unwrap_or_else(|_| panic!("queue should have a single reference"))
                .into_inner();
        });
    }

    /// `try_recv` returns `None` on an empty queue and the item once present.
    #[test_traced]
    fn test_shared_try_recv() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_try_recv", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            let before_write = reader.try_recv().await.unwrap();
            assert!(before_write.is_none());

            writer.enqueue(b"item".to_vec()).await.unwrap();
            let (read_at, payload) = reader.try_recv().await.unwrap().unwrap();
            assert_eq!(read_at, 0);
            assert_eq!(payload, b"item".to_vec());

            reader.ack(read_at).await.unwrap();
        });
    }

    /// Two writer clones in separate tasks deliver every item exactly once.
    #[test_traced]
    fn test_shared_multiple_writers() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let cfg = test_config("test_shared_multi_writer", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            let second_writer = writer.clone();

            // The first task returns its writer so that one stays alive.
            let first_task = context
                .child("writer")
                .with_attribute("index", 1)
                .spawn(|_ctx| async move {
                    for byte in 0..5u8 {
                        writer.enqueue(vec![byte]).await.unwrap();
                    }
                    writer
                });

            let second_task = context
                .child("writer")
                .with_attribute("index", 2)
                .spawn(|_ctx| async move {
                    for byte in 5..10u8 {
                        second_writer.enqueue(vec![byte]).await.unwrap();
                    }
                });

            let mut seen = Vec::new();
            for _ in 0..10 {
                let (read_at, payload) = reader.recv().await.unwrap().unwrap();
                seen.push(payload[0]);
                reader.ack(read_at).await.unwrap();
            }

            // Interleaving between the two writers is unspecified; compare
            // the sorted set of first bytes instead.
            seen.sort_unstable();
            assert_eq!(seen, (0..10u8).collect::<Vec<_>>());

            let _ = first_task.await.unwrap();
            second_task.await.unwrap();
        });
    }
}