scroll-ring 0.1.3

An MPSC overwriting ring buffer tuned for character data scrollback
Documentation
//! Currently unused sketches of how an implementation that is better than the current one in
//! `minimal` could work.

#![allow(unused)] // to stay sane in the editor

use portable_atomic::{AtomicBool, AtomicI16, AtomicU32, AtomicUsize, Ordering};
use core::cell::UnsafeCell;

// Laying low on generics for getting started...
/// Type that can index into buf. Its underlying numeric type must be Into<AtomicLogicalIndex>.
type AtomicBufIndex = AtomicI16;
/// Plain (non-atomic) counterpart of [`AtomicBufIndex`]; signed so that -1 can serve as the
/// "no reader active" sentinel (see `Buffer::read_start`).
type BufIndex = i16;
/// Type that can index into the larger logical address space underpinning the ring buffer.
type AtomicLogicalIndex = AtomicU32;
/// Plain (non-atomic) counterpart of [`AtomicLogicalIndex`].
type LogicalIndex = u32;

const BUFSIZE: usize = 1024;

/// A [`BUFSIZE`]-wide atomic bit field
///
/// The methods for setting and clearing bits do not return their old values -- this is because the
/// use case of the ring buffer doesn't need them (that bit's value is always known, we're only
/// using atomics to conserve the neighboring bits).
#[derive(Default)]
struct AtomicBitfield {
    words: [AtomicUsize; BUFSIZE / usize::BITS as usize],
}

impl AtomicBitfield {
    /// log2 of the bits per word, i.e. the shift that turns a bit index into a word index.
    const BITCOUNT: usize = usize::BITS.ilog2() as _;
    /// Mask that extracts the bit position within a word from a bit index.
    const MASK: usize = !(!0 << Self::BITCOUNT);

    /// Create a bit field with all bits cleared; `const` so it can live in a `static`.
    const fn new() -> Self {
        const NULL: AtomicUsize = AtomicUsize::new(0);
        Self {
            words: [NULL; BUFSIZE / usize::BITS as usize],
        }
    }

    /// Set the bit at `index`. Panics if `index >= BUFSIZE`.
    fn set(&self, index: usize, ordering: Ordering) {
        let offset = index >> Self::BITCOUNT;
        let shifted = 1 << (index & Self::MASK);
        self.words[offset].fetch_or(shifted, ordering);
    }

    /// Call `action(word_index, mask)` for every word overlapping `start..end`; `mask` has 1s
    /// exactly at the bits of that word that lie inside the range. Empty ranges produce no calls.
    fn for_each_word_mask(start: usize, end: usize, mut action: impl FnMut(usize, usize)) {
        let mut i = start;
        while i < end {
            let word = i >> Self::BITCOUNT;
            // First bit index beyond this word, clamped to the requested range end.
            let stop = ((word + 1) << Self::BITCOUNT).min(end);
            let len = stop - i;
            // A shift by the full word width would overflow, so treat the all-ones case apart.
            let mask = if len == usize::BITS as usize {
                !0usize
            } else {
                !(!0usize << len) << (i & Self::MASK)
            };
            action(word, mask);
            i = stop;
        }
    }

    /// Set all bits in `start..end` (end exclusive), one RMW operation per affected word
    /// (replaces the previous bit-at-a-time loop flagged as the worst possible implementation).
    fn set_range(&self, start: usize, end: usize, ordering: Ordering) {
        Self::for_each_word_mask(start, end, |word, mask| {
            self.words[word].fetch_or(mask, ordering);
        });
    }

    /// Clear the bit at `index`. Panics if `index >= BUFSIZE`.
    fn clear(&self, index: usize, ordering: Ordering) {
        let offset = index >> Self::BITCOUNT;
        let shifted = 1 << (index & Self::MASK);
        self.words[offset].fetch_and(!shifted, ordering);
    }

    /// Clear all bits in `start..end` (end exclusive), one RMW operation per affected word.
    fn clear_range(&self, start: usize, end: usize, ordering: Ordering) {
        Self::for_each_word_mask(start, end, |word, mask| {
            self.words[word].fetch_and(!mask, ordering);
        });
    }

