use core::cmp::min;
use core::num::Wrapping;
use core::sync::atomic::{AtomicU32, Ordering};
use ringbuf::Rb;

use crate::util::write_from_slices;

/// A very simple implementation of the infinilog buffer.
///
/// See the module level documentation for its general properties.
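///
/// # Example
///
/// A minimal usage sketch; the `infinilog::Buffer` import path is an assumption for illustration
/// and has to be adjusted to wherever this module actually lives:
///
/// ```ignore
/// use core::num::Wrapping;
/// use infinilog::Buffer; // hypothetical path
///
/// let buffer: Buffer<1024> = Default::default();
/// buffer.write(b"hello");
///
/// // Read from the start of the retained data.
/// let mut out = [0u8; 16];
/// let (cursor, available) = buffer.read_earliest(&mut out).unwrap();
/// assert_eq!((cursor, available), (Wrapping(0), 5));
/// assert_eq!(&out[..available], "hello".as_bytes());
///
/// // Continue from a known position; a 0-length result means "nothing new yet".
/// let next = cursor + Wrapping(available as u32);
/// assert_eq!(buffer.read_from_cursor(next, &mut out), Ok(0));
/// ```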
#[derive(Default)]
pub struct Buffer<const N: usize> {
    ring: try_lock::TryLock<CountedRing<N>>,
    /// Bytes that could not be written because a concurrent operation held the lock
    lost: AtomicU32,
}

#[derive(Default)]
struct CountedRing<const N: usize> {
    buf: ringbuf::ring_buffer::LocalRb<u8, [core::mem::MaybeUninit<u8>; N]>,
    /// Total number of bytes ever pushed through the buffer (wrapping at u32), including bytes
    /// that have since been overwritten or lost.
    total_written: Wrapping<u32>,
}

/// Error type for read_earliest, indicating that not only could no data be read, but not even a cursor
/// position could be returned.
#[derive(Debug, PartialEq)]
pub struct BufferUnavailable;

/// Error type for read_from_cursor.
///
/// Note that "this is the latest cursor position, nobody wrote anything more" is not an error but
/// is expressed as a 0-length result.
#[derive(Debug, PartialEq)]
pub enum ReadErr {
    /// No information about the buffer is available at this time
    BufferUnavailable,
    /// The indicated cursor position has either not been reached yet, or the data has been
    /// overwritten already
    DataUnavailable,
}

impl<const N: usize> Buffer<N> {
    /// Read the earliest available data into outbuf. On success, returns the cursor position
    /// corresponding to the start of outbuf, and the number of bytes available (of which at most
    /// `outbuf.len()` are actually copied into outbuf).
    pub fn read_earliest(
        &self,
        outbuf: &mut [u8],
    ) -> Result<(Wrapping<u32>, usize), BufferUnavailable> {
        let Some(ring) = self.ring.try_lock() else { return Err(BufferUnavailable) };

        let (first, second) = ring.buf.as_slices();
        // Copy as much data from first and second into outbuf as fits
        write_from_slices(outbuf, first, second);

        Ok((
            ring.total_written - Wrapping(ring.buf.len() as _),
            ring.buf.len(),
        ))
    }

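    /// Read data starting at the given cursor position into outbuf, returning the number of bytes
    /// available from that position (of which at most `outbuf.len()` are actually copied).
    ///
    /// A result of 0 means the cursor is at the current write position and nothing new has been
    /// written yet; [`ReadErr::DataUnavailable`] means the requested data has either been
    /// overwritten already or not been written yet.
    ///
    /// A sketch of how a consumer might follow the log; `buffer` and `sink` are placeholder names
    /// for illustration:
    ///
    /// ```ignore
    /// let mut out = [0u8; 64];
    /// let (mut cursor, _) = buffer.read_earliest(&mut out).unwrap();
    /// loop {
    ///     match buffer.read_from_cursor(cursor, &mut out) {
    ///         // Nothing new yet; try again later.
    ///         Ok(0) => {}
    ///         Ok(available) => {
    ///             let copied = available.min(out.len());
    ///             sink.send(&out[..copied]);
    ///             cursor += Wrapping(copied as u32);
    ///         }
    ///         // The data at `cursor` was overwritten (or the buffer was briefly unavailable):
    ///         // resynchronize at the earliest data that is still retained.
    ///         Err(_) => {
    ///             if let Ok((new_cursor, _)) = buffer.read_earliest(&mut out) {
    ///                 cursor = new_cursor;
    ///             }
    ///         }
    ///     }
    /// }
    /// ```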
    pub fn read_from_cursor(
        &self,
        cursor: Wrapping<u32>,
        outbuf: &mut [u8],
    ) -> Result<usize, ReadErr> {
        let Some(ring) = self.ring.try_lock() else { return Err(ReadErr::BufferUnavailable) };

        let (mut first, mut second) = ring.buf.as_slices();
        let cursor_in_firstsecond = cursor - ring.total_written + Wrapping(ring.buf.len() as _);
        // With this coordinate transformation, we don't have to distinguish "before" and "after"
        // (which we can't do reliably anyway) -- it's just "we have it" or "we don't have it".
        let mut cursor_in_firstsecond = cursor_in_firstsecond.0 as usize;
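        // Worked example: if total_written is 100 and the buffer currently holds 30 bytes, the
        // retained data covers cursor positions 70..100. A cursor of 80 maps to index 10 here, a
        // cursor of 100 maps to 30 and yields a 0-length read, a cursor of 60 (already
        // overwritten) wraps around to a huge value, and a cursor of 110 (not written yet) maps
        // to 40; the latter two exceed the 30 retained bytes and are rejected below.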
        if cursor_in_firstsecond > first.len() + second.len() {
            return Err(ReadErr::DataUnavailable);
        }

        let remove_from_first = min(first.len(), cursor_in_firstsecond);
        cursor_in_firstsecond -= remove_from_first;
        first = &first[remove_from_first..];
        let remove_from_second = min(second.len(), cursor_in_firstsecond);
        cursor_in_firstsecond -= remove_from_second;
        second = &second[remove_from_second..];
        debug_assert!(cursor_in_firstsecond == 0);

        let len_from_cursor = first.len() + second.len();
        write_from_slices(outbuf, first, second);

        Ok(len_from_cursor)
    }

    /// Append data to the buffer, overwriting the oldest data if necessary.
    ///
    /// If the buffer is currently locked by a concurrent operation, the data is discarded and
    /// only accounted for as lost bytes; the next successful write advances the total write count
    /// past the lost bytes and clears the buffer.
    pub fn write(&self, data: &[u8]) {
        if let Some(mut ring) = self.ring.try_lock() {
            let lost_before_us = self.lost.swap(0, Ordering::Relaxed);

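            // A competing write gave up while the lock was held; account for its bytes now by
            // advancing the total past them. The buffer is cleared as well, so the data already
            // in it is not misattributed to the cursor positions the lost bytes would have
            // occupied.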
            if lost_before_us != 0 {
                ring.total_written += Wrapping(lost_before_us);
                ring.buf.clear();
            }

            ring.buf.push_slice_overwrite(data);
            ring.total_written += Wrapping(data.len() as _);
        } else {
            /* as _: deliberately truncating -- our logical address space is only that big */
            self.lost.fetch_add(data.len() as _, Ordering::Relaxed);
        }
    }
}

#[test]
fn test_buffer_readwrite() {
    const N: usize = 1024;

    let b: Buffer<N> = Default::default();
    let mut outbuf = [0; 4];
    assert_eq!(b.read_earliest(&mut outbuf), Ok((Wrapping(0), 0)));
    assert_eq!(b.read_from_cursor(Wrapping(0), &mut outbuf), Ok(0));
    assert_eq!(
        b.read_from_cursor(Wrapping(10), &mut outbuf),
        Err(ReadErr::DataUnavailable)
    );

    b.write(b"Hello");
    assert_eq!(b.read_earliest(&mut outbuf), Ok((Wrapping(0), 5)));
    assert_eq!(&outbuf, "Hell".as_bytes());
    outbuf[0] = 0;
    assert_eq!(b.read_earliest(&mut outbuf), Ok((Wrapping(0), 5)));
    assert_eq!(&outbuf, "Hell".as_bytes());
    outbuf[0] = 0;

    assert_eq!(b.read_from_cursor(Wrapping(2), &mut outbuf), Ok(3));
    assert_eq!(&outbuf[..3], "llo".as_bytes());

    b.write(b" World!");
    const HWLEN: usize = "Hello World!".len();
    outbuf[0] = 0;
    assert_eq!(b.read_earliest(&mut outbuf), Ok((Wrapping(0), HWLEN)));
    assert_eq!(&outbuf, "Hell".as_bytes());

    let erase_h = [0; N - HWLEN + 1];
    b.write(&erase_h);
    assert_eq!(b.read_earliest(&mut outbuf), Ok((Wrapping(1), N)));
    assert_eq!(&outbuf, "ello".as_bytes());
}

#[test]
/// Test collisions in the buffer
///
/// We can't easily do this with real threads because we can't force them to collide, but with
/// access to the internals we can hold the lock ourselves while writing.
fn test_buffer_collisions() {
    const N: usize = 1024;

    let b: Buffer<N> = Default::default();
    let mut outbuf = [0; 4];

    const OFFSET: usize = N - 6; // The overflow would straddle the boundary
    b.write(&[0; OFFSET]);

    b.write(b"1234");
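    // Hold the lock ourselves so the next write cannot acquire it and has to account its bytes
    // as lost.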
    let locked = b.ring.try_lock();
    b.write(b"5678");
    drop(locked);

    // We don't guarantee this -- the read could just as well report that we're already at
    // OFFSET + 8 and no data is available
    assert_eq!(b.read_earliest(&mut outbuf), Ok((Wrapping(0), OFFSET + 4)));
    assert_eq!(
        b.read_from_cursor(Wrapping(OFFSET as _), &mut outbuf),
        Ok(4)
    );
    assert_eq!(&outbuf, b"1234");

    b.write(b"90ab");
    assert_eq!(
        b.read_earliest(&mut outbuf),
        Ok((Wrapping(OFFSET as u32 + 8), 4))
    );
    assert_eq!(&outbuf, b"90ab");
}