// once_array/lib.rs
1//! A single-producer multiple-consumer append-only fixed capacity array.
2//!
3//! Creating a `OnceArrayWriter<T>` allocates a fixed-capacity buffer and
4//! represents exclusive access to append elements. Any number of
5//! `Arc<OnceArray<T>>` references can be created and shared across threads.
6//! These readers can access the slice of committed elements, and see new
7//! elements as they are committed by the writer without any locking.
8//!
9//! `OnceArray` serves as a building block for streaming data to multiple
10//! consumers while amortizing the cost of allocation and synchronization across
11//! chunks of many elements.
12//!
//! # Examples
14//!
15//! ```rust
16//! use once_array::{OnceArrayWriter, OnceArray};
17//! let mut writer = OnceArrayWriter::with_capacity(1024);
18//!
19//! // Clone the reader to share it across threads.
20//! let reader1 = writer.reader().clone();
21//! let reader2 = writer.reader().clone();
22//!
23//! // Append some data to the writer.
24//! writer.try_push(42).unwrap();
25//! writer.try_push(43).unwrap();
26//!
27//! // Commit the new elements to make them visible to readers.
28//! writer.commit();
29//!
30//! assert_eq!(reader1.as_slice(), &[42, 43]);
31//! assert_eq!(reader2.as_slice(), &[42, 43]);
32//! ```
33
34#![no_std]
35
36extern crate alloc;
37
38use alloc::{sync::Arc, vec::Vec};
39use core::mem::ManuallyDrop;
40use core::ops::Deref;
41use core::sync::atomic::{AtomicUsize, Ordering};
42use core::{ptr, slice};
43
/// The reader side of a single-producer multiple-consumer append-only fixed capacity array.
///
/// A `OnceArray` is normally behind `Arc` and constructed by creating a
/// [`OnceArrayWriter`] and then cloning its `.reader()`.
///
/// An owned `OnceArray<T>` is semantically identical to a `Vec<T>` but without methods to mutate it.
/// They can be inter-converted with `From` and `Into`, which may be useful in cases like:
///   * Constructing a `OnceArray` from a `Vec` populated upfront, to pass to an API that requires `OnceArray`.
///   * Unwrapping the underlying `Vec` after claiming ownership with [`Arc::into_inner`] or [`Arc::try_unwrap`].
pub struct OnceArray<T> {
    // safety invariants:
    // * `data` and `cap` may not change
    // * `len` may never decrease
    // * `len` is always less than or equal to `cap`
    // * the first `len` elements of `data` are initialized
    // * nothing may write to or invalidate `*data..*data.add(len)`, because
    //   another thread may have a reference to it
    // Start of the allocation, obtained from `Vec::as_mut_ptr` in `from_vec`.
    data: *mut T,
    // Number of committed (initialized, immutable, reader-visible) elements.
    // Stored with `Release` by the writer's `commit()`/`commit_partial()` and
    // loaded with `Acquire` by readers in `len()`.
    len: AtomicUsize,
    // Capacity of the allocation; fixed once created.
    cap: usize,
}
65
// SAFETY: `OnceArray<T>` owns its elements (it drops them via the rebuilt
// `Vec` in `Drop`), so moving it across threads is sound when `T: Send`.
unsafe impl<T> Send for OnceArray<T> where T: Send {}
// SAFETY: shared access only hands out `&[T]` over the committed prefix, and
// `len` is accessed atomically, so `&OnceArray<T>` is shareable when `T: Sync`.
unsafe impl<T> Sync for OnceArray<T> where T: Sync {}
68
impl<T> Drop for OnceArray<T> {
    // Rebuild the owning `Vec` so the committed elements and the allocation
    // are freed exactly once by `Vec`'s own drop glue.
    fn drop(&mut self) {
        unsafe {
            // SAFETY:
            // * We have exclusive access guaranteed by &mut.
            // * `self.data` and `self.cap` came from a Vec,
            //    so can be turned back into a Vec.
            // * `self.len` elements are properly initialized
            drop(Vec::from_raw_parts(
                self.data,
                *self.len.get_mut(),
                self.cap,
            ))
        }
    }
}
85
impl<T> OnceArray<T> {
    /// Takes over a `Vec`'s allocation; every existing element counts as
    /// already committed.
    fn from_vec(v: Vec<T>) -> Self {
        // ManuallyDrop stops the Vec from freeing the buffer we now own.
        let mut v = ManuallyDrop::new(v);
        OnceArray {
            data: v.as_mut_ptr(),
            cap: v.capacity(),
            len: AtomicUsize::new(v.len()),
        }
    }

