scroll_ring/
minimal.rs

1use core::cmp::min;
2use core::num::Wrapping;
3use portable_atomic::{AtomicU32, Ordering};
4use ringbuf::Rb;
5
6use crate::util::write_from_slices;
7
8/// A very simple implementation of the infinilog buffer.
9///
10/// See module level documentation on its general properties.
11#[derive(Default)]
12pub struct Buffer<const N: usize> {
13    ring: try_lock::TryLock<CountedRing<N>>,
14    /// Bytes lost due to parallel operations
15    lost: AtomicU32,
16}
17
18#[derive(Default)]
19struct CountedRing<const N: usize> {
20    buf: ringbuf::ring_buffer::LocalRb<u8, [core::mem::MaybeUninit<u8>; N]>,
21    total_written: Wrapping<u32>,
22}
23
/// Error type for read_earliest, indicating that not only could no data be read, but not even a
/// cursor position could be returned.
///
/// It carries no data, so it is cheap to copy, compare and store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BufferUnavailable;
28
/// Error type for read.
///
/// Note that "that is the latest cursor position, nobody wrote anything more" is not an error but
/// expressed as a 0-length result.
///
/// The variants carry no data, so values are cheap to copy, compare and store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadErr {
    /// No information about the buffer is available at this time
    BufferUnavailable,
    /// The indicated cursor position has either not been reached yet, or the data has been
    /// overwritten already
    DataUnavailable,
}
41
42impl<const N: usize> Buffer<N> {
43    /// Read the earliest available data into outbuf. On success, returns the cursor position
44    /// corresponding to the start of outbuf, and the number of bytes available (which is the
45    /// number of bytes copied into outbuf unless it exceeds its size).
46    pub fn read_earliest(
47        &self,
48        outbuf: &mut [u8],
49    ) -> Result<(Wrapping<u32>, usize), BufferUnavailable> {
50        let Some(ring) = self.ring.try_lock() else {
51            return Err(BufferUnavailable);
52        };
53
54        let (first, second) = ring.buf.as_slices();
55        // Write as much data from first and second into outbuf
56        write_from_slices(outbuf, first, second);
57
58        Ok((
59            ring.total_written - Wrapping(ring.buf.len() as _),
60            ring.buf.len(),
61        ))
62    }
63
64    pub fn read_from_cursor(
65        &self,
66        cursor: Wrapping<u32>,
67        outbuf: &mut [u8],
68    ) -> Result<usize, ReadErr> {
69        let Some(ring) = self.ring.try_lock() else {
70            return Err(ReadErr::BufferUnavailable);
71        };
72
73        let (mut first, mut second) = ring.buf.as_slices();
74        let cursor_in_firstsecond = cursor - ring.total_written + Wrapping(ring.buf.len() as _);
75        // With this coordinate transformation, we don't have to distinguish "before" and "after"
76        // (which we can't do anyway, effectively) -- it's just "we have it" or "we don't have it"
77        // any more.
78        let mut cursor_in_firstsecond = cursor_in_firstsecond.0 as usize;
79        if cursor_in_firstsecond > first.len() + second.len() {
80            return Err(ReadErr::DataUnavailable);
81        }
82
83        let remove_from_first = min(first.len(), cursor_in_firstsecond);
84        cursor_in_firstsecond -= remove_from_first;
85        first = &first[remove_from_first..];
86        let remove_from_second = min(second.len(), cursor_in_firstsecond);
87        cursor_in_firstsecond -= remove_from_second;
88        second = &second[remove_from_second..];
89        debug_assert!(cursor_in_firstsecond == 0);
90
91        let len_from_cursor = first.len() + second.len();
92        write_from_slices(outbuf, first, second);
93
94        Ok(len_from_cursor)
95    }
96
97    pub fn write(&self, data: &[u8]) {
98        if let Some(mut ring) = self.ring.try_lock() {
99            let lost_before_us = self.lost.swap(0, Ordering::Relaxed);
100
101            if lost_before_us != 0 {
102                ring.total_written += Wrapping(lost_before_us);
103                ring.buf.clear();
104            }
105
106            ring.buf.push_slice_overwrite(data);
107            ring.total_written += Wrapping(data.len() as _);
108        } else {
109            /* as _: deliberately truncating -- our logical address space only is that big */
110            self.lost.fetch_add(data.len() as _, Ordering::Relaxed);
111        }
112    }
113}
114
/// Test plain writes and reads, both from the earliest position and from a cursor, up to the
/// point where old data gets overwritten
#[test]
fn test_buffer_readwrite() {
    const N: usize = 1024;
    const HWLEN: usize = "Hello World!".len();

    let buffer = Buffer::<N>::default();
    let mut out = [0u8; 4];

    // Nothing written yet: position 0 is valid but empty, anything else is unavailable
    assert_eq!(buffer.read_earliest(&mut out), Ok((Wrapping(0), 0)));
    assert_eq!(buffer.read_from_cursor(Wrapping(0), &mut out), Ok(0));
    assert_eq!(
        buffer.read_from_cursor(Wrapping(10), &mut out),
        Err(ReadErr::DataUnavailable)
    );

    buffer.write(b"Hello");
    // Reading does not consume: doing it twice gives the same result. The first byte is reset
    // after each round to show that the read really wrote it.
    for _ in 0..2 {
        assert_eq!(buffer.read_earliest(&mut out), Ok((Wrapping(0), 5)));
        assert_eq!(&out, b"Hell");
        out[0] = 0;
    }

    assert_eq!(buffer.read_from_cursor(Wrapping(2), &mut out), Ok(3));
    assert_eq!(&out[..3], b"llo");

    buffer.write(b" World!");
    out[0] = 0;
    assert_eq!(buffer.read_earliest(&mut out), Ok((Wrapping(0), HWLEN)));
    assert_eq!(&out, b"Hell");

    // One byte more than what is still free, so that exactly the "H" is pushed out
    let padding = [0; N - HWLEN + 1];
    buffer.write(&padding);
    assert_eq!(buffer.read_earliest(&mut out), Ok((Wrapping(1), N)));
    assert_eq!(&out, b"ello");
}
150
/// Test collisions in the buffer
///
/// Forcing threads into a collision is not easily done, but as this test has access to the
/// internals, it can hold the lock itself while a write is attempted.
#[test]
fn test_buffer_collisions() {
    const N: usize = 1024;
    const OFFSET: usize = N - 6; // The overflow would straddle the boundary

    let buffer = Buffer::<N>::default();
    let mut out = [0u8; 4];

    buffer.write(&[0; OFFSET]);
    buffer.write(b"1234");

    {
        // This write happens while the lock is held, so its data is lost
        let _guard = buffer.ring.try_lock();
        buffer.write(b"5678");
    }

    // This is not a guaranteed outcome -- the read could just as well report a position after
    // the lost bytes, with no data available
    assert_eq!(
        buffer.read_earliest(&mut out),
        Ok((Wrapping(0), OFFSET + 4))
    );
    assert_eq!(
        buffer.read_from_cursor(Wrapping(OFFSET as _), &mut out),
        Ok(4)
    );
    assert_eq!(&out, b"1234");

    // The next write accounts for the 4 lost bytes, and only its own data is left to read
    buffer.write(b"90ab");
    assert_eq!(
        buffer.read_earliest(&mut out),
        Ok((Wrapping(OFFSET as u32 + 8), 4))
    );
    assert_eq!(&out, b"90ab");
}