use std::cell::RefCell;
use std::io::{self, Read};
use std::rc::Rc;
/// Heuristic average text-line length used to size the buffers below.
const ASSUMED_LINE_LENGTH: usize = 512;
/// Capacity of the "recent bytes" ring: room for roughly six assumed lines.
pub(crate) const RING_BUFFER_SIZE: usize = 6 * ASSUMED_LINE_LENGTH;
/// Cap on bytes read ahead of the consumer: roughly two assumed lines.
pub(crate) const MAX_READ_AHEAD: usize = 2 * ASSUMED_LINE_LENGTH;
/// Fixed-capacity byte ring buffer backed by an inline array (no heap).
struct FixedRingBuffer<const N: usize> {
    // Backing storage; only `count` bytes starting at `head` are live.
    data: [u8; N],
    // Physical index of the logically first (oldest) byte.
    head: usize,
    // Number of live bytes; invariant: count <= N.
    count: usize,
}
impl<const N: usize> FixedRingBuffer<N> {
    /// Creates an empty buffer with zeroed storage.
    const fn new() -> Self {
        Self {
            data: [0u8; N],
            head: 0,
            count: 0,
        }
    }

    /// `true` when no bytes are stored.
    #[inline]
    fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Number of bytes currently stored.
    #[inline]
    fn len(&self) -> usize {
        self.count
    }

    /// Appends `value` at the logical back. When the buffer is already
    /// full, the oldest byte is overwritten and `head` advances instead
    /// of the length growing.
    #[inline]
    fn push_back(&mut self, value: u8) {
        let write_at = (self.head + self.count) % N;
        self.data[write_at] = value;
        if self.count < N {
            self.count += 1;
        } else {
            // Full: the slot just written was the old front; step past it.
            self.head = (self.head + 1) % N;
        }
    }

    /// Removes and returns the oldest byte, or `None` when empty.
    #[inline]
    fn pop_front(&mut self) -> Option<u8> {
        if self.is_empty() {
            return None;
        }
        let front = self.data[self.head];
        self.head = (self.head + 1) % N;
        self.count -= 1;
        Some(front)
    }

    /// Iterates the stored bytes from oldest to newest.
    fn iter(&self) -> FixedRingBufferIter<'_, N> {
        FixedRingBufferIter { buffer: self, pos: 0 }
    }
}
impl<const N: usize> std::fmt::Debug for FixedRingBuffer<N> {
    /// Shows only the bookkeeping fields and the capacity, not the raw
    /// byte storage.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("FixedRingBuffer");
        dbg.field("head", &self.head);
        dbg.field("count", &self.count);
        dbg.field("capacity", &N);
        dbg.finish()
    }
}
/// Borrowing iterator over a [`FixedRingBuffer`], oldest byte first.
struct FixedRingBufferIter<'a, const N: usize> {
    // Buffer being walked.
    buffer: &'a FixedRingBuffer<N>,
    // Logical position already yielded; invariant: pos <= buffer.count.
    pos: usize,
}
impl<'a, const N: usize> Iterator for FixedRingBufferIter<'a, N> {
    type Item = u8;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // Stop once every stored byte has been yielded.
        if self.pos >= self.buffer.count {
            return None;
        }
        let physical = (self.buffer.head + self.pos) % N;
        self.pos += 1;
        Some(self.buffer.data[physical])
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `pos` never exceeds `count`, so this subtraction cannot underflow.
        let left = self.buffer.count - self.pos;
        (left, Some(left))
    }
}

