once_array/lib.rs
1//! A single-producer multiple-consumer append-only fixed capacity array.
2//!
3//! Creating a `OnceArrayWriter<T>` allocates a fixed-capacity buffer and
4//! represents exclusive access to append elements. Any number of
5//! `Arc<OnceArray<T>>` references can be created and shared across threads.
6//! These readers can access the slice of committed elements, and see new
7//! elements as they are committed by the writer without any locking.
8//!
9//! `OnceArray` serves as a building block for streaming data to multiple
10//! consumers while amortizing the cost of allocation and synchronization across
11//! chunks of many elements.
12//!
13//! # Example:
14//!
15//! ```rust
16//! use once_array::{OnceArrayWriter, OnceArray};
17//! let mut writer = OnceArrayWriter::with_capacity(1024);
18//!
19//! // Clone the reader to share it across threads.
20//! let reader1 = writer.reader().clone();
21//! let reader2 = writer.reader().clone();
22//!
23//! // Append some data to the writer.
24//! writer.try_push(42).unwrap();
25//! writer.try_push(43).unwrap();
26//!
27//! // Commit the new elements to make them visible to readers.
28//! writer.commit();
29//!
30//! assert_eq!(reader1.as_slice(), &[42, 43]);
31//! assert_eq!(reader2.as_slice(), &[42, 43]);
32//! ```
33
34#![no_std]
35
36extern crate alloc;
37
38use alloc::{sync::Arc, vec::Vec};
39use core::mem::ManuallyDrop;
40use core::ops::Deref;
41use core::sync::atomic::{AtomicUsize, Ordering};
42use core::{ptr, slice};
43
44/// The reader side of a single-producer multiple-consumer append-only fixed capacity array.
45///
46/// A `OnceArray` is normally behind `Arc` and constructed by creating a
47/// [`OnceArrayWriter`] and then cloning its `.reader()`.
48///
49/// An owned `OnceArray<T>` is semantically identical to a `Vec<T>` but without methods to mutate it.
50/// They can be inter-converted with `From` and `Into`, which may be useful in cases like:
51/// * Constructing a `OnceArray` from a `Vec` populated upfront, to pass to an API that requires `OnceArray`.
52/// * Unwrapping the underlying `Vec` after after claiming ownership with [`Arc::into_inner`] or [`Arc::try_unwrap`].
53pub struct OnceArray<T> {
54 // safety invariants:
55 // * `data` and `cap` may not change
56 // * `len` may never decrease
57 // * `len` is always less than or equal to `cap`
58 // * the first `len` elements of `data` are initialized
59 // * nothing may write to or invalidate `*data..*data.add(len)`, because
60 // another thread may have a reference to it
61 data: *mut T,
62 len: AtomicUsize,
63 cap: usize,
64}
65
66unsafe impl<T> Send for OnceArray<T> where T: Send {}
67unsafe impl<T> Sync for OnceArray<T> where T: Sync {}
68
69impl<T> Drop for OnceArray<T> {
70 fn drop(&mut self) {
71 unsafe {
72 // SAFETY:
73 // * We have exclusive access guaranteed by &mut.
74 // * `self.data` and `self.cap` came from a Vec,
75 // so can be turned back into a Vec.
76 // * `self.len` elements are properly initialized
77 drop(Vec::from_raw_parts(
78 self.data,
79 *self.len.get_mut(),
80 self.cap,
81 ))
82 }
83 }
84}
85
86impl<T> OnceArray<T> {
87 fn from_vec(v: Vec<T>) -> Self {
88 let mut v = ManuallyDrop::new(v);
89 OnceArray {
90 data: v.as_mut_ptr(),
91 cap: v.capacity(),
92 len: AtomicUsize::new(v.len()),
93 }
94 }
95
96 fn into_vec(self) -> Vec<T> {
97 unsafe {
98 // SAFETY:
99 // * We have exclusive access guaranteed by self.
100 // * `self.data` and `self.cap` came from a Vec,
101 // so can be turned back into a Vec.
102 // * `self.len` elements are properly initialized
103 let mut v = ManuallyDrop::new(self);
104 Vec::from_raw_parts(v.data, *v.len.get_mut(), v.cap)
105 }
106 }
107
108 /// Returns the maximum number of elements this buffer can hold.
109 ///
110 /// The capacity can't change once allocated.
111 pub fn capacity(&self) -> usize {
112 self.cap
113 }
114
115 /// Returns the current number of elements in the buffer.
116 ///
117 /// This increases when the [`OnceArrayWriter`] commits new elements, but
118 /// can never decrease.
119 pub fn len(&self) -> usize {
120 self.len.load(Ordering::Acquire)
121 }
122
123 /// Returns `true` if the buffer contains no elements.
124 pub fn is_empty(&self) -> bool {
125 self.len() == 0
126 }
127
128 /// Returns `true` if the buffer is at full capacity.
129 pub fn is_full(&self) -> bool {
130 self.len() == self.cap
131 }
132
133 /// Obtain a slice of the committed part of the buffer.
134 pub fn as_slice(&self) -> &[T] {
135 unsafe {
136 // SAFETY: This came from a vector and is properly aligned.
137 // The part up to len is initialized, and won't change
138 slice::from_raw_parts(self.data, self.len())
139 }
140 }
141}
142
143impl<T> Deref for OnceArray<T> {
144 type Target = [T];
145
146 fn deref(&self) -> &Self::Target {
147 self.as_slice()
148 }
149}
150
151impl<T> AsRef<[T]> for OnceArray<T> {
152 fn as_ref(&self) -> &[T] {
153 self.as_slice()
154 }
155}
156
157impl<T> core::borrow::Borrow<[T]> for OnceArray<T> {
158 fn borrow(&self) -> &[T] {
159 self.as_slice()
160 }
161}
162
163impl<T> From<Vec<T>> for OnceArray<T> {
164 fn from(val: Vec<T>) -> Self {
165 OnceArray::from_vec(val)
166 }
167}
168
169impl<T> From<OnceArray<T>> for Vec<T> {
170 fn from(v: OnceArray<T>) -> Self {
171 v.into_vec()
172 }
173}
174
175/// Exclusive write access to a [`OnceArray`].
176///
177/// The `OnceArrayWriter` provides methods to append elements to the uncommitted
178/// portion of the array. The uncommitted portion is not visible to readers and
179/// can be [mutated](OnceArrayWriter::uncommitted_mut) or
180/// [discarded](OnceArrayWriter::revert) because the writer retains exclusive access.
181///
182/// Once the writer is ready to make new elements visible to readers, it can
183/// call [`commit()`](OnceArrayWriter::commit) or
184/// [`commit_partial(n)`](OnceArrayWriter::commit_partial) to make elements
185/// immutable and atomically visible to readers. As long as there is
186/// remaining capacity, the writer can continue to append and commit more elements.
187///
188/// The API is optimized for scenarios where data is written to a series of new
189/// `OnceArrayWriter` chunks as they fill, so the append APIs return a `Result`
190/// handing back the unconsumed data when full, so the caller can easily continue
191/// filling the next chunk.
192pub struct OnceArrayWriter<T> {
193 // safety invariants:
194 // * This is the only `OnceArrayWriter` that wraps `inner`.
195 // * `uncommitted_len` is greater than or equal to `inner.len`, and less than or equal to `inner.cap`.
196 // * `uncommitted_len` elements have been initialized.
197 inner: Arc<OnceArray<T>>,
198 uncommitted_len: usize,
199}
200
201impl<T> OnceArrayWriter<T> {
202 fn from_vec(v: Vec<T>) -> OnceArrayWriter<T> {
203 Self {
204 uncommitted_len: v.len(),
205 inner: Arc::new(OnceArray::from_vec(v)),
206 }
207 }
208
209 /// Creates a new `OnceArrayWriter` with the specified capacity.
210 pub fn with_capacity(n: usize) -> OnceArrayWriter<T> {
211 Self::from_vec(Vec::with_capacity(n))
212 }
213
214 /// Obtain a read-only reference to the committed part of the array.
215 pub fn reader(&self) -> &Arc<OnceArray<T>> {
216 &self.inner
217 }
218
219 /// Returns the number of additional elements that can be written to the buffer before it is full.
220 pub fn remaining_capacity(&self) -> usize {
221 self.inner.cap - self.uncommitted_len
222 }
223
224 /// Obtain an immutable slice of the entire array, including committed and uncommitted parts.
225 pub fn as_slice(&self) -> &[T] {
226 unsafe {
227 // SAFETY:
228 // * the array has been initialized up to uncommitted_len
229 slice::from_raw_parts(self.inner.data, self.uncommitted_len)
230 }
231 }
232
233 /// Obtain a mutable slice of the uncommitted part of the array.
234 pub fn uncommitted_mut(&mut self) -> &mut [T] {
235 // SAFETY:
236 // * this is above the committed len, so these elements are not shared.
237 // * this is below the uncommitted_len, so these elements have been initialized.
238 unsafe {
239 let committed_len = self.inner.len.load(Ordering::Relaxed);
240 slice::from_raw_parts_mut(
241 self.inner.data.add(committed_len),
242 self.uncommitted_len - committed_len,
243 )
244 }
245 }
246
247 unsafe fn push_unchecked(&mut self, val: T) {
248 // SAFETY:
249 // * caller must ensure that uncommitted_len is less than capacity
250 // * uncommitted_len is greater than or equal to inner.len, so this doesn't invalidate shared slices
251 // * this has &mut exclusive access to the only `OnceArrayWriter`
252 // wrapping `inner`, so no other thread is writing.
253 unsafe {
254 self.inner.data.add(self.uncommitted_len).write(val);
255 self.uncommitted_len += 1;
256 }
257 }
258
259 /// Attempts to append an element to the buffer.
260 ///
261 /// If the buffer is full, returns `Err(val)`, returning ownership of the value
262 /// that could not be added.
263 ///
264 /// The new element is not visible to readers until a call to `commit()`.
265 pub fn try_push(&mut self, val: T) -> Result<(), T> {
266 if self.uncommitted_len < self.inner.cap {
267 // SAFETY: checked that uncommitted_len is less than capacity
268 unsafe {
269 self.push_unchecked(val);
270 }
271 Ok(())
272 } else {
273 Err(val)
274 }
275 }
276
277 /// Attempts to append elements from `iter` to the buffer.
278 ///
279 /// If the buffer becomes full before `iter` is exhausted, returns
280 /// `Err(iter)`, returning ownership of the iterator.
281 ///
282 /// Note that if the iterator exactly fills the remaining capacity, this
283 /// will return `Err` with an empty iterator, since the `Iterator` trait
284 /// does not allow checking if an iterator is exhausted without calling
285 /// `next()`.
286 ///
287 /// The new elements are not visible to readers until a call to `commit()`.
288 pub fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) -> Result<(), I::IntoIter> {
289 let mut iter = iter.into_iter();
290 while self.uncommitted_len < self.inner.cap {
291 if let Some(val) = iter.next() {
292 // SAFETY: checked that uncommitted_len is less than capacity
293 unsafe {
294 self.push_unchecked(val);
295 }
296 } else {
297 return Ok(());
298 }
299 }
300 Err(iter)
301 }
302
303 /// Attempts to append elements from `src` to the array.
304 ///
305 /// Returns the tail of the slice that could not be written to the buffer.
306 /// If the buffer is not filled and all elements were written, this will be
307 /// an empty slice.
308 ///
309 /// The new elements are not visible to readers until a call to `commit()`.
310 pub fn extend_from_slice<'a>(&mut self, src: &'a [T]) -> &'a [T]
311 where
312 T: Copy,
313 {
314 let count = self.remaining_capacity().min(src.len());
315 unsafe {
316 // SAFETY:
317 // * checked that position is less than capacity so
318 // address is in bounds.
319 // * this is above the current len so doesn't invalidate slices
320 // * this has &mut exclusive access to the only `OnceArrayWriter`
321 // wrapping `inner`, so no other thread is writing.
322 self.inner
323 .data
324 .add(self.uncommitted_len)
325 .copy_from_nonoverlapping(src.as_ptr(), count);
326 }
327
328 self.uncommitted_len += count;
329 &src[count..]
330 }
331
332 /// Makes newly written elements immutable and atomically visible to readers.
333 pub fn commit(&mut self) {
334 self.inner
335 .len
336 .store(self.uncommitted_len, Ordering::Release);
337 }
338
339 /// Makes the first `n` newly written elements immutable and atomically visible to readers.
340 ///
341 /// **Panics** if `n` is greater than the number of initialized but uncommitted elements.
342 pub fn commit_partial(&mut self, n: usize) {
343 let committed_len = self.inner.len.load(Ordering::Relaxed);
344 assert!(
345 n <= self.uncommitted_len - committed_len,
346 "Cannot commit more elements than have been initialized"
347 );
348 self.inner.len.store(committed_len + n, Ordering::Release);
349 }
350
351 /// Discards any uncommitted elements, reverting the buffer to the last committed state.
352 pub fn revert(&mut self) {
353 let committed_len = self.inner.len.load(Ordering::Relaxed);
354 let uncommitted_len = self.uncommitted_len;
355
356 // truncate first, in case dropping an element panics
357 self.uncommitted_len = committed_len;
358
359 // SAFETY:
360 // These elements have been initialized and are not shared.
361 unsafe {
362 ptr::drop_in_place(ptr::slice_from_raw_parts_mut(
363 self.inner.data.add(committed_len),
364 uncommitted_len - committed_len,
365 ));
366 }
367 }
368}
369
370impl<T> Drop for OnceArrayWriter<T> {
371 fn drop(&mut self) {
372 self.revert();
373 }
374}
375
376impl<T> From<Vec<T>> for OnceArrayWriter<T> {
377 fn from(vec: Vec<T>) -> OnceArrayWriter<T> {
378 OnceArrayWriter::from_vec(vec)
379 }
380}
381
382#[test]
383fn test_to_from_vec() {
384 let v = OnceArray::from(alloc::vec![1, 2, 3]);
385 assert_eq!(v.as_slice(), &[1, 2, 3]);
386 let v = Vec::from(v);
387 assert_eq!(v.as_slice(), &[1, 2, 3]);
388}
389
390#[test]
391fn test_push() {
392 let mut writer = OnceArrayWriter::with_capacity(4);
393 let reader = writer.reader().clone();
394 assert_eq!(reader.capacity(), 4);
395 assert_eq!(reader.len(), 0);
396
397 assert_eq!(writer.try_push(1), Ok(()));
398 assert_eq!(reader.len(), 0);
399 writer.commit();
400 assert_eq!(reader.len(), 1);
401 assert_eq!(reader.as_slice(), &[1]);
402
403 assert_eq!(writer.try_push(2), Ok(()));
404 assert_eq!(writer.try_push(3), Ok(()));
405 assert_eq!(writer.try_push(4), Ok(()));
406 assert_eq!(writer.try_push(5), Err(5));
407 writer.commit();
408
409 assert_eq!(reader.len(), 4);
410 assert_eq!(reader.as_slice(), &[1, 2, 3, 4]);
411}
412
413#[test]
414fn test_extend_from_slice() {
415 let mut writer = OnceArrayWriter::with_capacity(4);
416 let reader = writer.reader().clone();
417 assert_eq!(reader.capacity(), 4);
418 assert_eq!(reader.len(), 0);
419
420 assert_eq!(writer.extend_from_slice(&[1, 2]), &[]);
421 assert_eq!(reader.len(), 0);
422 writer.commit();
423 assert_eq!(reader.len(), 2);
424 assert_eq!(reader.as_slice(), &[1, 2]);
425
426 assert_eq!(writer.extend_from_slice(&[3, 4, 5, 6]), &[5, 6]);
427 writer.commit();
428 assert_eq!(reader.len(), 4);
429 assert_eq!(reader.as_slice(), &[1, 2, 3, 4]);
430}
431
432#[test]
433fn test_commit_revert() {
434 let mut writer = OnceArrayWriter::with_capacity(4);
435 let reader = writer.reader().clone();
436
437 assert_eq!(writer.try_push(1), Ok(()));
438 assert_eq!(writer.try_push(2), Ok(()));
439 assert_eq!(writer.as_slice(), &[1, 2]);
440 assert_eq!(writer.uncommitted_mut(), &mut [1, 2]);
441 writer.commit();
442 assert_eq!(reader.as_slice(), &[1, 2]);
443 assert_eq!(writer.uncommitted_mut(), &mut []);
444
445 assert_eq!(writer.try_push(3), Ok(()));
446 assert_eq!(writer.try_push(4), Ok(()));
447
448 writer.revert();
449 assert_eq!(reader.as_slice(), &[1, 2]);
450 assert_eq!(writer.uncommitted_mut(), &mut []);
451
452 assert_eq!(writer.try_push(5), Ok(()));
453 assert_eq!(writer.try_push(6), Ok(()));
454 assert_eq!(writer.as_slice(), &[1, 2, 5, 6]);
455
456 writer.commit_partial(1);
457 assert_eq!(reader.as_slice(), &[1, 2, 5]);
458 assert_eq!(writer.uncommitted_mut(), &[6]);
459
460 drop(writer);
461 assert_eq!(reader.as_slice(), &[1, 2, 5]);
462}
463
464#[test]
465#[should_panic(expected = "Cannot commit more elements than have been initialized")]
466fn test_commit_partial_panic() {
467 let mut writer = OnceArrayWriter::with_capacity(4);
468 assert_eq!(writer.try_push(1), Ok(()));
469 writer.commit_partial(2);
470}
471
472#[test]
473fn test_extend() {
474 let mut writer = OnceArrayWriter::with_capacity(4);
475 let reader = writer.reader().clone();
476
477 assert!(writer.extend([1, 2, 3]).is_ok());
478 assert_eq!(writer.as_slice(), &[1, 2, 3]);
479 writer.commit();
480 assert_eq!(reader.as_slice(), &[1, 2, 3]);
481
482 let mut remainder = writer.extend([4, 5]).unwrap_err();
483 assert_eq!(writer.as_slice(), &[1, 2, 3, 4]);
484 assert_eq!(remainder.next(), Some(5));
485}
486
487#[test]
488fn test_drop() {
489 struct DropCounter<'a>(&'a AtomicUsize);
490
491 impl<'a> Drop for DropCounter<'a> {
492 fn drop(&mut self) {
493 self.0.fetch_add(1, Ordering::Relaxed);
494 }
495 }
496
497 let drop_count = &AtomicUsize::new(0);
498
499 let mut writer = OnceArrayWriter::with_capacity(4);
500 let reader = writer.reader().clone();
501
502 assert!(writer.try_push(DropCounter(drop_count)).is_ok());
503 assert!(writer.try_push(DropCounter(drop_count)).is_ok());
504 writer.commit();
505
506 assert!(writer.try_push(DropCounter(drop_count)).is_ok());
507 writer.revert();
508 assert_eq!(drop_count.load(Ordering::Relaxed), 1);
509
510 // this one won't be committed, so should be dropped when the writer is dropped
511 assert!(writer.try_push(DropCounter(drop_count)).is_ok());
512 drop(writer);
513 assert_eq!(drop_count.load(Ordering::Relaxed), 2);
514
515 drop(reader);
516 assert_eq!(drop_count.load(Ordering::Relaxed), 4);
517}
518
519#[test]
520fn test_concurrent_read() {
521 extern crate std;
522 use std::thread;
523
524 let mut writer = OnceArrayWriter::<usize>::with_capacity(1024);
525 let reader = writer.reader().clone();
526
527 let handle = thread::spawn(move || {
528 while reader.len() < 1024 {
529 let slice = reader.as_slice();
530 // every committed element should equal its index
531 for (i, &v) in slice.iter().enumerate() {
532 assert_eq!(v, i);
533 }
534 }
535 });
536
537 for i in 0..1024 {
538 writer.try_push(i).unwrap();
539 writer.commit();
540 }
541 handle.join().unwrap();
542}