    /// Reassembles the original `Vec`, transferring the allocation and the
    /// committed elements back to the caller.
    fn into_vec(self) -> Vec<T> {
        unsafe {
            // SAFETY:
            // * We have exclusive access guaranteed by self.
            // * `self.data` and `self.cap` came from a Vec,
            //    so can be turned back into a Vec.
            // * `self.len` elements are properly initialized
            // ManuallyDrop prevents our own `Drop` impl from also freeing
            // the buffer the returned Vec now owns.
            let mut v = ManuallyDrop::new(self);
            Vec::from_raw_parts(v.data, *v.len.get_mut(), v.cap)
        }
    }

    /// Returns the maximum number of elements this buffer can hold.
    ///
    /// The capacity can't change once allocated.
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Returns the current number of elements in the buffer.
    ///
    /// This increases when the [`OnceArrayWriter`] commits new elements, but
    /// can never decrease.
    pub fn len(&self) -> usize {
        // Acquire pairs with the writer's Release store in `commit()`,
        // making the committed elements' initialization visible here.
        self.len.load(Ordering::Acquire)
    }

    /// Returns `true` if the buffer contains no elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the buffer is at full capacity.
    pub fn is_full(&self) -> bool {
        self.len() == self.cap
    }

    /// Obtain a slice of the committed part of the buffer.
    pub fn as_slice(&self) -> &[T] {
        unsafe {
            // SAFETY: This came from a vector and is properly aligned.
            // The part up to len is initialized, and won't change
            slice::from_raw_parts(self.data, self.len())
        }
    }
}
142
143impl<T> Deref for OnceArray<T> {
144    type Target = [T];
145
146    fn deref(&self) -> &Self::Target {
147        self.as_slice()
148    }
149}
150
151impl<T> AsRef<[T]> for OnceArray<T> {
152    fn as_ref(&self) -> &[T] {
153        self.as_slice()
154    }
155}
156
157impl<T> core::borrow::Borrow<[T]> for OnceArray<T> {
158    fn borrow(&self) -> &[T] {
159        self.as_slice()
160    }
161}
162
163impl<T> From<Vec<T>> for OnceArray<T> {
164    fn from(val: Vec<T>) -> Self {
165        OnceArray::from_vec(val)
166    }
167}
168
169impl<T> From<OnceArray<T>> for Vec<T> {
170    fn from(v: OnceArray<T>) -> Self {
171        v.into_vec()
172    }
173}
174
/// Exclusive write access to a [`OnceArray`].
///
/// The `OnceArrayWriter` provides methods to append elements to the uncommitted
/// portion of the array. The uncommitted portion is not visible to readers and
/// can be [mutated](OnceArrayWriter::uncommitted_mut) or
/// [discarded](OnceArrayWriter::revert) because the writer retains exclusive access.
///
/// Once the writer is ready to make new elements visible to readers, it can
/// call [`commit()`](OnceArrayWriter::commit) or
/// [`commit_partial(n)`](OnceArrayWriter::commit_partial) to make elements
/// immutable and atomically visible to readers. As long as there is
/// remaining capacity, the writer can continue to append and commit more elements.
///
/// The API is optimized for scenarios where data is written to a series of new
/// `OnceArrayWriter` chunks as they fill, so the append APIs return a `Result`
/// handing back the unconsumed data when full, so the caller can easily continue
/// filling the next chunk.
pub struct OnceArrayWriter<T> {
    // safety invariants:
    // * This is the only `OnceArrayWriter` that wraps `inner`.
    // * `uncommitted_len` is greater than or equal to `inner.len`, and less than or equal to `inner.cap`.
    // * `uncommitted_len` elements have been initialized.
    // Shared storage; readers hold clones of this `Arc`.
    inner: Arc<OnceArray<T>>,
    // Count of initialized elements, including those not yet published by
    // `commit()`. Only this writer reads or updates it.
    uncommitted_len: usize,
}
200
impl<T> OnceArrayWriter<T> {
    /// Wraps a `Vec`'s buffer; its existing elements begin as committed
    /// (both `uncommitted_len` and `inner.len` start at `v.len()`).
    fn from_vec(v: Vec<T>) -> OnceArrayWriter<T> {
        Self {
            uncommitted_len: v.len(),
            inner: Arc::new(OnceArray::from_vec(v)),
        }
    }

