atomic-log 0.2.0

A segmented, zero-copy rolling log for one writer and many readers.
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ops::Range;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};

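/// A fixed-capacity chunk of the log. Slots are initialized in order by a
/// single writer; `published` is the length of the prefix that readers are
/// allowed to observe. Segments chain backwards through `previous`.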
pub(crate) struct Segment<T> {
    pub(crate) sequence: u64,
    pub(crate) previous: Weak<Segment<T>>,
    published: AtomicUsize,
    storage: Box<[UnsafeCell<MaybeUninit<T>>]>,
}

impl<T> Segment<T> {
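    /// Allocates a segment of `capacity` uninitialized slots, linked back
    /// to its predecessor in the log.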
    pub(crate) fn new(sequence: u64, previous: Weak<Segment<T>>, capacity: usize) -> Arc<Self> {
        let mut storage = Vec::with_capacity(capacity);
        storage.resize_with(capacity, || UnsafeCell::new(MaybeUninit::uninit()));

        Arc::new(Self {
            sequence,
            previous,
            published: AtomicUsize::new(0),
            storage: storage.into_boxed_slice(),
        })
    }

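    /// Number of initialized elements visible to readers. The `Acquire`
    /// load pairs with the `Release` stores in `push` and `push_batch`.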
    #[inline]
    pub(crate) fn published_len(&self) -> usize {
        self.published.load(Ordering::Acquire)
    }

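    /// Appends one value. Only the single writer may call this, which is
    /// why the `Relaxed` load of `published` is sufficient here.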
    pub(crate) fn push(&self, value: T) {
        let index = self.published.load(Ordering::Relaxed);
        assert!(index < self.storage.len(), "segment is full");

        unsafe {
            (*self.storage[index].get()).write(value);
        }
        self.published.store(index + 1, Ordering::Release);
    }

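    /// Drains `values` until the segment is full, then publishes all new
    /// elements with one `Release` store. Returns the number written; the
    /// iterator is taken by `&mut` so the caller can spill any leftover
    /// items into a fresh segment.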
    pub(crate) fn push_batch(&self, values: &mut impl Iterator<Item = T>) -> usize {
        let start = self.published.load(Ordering::Relaxed);
        let capacity = self.storage.len();
        let mut index = start;

        while index < capacity {
            let Some(value) = values.next() else { break };
            unsafe {
                (*self.storage[index].get()).write(value);
            }
            index += 1;
        }

        let written = index - start;
        if written > 0 {
            self.published.store(index, Ordering::Release);
        }
        written
    }

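    /// Borrows a sub-range of the published prefix as a plain `&[T]`.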
    pub(crate) fn slice(&self, range: Range<usize>) -> &[T] {
        debug_assert!(range.start <= range.end);
        debug_assert!(range.end <= self.published_len());

        // Compute the base pointer with `as_ptr().add()` instead of
        // indexing, so an empty range starting at `capacity` cannot panic.
        unsafe {
            std::slice::from_raw_parts(
                self.storage.as_ptr().add(range.start).cast::<T>(),
                range.end - range.start,
            )
        }
    }
}

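// Only the published prefix was ever initialized, so drop exactly that many
// elements and leave the rest untouched.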
impl<T> Drop for Segment<T> {
    fn drop(&mut self) {
        let initialized = self.published.load(Ordering::Acquire);
        for slot in &mut self.storage[..initialized] {
            unsafe {
                ptr::drop_in_place((*slot.get()).as_mut_ptr());
            }
        }
    }
}

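// SAFETY: values of `T` are moved in on the writer's thread and may be
// dropped or referenced on any reader's thread, so both `T: Send` and
// `T: Sync` are required.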
unsafe impl<T: Send + Sync> Send for Segment<T> {}
unsafe impl<T: Send + Sync> Sync for Segment<T> {}
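
Everything above is `pub(crate)`, so a usage sketch has to live inside the crate; a unit test is the natural spot. The sketch below (test and module names are illustrative, not part of the crate) exercises the intended protocol: exactly one thread pushes, while a reader clone observes only the published prefix.

#[cfg(test)]
mod segment_example {
    use super::Segment;
    use std::sync::{Arc, Weak};
    use std::thread;

    #[test]
    fn one_writer_one_reader() {
        // First segment in a log: sequence 0, no predecessor.
        let segment = Segment::<u64>::new(0, Weak::new(), 1024);

        let reader = {
            let segment = Arc::clone(&segment);
            thread::spawn(move || {
                // A reader may observe any prefix of what the writer has
                // published, never a torn or uninitialized slot.
                let len = segment.published_len();
                let seen = segment.slice(0..len);
                assert!(seen.iter().copied().eq(0..len as u64));
            })
        };

        // The single writer appends; each push publishes one more slot.
        for value in 0..512u64 {
            segment.push(value);
        }

        reader.join().unwrap();
    }
}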