impl<'a, const N: usize> ExactSizeIterator for FixedRingBufferIter<'a, N> {}
/// Snapshot of the most recent bytes observed by a `RingReader`.
#[derive(Debug, Clone)]
#[allow(dead_code)] pub(crate) struct RecentSnapshot {
    // Absolute stream offset of `bytes[0]`.
    pub start_offset: u64,
    // One past the last byte: `start_offset + bytes.len()`.
    pub end_offset: u64,
    // 1-based line number at `start_offset` (readers start at line 1).
    pub start_line: usize,
    // Captured bytes, trimmed to whole UTF-8 characters at both ends.
    pub bytes: Vec<u8>,
}
/// A `Read` adapter that remembers the most recent bytes it has seen and
/// can read ahead of the consumer without advancing the consumer's position.
pub(crate) struct RingReader<R> {
    // Underlying byte source.
    inner: R,
    // Most recent bytes pulled from `inner` (whether consumed or stashed).
    ring: FixedRingBuffer<RING_BUFFER_SIZE>,
    // Absolute offset of the first byte currently held in `ring`.
    ring_start_offset: u64,
    // 1-based line number at `ring_start_offset`.
    ring_start_line: usize,
    // Bytes read ahead of the consumer; drained before `inner` is read again.
    stash: FixedRingBuffer<MAX_READ_AHEAD>,
    // Total bytes handed out via `Read::read` (read-ahead excluded).
    returned_total: u64,
}
impl<R> RingReader<R> {
    /// Wraps `inner`, starting at absolute offset 0 and line 1.
    pub(crate) fn new(inner: R) -> Self {
        Self {
            inner,
            ring: FixedRingBuffer::new(),
            ring_start_offset: 0,
            ring_start_line: 1,
            stash: FixedRingBuffer::new(),
            returned_total: 0,
        }
    }
    /// Absolute offset of the next byte the consumer will receive.
    /// Read-ahead does not advance this.
    #[allow(dead_code)] pub(crate) fn offset(&self) -> u64 {
        self.returned_total
    }
    /// Number of bytes currently stashed ahead of the consumer.
    #[allow(dead_code)] pub(crate) fn read_ahead_len(&self) -> usize {
        self.stash.len()
    }
    /// Borrows the underlying reader.
    #[allow(dead_code)] pub(crate) fn inner(&self) -> &R {
        &self.inner
    }
    /// Mutably borrows the underlying reader.
    #[allow(dead_code)] pub(crate) fn inner_mut(&mut self) -> &mut R {
        &mut self.inner
    }
    /// Consumes `self` and returns the underlying reader. Any stashed
    /// read-ahead bytes are dropped, not pushed back.
    #[allow(dead_code)] pub(crate) fn into_inner(self) -> R {
        self.inner
    }
    /// Returns a snapshot of the most recent bytes, first topping up the
    /// read-ahead stash to `MAX_READ_AHEAD` so the snapshot can include
    /// bytes the consumer has not requested yet. The consumer position
    /// (`offset`) is not advanced. The result is trimmed so it starts and
    /// ends on UTF-8 character boundaries.
    pub(crate) fn get_recent(&mut self) -> io::Result<RecentSnapshot>
    where
        R: Read,
    {
        // Only read the missing amount; a full stash reads nothing.
        let already_ahead = self.stash.len();
        let can_read_more = MAX_READ_AHEAD.saturating_sub(already_ahead);
        if can_read_more > 0 {
            let _ = self.read_ahead_at_most(can_read_more)?;
        }
        let (mut start_offset, mut start_line, mut bytes) = self.ring_snapshot();
        if !bytes.is_empty() {
            (start_offset, start_line, bytes) =
                trim_to_utf8_boundaries_with_line(bytes, start_offset, start_line);
        }
        let end_offset = start_offset.saturating_add(bytes.len() as u64);
        Ok(RecentSnapshot {
            start_offset,
            end_offset,
            start_line,
            bytes,
        })
    }
    /// Absolute offset of the next byte that would come from `inner`:
    /// bytes already handed out plus bytes sitting in the stash.
    fn next_inner_offset(&self) -> u64 {
        self.returned_total.saturating_add(self.stash.len() as u64)
    }
    /// Copies the ring contents together with their start offset/line.
    /// An empty ring reports the current consumer offset instead.
    fn ring_snapshot(&self) -> (u64, usize, Vec<u8>) {
        if self.ring.is_empty() {
            return (self.returned_total, self.ring_start_line, Vec::new());
        }
        let start_offset = self.ring_start_offset;
        let start_line = self.ring_start_line;
        let bytes: Vec<u8> = self.ring.iter().collect();
        (start_offset, start_line, bytes)
    }
    /// Appends `bytes` (whose first byte lives at absolute offset
    /// `abs_start`) to the ring, evicting from the front when full and
    /// advancing `ring_start_offset`/`ring_start_line` past evicted bytes.
    pub(crate) fn push_ring_bytes(&mut self, bytes: &[u8], abs_start: u64) {
        let mut off = abs_start;
        for &b in bytes {
            if self.ring.is_empty() {
                // First byte of a (re)filled ring fixes its start offset.
                self.ring_start_offset = off;
            }
            if self.ring.len() == RING_BUFFER_SIZE {
                let evicted = self.ring.pop_front();
                self.ring_start_offset = self.ring_start_offset.saturating_add(1);
                if evicted == Some(b'\n') {
                    // Window moved past a newline: the window now starts
                    // one line later.
                    self.ring_start_line = self.ring_start_line.saturating_add(1);
                }
            }
            self.ring.push_back(b);
            off = off.saturating_add(1);
        }
    }
    /// Reads up to `max_additional` bytes from `inner` into the stash
    /// (and the ring), stopping early at EOF. Returns bytes read.
    fn read_ahead_at_most(&mut self, max_additional: usize) -> io::Result<usize>
    where
        R: Read,
    {
        if max_additional == 0 {
            return Ok(0);
        }
        const SCRATCH: usize = 8 * 1024;
        let mut scratch = [0u8; SCRATCH];
        let mut remaining = max_additional;
        let mut total = 0usize;
        while remaining > 0 {
            let want = remaining.min(SCRATCH);
            let n = self.inner.read(&mut scratch[..want])?;
            if n == 0 {
                break;
            }
            // Compute the chunk's absolute position BEFORE growing the
            // stash — `next_inner_offset` depends on the stash length.
            let abs_start = self.next_inner_offset();
            let chunk = &scratch[..n];
            for &b in chunk {
                self.stash.push_back(b);
            }
            self.push_ring_bytes(chunk, abs_start);
            total = total.saturating_add(n);
            remaining = remaining.saturating_sub(n);
        }
        Ok(total)
    }
    /// Pops stashed bytes into `out`; returns how many were written.
    fn drain_stash_into(&mut self, out: &mut [u8]) -> usize {
        let mut n = 0usize;
        while n < out.len() {
            let b = match self.stash.pop_front() {
                Some(x) => x,
                None => break,
            };
            out[n] = b;
            n = n.saturating_add(1);
        }
        n
    }
}
impl<R: Read> Read for RingReader<R> {
    /// Serves previously stashed read-ahead bytes first; only once the
    /// stash is empty does it pull fresh bytes from the inner reader,
    /// recording them into the ring as it goes.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Drain the read-ahead stash before touching `inner` again.
        if !self.stash.is_empty() {
            let n = self.drain_stash_into(buf);
            self.returned_total = self.returned_total.saturating_add(n as u64);
            return Ok(n);
        }
        let n = self.inner.read(buf)?;
        if n == 0 {
            return Ok(0);
        }
        // Defensive slice: a misbehaving inner reader could claim
        // n > buf.len(); in that case report 0 rather than panic.
        if let Some(chunk) = buf.get(..n) {
            self.push_ring_bytes(chunk, self.returned_total);
            self.returned_total = self.returned_total.saturating_add(n as u64);
            Ok(n)
        } else {
            Ok(0)
        }
    }
}
/// Shared, single-threaded handle around a [`RingReader`] (`Rc<RefCell<_>>`).
pub(crate) struct SharedRingReader<R> {
    inner: Rc<RefCell<RingReader<R>>>,
}