    /// Creates a new `OnceArrayWriter` with the specified capacity.
    pub fn with_capacity(n: usize) -> OnceArrayWriter<T> {
        Self::from_vec(Vec::with_capacity(n))
    }

    /// Obtain a read-only reference to the committed part of the array.
    ///
    /// Clone the returned `Arc` to share the reader across threads.
    pub fn reader(&self) -> &Arc<OnceArray<T>> {
        &self.inner
    }

    /// Returns the number of additional elements that can be written to the buffer before it is full.
    pub fn remaining_capacity(&self) -> usize {
        // Invariant: `uncommitted_len <= inner.cap`, so no underflow.
        self.inner.cap - self.uncommitted_len
    }

    /// Obtain an immutable slice of the entire array, including committed and uncommitted parts.
    pub fn as_slice(&self) -> &[T] {
        unsafe {
            // SAFETY:
            // * the array has been initialized up to uncommitted_len
            slice::from_raw_parts(self.inner.data, self.uncommitted_len)
        }
    }

    /// Obtain a mutable slice of the uncommitted part of the array.
    pub fn uncommitted_mut(&mut self) -> &mut [T] {
        // SAFETY:
        // * this is above the committed len, so these elements are not shared.
        // * this is below the uncommitted_len, so these elements have been initialized.
        unsafe {
            // Relaxed suffices: only this writer ever stores `inner.len`, so
            // it is reading back its own prior writes.
            let committed_len = self.inner.len.load(Ordering::Relaxed);
            slice::from_raw_parts_mut(
                self.inner.data.add(committed_len),
                self.uncommitted_len - committed_len,
            )
        }
    }

    /// Writes `val` at index `uncommitted_len` and bumps the count, without
    /// any capacity check.
    ///
    /// # Safety
    /// The caller must ensure `self.uncommitted_len < self.inner.cap`.
    unsafe fn push_unchecked(&mut self, val: T) {
        // SAFETY:
        // * caller must ensure that uncommitted_len is less than capacity
        // * uncommitted_len is greater than or equal to inner.len, so this doesn't invalidate shared slices
        // * this has &mut exclusive access to the only `OnceArrayWriter`
        //   wrapping `inner`, so no other thread is writing.
        unsafe {
            self.inner.data.add(self.uncommitted_len).write(val);
            self.uncommitted_len += 1;
        }
    }

    /// Attempts to append an element to the buffer.
    ///
    /// If the buffer is full, returns `Err(val)`, returning ownership of the value
    /// that could not be added.
    ///
    /// The new element is not visible to readers until a call to `commit()`.
    pub fn try_push(&mut self, val: T) -> Result<(), T> {
        if self.uncommitted_len < self.inner.cap {
            // SAFETY: checked that uncommitted_len is less than capacity
            unsafe {
                self.push_unchecked(val);
            }
            Ok(())
        } else {
            Err(val)
        }
    }