    /// Read the bit at `index`. Panics if `index >= BUFSIZE`.
    fn get(&self, index: usize, ordering: Ordering) -> bool {
        let offset = index >> Self::BITCOUNT;
        let shifted = 1 << (index & Self::MASK);
        self.words[offset].load(ordering) & shifted != 0
    }

    /// Index of the first set bit at or after `start`, or `None` if there is none.
    ///
    /// Scans one word per load and locates the bit with `trailing_zeros` instead of probing
    /// every bit individually.
    fn get_first_set(&self, start: usize, ordering: Ordering) -> Option<usize> {
        if start >= BUFSIZE {
            return None;
        }
        let mut word = start >> Self::BITCOUNT;
        // Ignore bits below `start` in the first word examined.
        let mut bits = self.words[word].load(ordering) & (!0 << (start & Self::MASK));
        loop {
            if bits != 0 {
                return Some((word << Self::BITCOUNT) | bits.trailing_zeros() as usize);
            }
            word += 1;
            if word == self.words.len() {
                return None;
            }
            bits = self.words[word].load(ordering);
        }
    }
}

/// Exercises single-bit set/get, then uses a set bit as a release/acquire guard for a
/// non-atomic `UnsafeCell` write; the orderings are meant to be checked under miri.
#[test]
fn test_atomic_bitfield() {
    let bf: AtomicBitfield = Default::default();

    // A bool cell whose cross-thread access is only sound because of the release/acquire
    // handshake on bit 128 below; Send/Sync are asserted manually for the test's sake.
    struct BitGuardedUnsafeCell(UnsafeCell<bool>);
    unsafe impl Send for BitGuardedUnsafeCell {}
    unsafe impl Sync for BitGuardedUnsafeCell {}
    let value = BitGuardedUnsafeCell(UnsafeCell::new(false));
    let threads_value = &value;

    std::thread::scope(|s| {
        // Set a few scattered bits and verify that exactly those read back as set.
        bf.set(1, Ordering::Relaxed);
        bf.set(32, Ordering::Relaxed);
        bf.set(63, Ordering::Relaxed);
        for i in 0..BUFSIZE {
            assert!(bf.get(i, Ordering::Relaxed) == (i == 1 || i == 32 || i == 63), "Bit {} not as expected", i);
        }

        bf.set(128, Ordering::Relaxed);
        // To see how well this works, put either of the later orderings to Relaxed and watch
        // `cargo +nightly miri test` fail
        s.spawn(|| {
            // The Release store of bit 128 publishes this non-atomic write...
            unsafe { *threads_value.0.get() = true; }
            bf.clear(128, Ordering::Release);
        });
        // ...and the Acquire loop synchronizes with it before the unguarded read below.
        while bf.get(128, Ordering::Acquire) {}
        unsafe { *threads_value.0.get() };
    });
}

/// A ring buffer that is
/// * lock-free,
/// * MPSC (with multithreaded producers),
/// * generally overwriting -- unless the reader (actually peeker) currently locks, in which
///   case the number of the lost bytes is preserved -- and
/// * keeps its cursor positions in a large number space.
///
/// The overwriting behavior means that there is no cursor-advancing "read", just a "peek"
/// operation.
///
/// As we don't store arbitrarily many boundaries at which overflow happened, any overflow also
/// invalidates data that was written (but not available yet when the read started, or the read was
/// shorter than the available data), effectively restarting the ring buffer while just retaining
/// the corrected offset. This is an implementation detail, though: If someone puts in the work,
/// the ring buffer might manage multiple contiguous slices of readable data separated by
/// inaccessible (lost) gaps, until the writes overtake the old slices.
///
/// To allow simultaneous writes (even in the presence of threads of altering priorities, which
/// means we can't rely on the first of a nested set of writes finishing last after any nested
/// one), this keeps a bitmap of "valid" bytes. These bits also replace quite a few other
/// boundaries.
///
/// # Use with non-bytewise data
///
/// Right now, this is only targeting bytewise (probably UTF-8 encoded) data -- these can be
/// resumed from any point, and seeing gaps is a nice extra to avoid confusion.
///
/// To use this for buffering CBOR or defmt, it would probably help to over-invalidate on the read
/// end when an overflow happens, such that reads always start at a recoverable position. This
/// could be implemented either by advancing along the format's state machine, or by having a
/// trivial (say, 8bitlength-data) encapsulation format on which a similar advancement can happen.
pub struct Buffer {
    /// The actual ring buffer; interior mutability because producers only hold `&Buffer`
    buf: UnsafeCell<[u8; BUFSIZE]>,
    /// Bit-wise validity of the single bytes
    valid: AtomicBitfield,
    /// Index of the next byte to be written
    cursor: AtomicBufIndex,
    /// Offset of the buffer start position into the larger logical address space
    /// (NOTE(review): the original sentence was left unfinished -- presumably this is the
    /// [`LogicalIndex`] corresponding to `buf[0]`; confirm once reads/writes are implemented)
    buf_start_pos: AtomicLogicalIndex,
    /// First byte locked by the current reader, or -1 if no reader is active
    read_start: AtomicBufIndex,
}