impl<R> SharedRingReader<R> {
    /// Wraps `reader` in a new shared ring reader.
    pub(crate) fn new(reader: R) -> Self {
        let inner = Rc::new(RefCell::new(RingReader::new(reader)));
        Self { inner }
    }

    /// Snapshots the recent bytes; see `RingReader::get_recent`.
    pub(crate) fn get_recent(&self) -> io::Result<RecentSnapshot>
    where
        R: Read,
    {
        let mut guard = self.inner.borrow_mut();
        guard.get_recent()
    }

    /// Returns another `Rc` pointing at the same underlying reader.
    pub(crate) fn clone_inner(&self) -> Rc<RefCell<RingReader<R>>> {
        Rc::clone(&self.inner)
    }
}
/// A `Read` handle that forwards to the reader shared by a
/// [`SharedRingReader`].
pub(crate) struct SharedRingReaderHandle<R> {
    inner: Rc<RefCell<RingReader<R>>>,
}

impl<R> SharedRingReaderHandle<R> {
    /// Creates a handle aliasing `shared`'s underlying reader.
    pub(crate) fn new(shared: &SharedRingReader<R>) -> Self {
        let inner = shared.clone_inner();
        Self { inner }
    }
}

impl<R: Read> Read for SharedRingReaderHandle<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut guard = self.inner.borrow_mut();
        guard.read(buf)
    }
}
/// `true` for UTF-8 continuation bytes, i.e. `0b10xxxxxx` (0x80..=0xBF).
fn is_utf8_continuation(b: u8) -> bool {
    matches!(b, 0x80..=0xBF)
}
/// Total encoded length implied by a UTF-8 lead byte, or `None` when the
/// byte can never start a valid sequence (continuation bytes, the overlong
/// leads 0xC0/0xC1, and anything above 0xF4).
fn utf8_expected_len(lead: u8) -> Option<usize> {
    match lead {
        0x00..=0x7F => Some(1),
        0xC2..=0xDF => Some(2),
        0xE0..=0xEF => Some(3),
        0xF0..=0xF4 => Some(4),
        _ => None,
    }
}
/// Trims `bytes` so both ends fall on UTF-8 character boundaries, adding
/// the number of leading bytes removed to `start_offset`.
///
/// Leading UTF-8 continuation bytes (`0b10xxxxxx`) are dropped: they are
/// the tail of a character whose lead byte was already evicted from the
/// ring. A trailing incomplete sequence is removed by
/// `trim_incomplete_utf8_tail`.
///
/// `start_line` is returned unchanged: a continuation byte can never be
/// `\n` (0x0A is not of the form `0b10xxxxxx`), so front-trimming cannot
/// skip past a newline. (The previous version carried a dead branch that
/// incremented the line counter for exactly that impossible case.)
fn trim_to_utf8_boundaries_with_line(
    mut bytes: Vec<u8>,
    mut start_offset: u64,
    start_line: usize,
) -> (u64, usize, Vec<u8>) {
    if bytes.is_empty() {
        return (start_offset, start_line, bytes);
    }
    // Count leading continuation bytes, then remove them in one splice.
    let cut = bytes
        .iter()
        .take_while(|&&b| is_utf8_continuation(b))
        .count();
    if cut > 0 {
        bytes.drain(..cut);
        start_offset = start_offset.saturating_add(cut as u64);
    }
    trim_incomplete_utf8_tail(&mut bytes);
    (start_offset, start_line, bytes)
}
/// Removes a trailing incomplete UTF-8 sequence from `bytes`, if present.
///
/// Scans backwards over at most 3 continuation bytes (a sequence has at
/// most 3) to locate the last lead byte; if fewer bytes follow that lead
/// than its encoding requires, the partial character is truncated and the
/// scan repeats on the new tail.
fn trim_incomplete_utf8_tail(bytes: &mut Vec<u8>) {
    loop {
        if bytes.is_empty() {
            return;
        }
        // Count trailing continuation bytes, capped at 3.
        let mut cont = 0usize;
        let mut i = bytes.len();
        while i > 0 && cont < 3 {
            let b = bytes[i - 1];
            if is_utf8_continuation(b) {
                cont = cont.saturating_add(1);
                i = i.saturating_sub(1);
            } else {
                break;
            }
        }
        if i == 0 {
            // Reached the front without finding a lead byte: the whole
            // buffer is continuation bytes, so nothing can be kept.
            bytes.clear();
            return;
        }
        let lead_idx = i - 1;
        let lead = bytes[lead_idx];
        let expected = match utf8_expected_len(lead) {
            Some(n) => n,
            None => {
                // Not a valid lead byte; leave the tail as-is rather than
                // guess — this function only trims *incomplete* sequences,
                // it does not validate.
                return;
            }
        };
        let actual = bytes.len().saturating_sub(lead_idx);
        if actual < expected {
            // Partial character at the end: cut it, then re-check the
            // newly exposed tail.
            bytes.truncate(lead_idx);
            continue;
        }
        return;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{self, Read};

    // Test reader that records how many bytes were pulled from it and can
    // cap each `read` call's size to exercise short reads.
    #[derive(Debug)]
    struct CountingReader {
        data: Vec<u8>,
        pos: usize,
        bytes_read_total: usize,
        max_chunk: usize,
    }

    impl CountingReader {
        fn new(data: Vec<u8>) -> Self {
            Self::with_max_chunk(data, usize::MAX)
        }
        fn with_max_chunk(data: Vec<u8>, max_chunk: usize) -> Self {
            Self {
                data,
                pos: 0,
                bytes_read_total: 0,
                // Guard against a zero chunk size, which would stall reads.
                max_chunk: max_chunk.max(1),
            }
        }
        // Total bytes handed out so far.
        fn bytes_read(&self) -> usize {
            self.bytes_read_total
        }
    }

    impl Read for CountingReader {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            if buf.is_empty() {
                return Ok(0);
            }
            if self.pos >= self.data.len() {
                return Ok(0);
            }
            let remaining = self.data.len().saturating_sub(self.pos);
            let n = buf.len().min(self.max_chunk).min(remaining);
            if n == 0 {
                return Ok(0);
            }
            buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
            self.pos += n;
            self.bytes_read_total += n;
            Ok(n)
        }
    }

    // Deterministic ASCII payload of `len` bytes (cycling a..z).
    fn make_ascii_data(len: usize) -> Vec<u8> {
        (0..len).map(|i| b'a' + (i % 26) as u8).collect()
    }

    // Every snapshot must satisfy end = start + len.
    fn assert_snapshot_offsets_consistent(s: &RecentSnapshot) {
        assert_eq!(s.end_offset, s.start_offset + s.bytes.len() as u64);
    }

    #[test]
    fn empty_input_snapshot_is_empty_and_offsets_zero() {
        let inner = CountingReader::new(Vec::new());
        let mut rr = RingReader::new(inner);
        assert_eq!(rr.offset(), 0);
        assert_eq!(rr.read_ahead_len(), 0);
        let snap = rr.get_recent().unwrap();
        assert_snapshot_offsets_consistent(&snap);
        assert_eq!(snap.start_offset, 0);
        assert_eq!(snap.end_offset, 0);
        assert!(snap.bytes.is_empty());
        assert_eq!(rr.offset(), 0);
        assert_eq!(rr.read_ahead_len(), 0);
        assert_eq!(rr.inner().bytes_read(), 0);
    }

    #[test]
    fn read_with_empty_buffer_returns_0_and_does_not_touch_inner() {
        let data = make_ascii_data(128);
        let inner = CountingReader::new(data);
        let mut rr = RingReader::new(inner);
        let mut buf = [0u8; 0];
        let n = rr.read(&mut buf).unwrap();
        assert_eq!(n, 0);
        assert_eq!(rr.offset(), 0);
        assert_eq!(rr.read_ahead_len(), 0);
        assert_eq!(rr.inner().bytes_read(), 0);
    }

    #[test]
    fn get_recent_reads_ahead_up_to_max_and_does_not_advance_offset() {
        let data = make_ascii_data(MAX_READ_AHEAD * 3 + 123);
        let inner = CountingReader::with_max_chunk(data.clone(), 7);
        let mut rr = RingReader::new(inner);
        assert_eq!(rr.offset(), 0);
        assert_eq!(rr.read_ahead_len(), 0);
        let snap = rr.get_recent().unwrap();
        assert_snapshot_offsets_consistent(&snap);
        // Snapshot must not move the consumer position.
        assert_eq!(rr.offset(), 0);
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        assert_eq!(rr.inner().bytes_read(), MAX_READ_AHEAD);
        assert_eq!(snap.start_offset, 0);
        assert_eq!(snap.bytes.len(), MAX_READ_AHEAD);
        assert_eq!(&snap.bytes[..], &data[..MAX_READ_AHEAD]);
    }

    #[test]
    fn get_recent_does_not_read_more_when_stash_already_full() {
        let data = make_ascii_data(MAX_READ_AHEAD * 2 + 5);
        let inner = CountingReader::with_max_chunk(data.clone(), 5);
        let mut rr = RingReader::new(inner);
        let snap1 = rr.get_recent().unwrap();
        let inner_read1 = rr.inner().bytes_read();
        assert_eq!(inner_read1, MAX_READ_AHEAD);
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        assert_snapshot_offsets_consistent(&snap1);
        // A second snapshot with a full stash must be a pure copy.
        let snap2 = rr.get_recent().unwrap();
        let inner_read2 = rr.inner().bytes_read();
        assert_eq!(inner_read2, inner_read1, "inner must not be read again");
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        assert_snapshot_offsets_consistent(&snap2);
        assert_eq!(snap2.bytes, snap1.bytes);
        assert_eq!(snap2.start_offset, snap1.start_offset);
        assert_eq!(snap2.end_offset, snap1.end_offset);
    }

    #[test]
    fn read_drains_stash_before_touching_inner_again() {
        let data = make_ascii_data(MAX_READ_AHEAD * 2 + 77);
        let inner = CountingReader::with_max_chunk(data.clone(), 11);
        let mut rr = RingReader::new(inner);
        rr.get_recent().unwrap();
        assert_eq!(rr.offset(), 0);
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        let inner_before = rr.inner().bytes_read();
        let mut first = vec![0u8; 17];
        rr.read_exact(&mut first).unwrap();
        assert_eq!(&first[..], &data[..17]);
        assert_eq!(rr.offset(), 17);
        assert_eq!(
            rr.inner().bytes_read(),
            inner_before,
            "must not read inner while draining stash"
        );
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD - 17);
    }

    #[test]
    fn get_recent_refills_only_the_missing_amount_to_reach_cap() {
        let data = make_ascii_data(MAX_READ_AHEAD * 3 + 10);
        let inner = CountingReader::with_max_chunk(data.clone(), 3);
        let mut rr = RingReader::new(inner);
        rr.get_recent().unwrap();
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        assert_eq!(rr.inner().bytes_read(), MAX_READ_AHEAD);
        // Drain part of the stash, then refill via another snapshot.
        let drain = 100usize.min(MAX_READ_AHEAD);
        let mut tmp = vec![0u8; drain];
        rr.read_exact(&mut tmp).unwrap();
        assert_eq!(&tmp[..], &data[..drain]);
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD - drain);
        let inner_before = rr.inner().bytes_read();
        rr.get_recent().unwrap();
        let inner_after = rr.inner().bytes_read();
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        assert_eq!(
            inner_after - inner_before,
            drain,
            "must read only the missing bytes"
        );
    }

    #[test]
    fn stream_integrity_with_read_ahead_matches_original_input() {
        let data = make_ascii_data(MAX_READ_AHEAD * 2 + 123);
        let inner = CountingReader::with_max_chunk(data.clone(), 9);
        let mut rr = RingReader::new(inner);
        rr.get_recent().unwrap();
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        assert_eq!(rr.offset(), 0);
        // Reading everything must return the exact original byte stream.
        let mut out = Vec::new();
        rr.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(rr.offset(), data.len() as u64);
        assert_eq!(rr.read_ahead_len(), 0);
        assert_eq!(rr.inner().bytes_read(), data.len());
    }

    #[test]
    fn stream_integrity_with_periodic_snapshots_matches_original_input() {
        let data = make_ascii_data(MAX_READ_AHEAD * 3 + 2000);
        let inner = CountingReader::with_max_chunk(data.clone(), 17);
        let mut rr = RingReader::new(inner);
        let mut out = Vec::new();
        let mut buf = vec![0u8; 57];
        while out.len() < data.len() {
            // Interleave snapshots with reads; they must not corrupt the
            // consumer-visible stream.
            if out.len() % 400 == 0 {
                let snap = rr.get_recent().unwrap();
                assert_snapshot_offsets_consistent(&snap);
                assert!(rr.read_ahead_len() <= MAX_READ_AHEAD);
            }
            let n = rr.read(&mut buf).unwrap();
            if n == 0 {
                break;
            }
            out.extend_from_slice(&buf[..n]);
        }
        assert_eq!(out, data);
        assert_eq!(rr.offset(), data.len() as u64);
        assert_eq!(rr.inner().bytes_read(), data.len());
    }

    #[test]
    fn ring_eviction_reports_correct_window_and_offset() {
        // Push more than the ring holds so the oldest bytes get evicted.
        let extra = 123usize;
        let total = RING_BUFFER_SIZE + extra;
        let data = make_ascii_data(total);
        let inner = std::io::Cursor::new(data.clone());
        let mut rr = RingReader::new(inner);
        let mut out = Vec::new();
        rr.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(rr.offset(), total as u64);
        let snap = rr.get_recent().unwrap();
        assert_snapshot_offsets_consistent(&snap);
        assert_eq!(snap.bytes.len(), RING_BUFFER_SIZE);
        assert_eq!(snap.start_offset, extra as u64);
        assert_eq!(snap.end_offset, total as u64);
        assert_eq!(&snap.bytes[..], &data[extra..]);
    }

    #[test]
    fn get_recent_trims_leading_utf8_continuations_and_adjusts_start_offset() {
        let inner = std::io::Cursor::new(Vec::<u8>::new());
        let mut rr = RingReader::new(inner);
        // 0x82/0xAC are continuation bytes: the orphaned tail of a char
        // whose lead byte was evicted; they must be trimmed away.
        rr.push_ring_bytes(&[0x82, 0xAC, b'a', b'b'], 100);
        let snap = rr.get_recent().unwrap();
        assert_snapshot_offsets_consistent(&snap);
        assert_eq!(snap.start_offset, 102);
        assert_eq!(snap.bytes, vec![b'a', b'b']);
        assert_eq!(snap.end_offset, 104);
    }

    #[test]
    fn get_recent_trims_trailing_incomplete_utf8_sequence() {
        let inner = std::io::Cursor::new(Vec::<u8>::new());
        let mut rr = RingReader::new(inner);
        // 0xE2 leads a 3-byte sequence with no continuation bytes; the
        // incomplete tail must be dropped from the snapshot.
        rr.push_ring_bytes(&[b'a', b'b', b'c', 0xE2], 5);
        let snap = rr.get_recent().unwrap();
        assert_snapshot_offsets_consistent(&snap);
        assert_eq!(snap.start_offset, 5);
        assert_eq!(snap.bytes, b"abc".to_vec());
        assert_eq!(snap.end_offset, 8);
    }

    #[test]
    fn snapshot_contains_read_ahead_bytes_and_grows_when_refilled() {
        let data = make_ascii_data(MAX_READ_AHEAD + 10);
        let inner = CountingReader::with_max_chunk(data.clone(), 13);
        let mut rr = RingReader::new(inner);
        let snap1 = rr.get_recent().unwrap();
        assert_snapshot_offsets_consistent(&snap1);
        assert_eq!(snap1.bytes.len(), MAX_READ_AHEAD);
        assert_eq!(&snap1.bytes[..], &data[..MAX_READ_AHEAD]);
        // Consume one byte, refill, and the window grows by one.
        let mut one = [0u8; 1];
        rr.read_exact(&mut one).unwrap();
        assert_eq!(one[0], data[0]);
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD - 1);
        let snap2 = rr.get_recent().unwrap();
        assert_snapshot_offsets_consistent(&snap2);
        assert_eq!(rr.read_ahead_len(), MAX_READ_AHEAD);
        assert_eq!(snap2.start_offset, 0);
        assert_eq!(snap2.bytes.len(), MAX_READ_AHEAD + 1);
        assert_eq!(&snap2.bytes[..], &data[..MAX_READ_AHEAD + 1]);
    }
}