    /// Attempts to append elements from `iter` to the buffer.
    ///
    /// If the buffer becomes full before `iter` is exhausted, returns
    /// `Err(iter)`, returning ownership of the iterator.
    ///
    /// Note that if the iterator exactly fills the remaining capacity, this
    /// will return `Err` with an empty iterator, since the `Iterator` trait
    /// does not allow checking if an iterator is exhausted without calling
    /// `next()`.
    ///
    /// The new elements are not visible to readers until a call to `commit()`.
    pub fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) -> Result<(), I::IntoIter> {
        let mut iter = iter.into_iter();
        while self.uncommitted_len < self.inner.cap {
            if let Some(val) = iter.next() {
                // SAFETY: checked that uncommitted_len is less than capacity
                unsafe {
                    self.push_unchecked(val);
                }
            } else {
                return Ok(());
            }
        }
        Err(iter)
    }

    /// Attempts to append elements from `src` to the array.
    ///
    /// Returns the tail of the slice that could not be written to the buffer.
    /// If the buffer is not filled and all elements were written, this will be
    /// an empty slice.
    ///
    /// The new elements are not visible to readers until a call to `commit()`.
    pub fn extend_from_slice<'a>(&mut self, src: &'a [T]) -> &'a [T]
    where
        T: Copy,
    {
        // Copy as many elements as fit; `count` may be 0 when already full.
        let count = self.remaining_capacity().min(src.len());
        unsafe {
            // SAFETY:
            // * checked that position is less than capacity so
            //   address is in bounds.
            // * this is above the current len so doesn't invalidate slices
            // * this has &mut exclusive access to the only `OnceArrayWriter`
            //   wrapping `inner`, so no other thread is writing.
            self.inner
                .data
                .add(self.uncommitted_len)
                .copy_from_nonoverlapping(src.as_ptr(), count);
        }

        self.uncommitted_len += count;
        &src[count..]
    }

    /// Makes newly written elements immutable and atomically visible to readers.
    pub fn commit(&mut self) {
        // Release pairs with the Acquire load in `OnceArray::len`, publishing
        // the element writes before the new length becomes observable.
        self.inner
            .len
            .store(self.uncommitted_len, Ordering::Release);
    }

    /// Makes the first `n` newly written elements immutable and atomically visible to readers.
    ///
    /// **Panics** if `n` is greater than the number of initialized but uncommitted elements.
    pub fn commit_partial(&mut self, n: usize) {
        // Relaxed load is fine: only this writer stores `inner.len`.
        let committed_len = self.inner.len.load(Ordering::Relaxed);
        assert!(
            n <= self.uncommitted_len - committed_len,
            "Cannot commit more elements than have been initialized"
        );
        self.inner.len.store(committed_len + n, Ordering::Release);
    }

    /// Discards any uncommitted elements, reverting the buffer to the last committed state.
    pub fn revert(&mut self) {
        let committed_len = self.inner.len.load(Ordering::Relaxed);
        let uncommitted_len = self.uncommitted_len;

        // truncate first, in case dropping an element panics
        self.uncommitted_len = committed_len;

        // SAFETY:
        // These elements have been initialized and are not shared.
        unsafe {
            ptr::drop_in_place(ptr::slice_from_raw_parts_mut(
                self.inner.data.add(committed_len),
                uncommitted_len - committed_len,
            ));
        }
    }
}
369
370impl<T> Drop for OnceArrayWriter<T> {
371    fn drop(&mut self) {
372        self.revert();
373    }
374}
375
376impl<T> From<Vec<T>> for OnceArrayWriter<T> {
377    fn from(vec: Vec<T>) -> OnceArrayWriter<T> {
378        OnceArrayWriter::from_vec(vec)
379    }
380}
381
#[test]
fn test_to_from_vec() {
    // Round-trip Vec -> OnceArray -> Vec preserves the elements.
    let arr = OnceArray::from(alloc::vec![1, 2, 3]);
    assert_eq!(arr.as_slice(), &[1, 2, 3]);
    let back: Vec<_> = arr.into();
    assert_eq!(back.as_slice(), &[1, 2, 3]);
}
389
#[test]
fn test_push() {
    let mut writer = OnceArrayWriter::with_capacity(4);
    let reader = writer.reader().clone();
    assert_eq!(reader.capacity(), 4);
    assert_eq!(reader.len(), 0);

    // A push stays invisible to readers until committed.
    assert_eq!(writer.try_push(1), Ok(()));
    assert_eq!(reader.len(), 0);
    writer.commit();
    assert_eq!(reader.len(), 1);
    assert_eq!(reader.as_slice(), &[1]);

    // Fill the rest of the capacity, then check the overflowing push is
    // rejected and hands the value back.
    for v in [2, 3, 4] {
        assert_eq!(writer.try_push(v), Ok(()));
    }
    assert_eq!(writer.try_push(5), Err(5));
    writer.commit();

    assert_eq!(reader.len(), 4);
    assert_eq!(reader.as_slice(), &[1, 2, 3, 4]);
}
412
#[test]
fn test_extend_from_slice() {
    let mut w = OnceArrayWriter::with_capacity(4);
    let r = w.reader().clone();
    assert_eq!(r.capacity(), 4);
    assert_eq!(r.len(), 0);

    // Everything fits: no leftover, and nothing visible until commit.
    assert_eq!(w.extend_from_slice(&[1, 2]), &[]);
    assert_eq!(r.len(), 0);
    w.commit();
    assert_eq!(r.len(), 2);
    assert_eq!(r.as_slice(), &[1, 2]);

    // Overflowing write returns the tail that did not fit.
    assert_eq!(w.extend_from_slice(&[3, 4, 5, 6]), &[5, 6]);
    w.commit();
    assert_eq!(r.len(), 4);
    assert_eq!(r.as_slice(), &[1, 2, 3, 4]);
}
431
#[test]
fn test_commit_revert() {
    let mut w = OnceArrayWriter::with_capacity(4);
    let r = w.reader().clone();

    // Uncommitted elements are visible (and mutable) only through the writer.
    assert_eq!(w.try_push(1), Ok(()));
    assert_eq!(w.try_push(2), Ok(()));
    assert_eq!(w.as_slice(), &[1, 2]);
    assert_eq!(w.uncommitted_mut(), &mut [1, 2]);
    w.commit();
    assert_eq!(r.as_slice(), &[1, 2]);
    assert_eq!(w.uncommitted_mut(), &mut []);

    // revert() throws away everything pushed since the last commit.
    assert_eq!(w.try_push(3), Ok(()));
    assert_eq!(w.try_push(4), Ok(()));
    w.revert();
    assert_eq!(r.as_slice(), &[1, 2]);
    assert_eq!(w.uncommitted_mut(), &mut []);

    // commit_partial(n) publishes only the first n uncommitted elements.
    assert_eq!(w.try_push(5), Ok(()));
    assert_eq!(w.try_push(6), Ok(()));
    assert_eq!(w.as_slice(), &[1, 2, 5, 6]);
    w.commit_partial(1);
    assert_eq!(r.as_slice(), &[1, 2, 5]);
    assert_eq!(w.uncommitted_mut(), &[6]);

    // Dropping the writer discards the still-uncommitted element.
    drop(w);
    assert_eq!(r.as_slice(), &[1, 2, 5]);
}
463
#[test]
#[should_panic(expected = "Cannot commit more elements than have been initialized")]
fn test_commit_partial_panic() {
    let mut w = OnceArrayWriter::with_capacity(4);
    // Only one element is initialized, so committing two must panic.
    w.try_push(1).unwrap();
    w.commit_partial(2);
}
471
#[test]
fn test_extend() {
    let mut w = OnceArrayWriter::with_capacity(4);
    let r = w.reader().clone();

    // Iterator shorter than the remaining capacity is fully consumed.
    assert!(w.extend([1, 2, 3]).is_ok());
    assert_eq!(w.as_slice(), &[1, 2, 3]);
    w.commit();
    assert_eq!(r.as_slice(), &[1, 2, 3]);

    // Iterator longer than the remaining capacity comes back with the
    // unconsumed rest.
    let mut rest = w.extend([4, 5]).unwrap_err();
    assert_eq!(w.as_slice(), &[1, 2, 3, 4]);
    assert_eq!(rest.next(), Some(5));
}
486
#[test]
fn test_drop() {
    // Counts drops via a shared atomic so we can observe exactly when
    // elements are destroyed.
    struct DropCounter<'a>(&'a AtomicUsize);

    impl Drop for DropCounter<'_> {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    let drops = &AtomicUsize::new(0);

    let mut writer = OnceArrayWriter::with_capacity(4);
    let reader = writer.reader().clone();

    // Two committed elements: these stay alive until the last reader goes.
    assert!(writer.try_push(DropCounter(drops)).is_ok());
    assert!(writer.try_push(DropCounter(drops)).is_ok());
    writer.commit();

    // An uncommitted element is dropped by revert().
    assert!(writer.try_push(DropCounter(drops)).is_ok());
    writer.revert();
    assert_eq!(drops.load(Ordering::Relaxed), 1);

    // An uncommitted element is also dropped when the writer itself drops.
    assert!(writer.try_push(DropCounter(drops)).is_ok());
    drop(writer);
    assert_eq!(drops.load(Ordering::Relaxed), 2);

    // The two committed elements go with the last reader.
    drop(reader);
    assert_eq!(drops.load(Ordering::Relaxed), 4);
}
518
#[test]
fn test_concurrent_read() {
    extern crate std;
    use std::thread;

    const N: usize = 1024;

    let mut writer = OnceArrayWriter::<usize>::with_capacity(N);
    let reader = writer.reader().clone();

    // Reader thread spins until all elements are committed, checking that
    // every element it can see equals its index (i.e. commits are never
    // observed before the corresponding writes).
    let checker = thread::spawn(move || {
        while reader.len() < N {
            for (i, &v) in reader.as_slice().iter().enumerate() {
                assert_eq!(v, i);
            }
        }
    });

    // Writer publishes one element at a time.
    for i in 0..N {
        writer.try_push(i).unwrap();
        writer.commit();
    }
    checker.join().unwrap();
}
542}