/// Producer handle for a [`Buffer`] (the "MP" side of the MPSC model).
pub struct Writer<'b>(&'b Buffer);
// Unsafety constraint: There can only exist one instance (this is the single-consumer side).
/// Consumer (peek) handle for a [`Buffer`].
pub struct Reader<'b>(&'b Buffer);

impl Buffer {
    /// Construct an empty buffer; `const` so it can be placed in a `static`.
    pub const fn new() -> Self {
        Self {
            buf: UnsafeCell::new([0; BUFSIZE]),
            valid: AtomicBitfield::new(),
            cursor: AtomicBufIndex::new(0),
            buf_start_pos: AtomicLogicalIndex::new(0),
            // -1 signals that no reader is currently active.
            read_start: AtomicBufIndex::new(-1),
        }
    }

    /// Hand out the producer and consumer halves.
    ///
    /// Taking `&mut self` guarantees exclusive access at split time, so only one [`Reader`]
    /// is created through this call.
    pub fn split(&mut self) -> (Writer<'_>, Reader<'_>) {
        let shared: &Buffer = self;
        (Writer(shared), Reader(shared))
    }
}

impl<'b> Reader<'b> {
    /// Register `start` as the first byte locked by this reader.
    ///
    /// Sketch of the intended protocol: store the read start, then check again whether it is
    /// still compliant with the current write cursor. If it is not, bail; otherwise the read
    /// start is locked, because it was already set at a time when the cursor was good.
    /// (Open question from the sketch: detecting that the writer went all the way around the
    /// buffer may require doing this in the full-length logical address space.)
    pub fn create_reader(&self, start: BufIndex) {
        self.0.read_start.store(start, Ordering::Relaxed);
    }

    /// Peek at the earliest available data (unimplemented sketch).
    pub fn read_earliest(&self, buf: &mut [u8]) -> ReadEarliestResult {
        let cursor_snapshot = self.0.cursor.load(Ordering::Relaxed);
        self.create_reader(cursor_snapshot);
        todo!()
    }

    /// Peek at data starting at logical position `start` (unimplemented sketch).
    pub fn read_from(&self, buf: &mut [u8], start: LogicalIndex) -> usize {
        todo!()
    }
}

struct ReadEarliestResult {
    bytes_read: usize,
    slice_start: LogicalIndex,
}

impl<'b> Writer<'b> {
    /// Advance the write cursor, returning the start of buffer indices into which now may be
    /// written, or an error when hitting a read block.
    ///
    /// NOTE(review): unimplemented; the commented pseudocode below sketches the intended
    /// lock-free protocol -- check the reader lock first, and let a racing reader bail in its
    /// own double-check (see [`Reader::create_reader`]).
    fn advance_write_cursor(&self, delta: usize) -> Result<usize, ()> {
//         let read_start = self.0.read_start.load(Ordering::Relaxed);
//         let probably_my_start_position = self.0.cursor.load(Ordering::Relaxed);
//         if read_start < 0 || read_start outside of probably_my_start_position .. +delta {
//             // we go ahead; and if between now and when we start writing to atomics a reader comes
//             // along, it'll need to bail in its double-check
//             todo!();
//         } else {
//             Err(())
//         }
        todo!()
    }

    /// Copy `data` into the ring buffer (unimplemented sketch).
    fn write(&self, data: &[u8]) {
        todo!()
    }
}