use crate::{BufferPool, IoBuf, IoBufMut};
use bytes::BufMut;
use std::ops::{Bound, RangeBounds};
pub(super) struct Buffer {
data: IoBuf,
len: usize,
pub(super) offset: u64,
pub(super) capacity: usize,
pool: BufferPool,
}
impl Buffer {
pub(super) fn new(offset: u64, capacity: usize, pool: BufferPool) -> Self {
Self::from(offset, IoBuf::default(), capacity, pool)
}
pub(super) fn from(offset: u64, data: IoBuf, capacity: usize, pool: BufferPool) -> Self {
let len = data.len();
Self {
data,
len,
offset,
capacity,
pool,
}
}
pub(super) const fn size(&self) -> u64 {
self.offset + self.len as u64
}
pub(super) const fn len(&self) -> usize {
self.len
}
pub(super) const fn is_empty(&self) -> bool {
self.len == 0
}
pub(super) fn slice(&self, range: impl RangeBounds<usize>) -> IoBuf {
let start = match range.start_bound() {
Bound::Included(&n) => n,
Bound::Excluded(&n) => n.checked_add(1).expect("range start overflow"),
Bound::Unbounded => 0,
};
let end = match range.end_bound() {
Bound::Included(&n) => n.checked_add(1).expect("range end overflow"),
Bound::Excluded(&n) => n,
Bound::Unbounded => self.len,
};
assert!(start <= end, "slice start must be <= end");
assert!(end <= self.len, "slice out of bounds");
self.data.slice(start..end)
}
pub(super) fn resize(&mut self, len: u64) -> Option<(IoBuf, u64)> {
if self.is_empty() {
self.offset = len;
return None;
}
if len >= self.size() {
let previous = self
.take()
.expect("take must succeed when resize observes buffered data");
self.offset = len;
Some(previous)
} else if len >= self.offset {
self.len = (len - self.offset) as usize;
None
} else {
self.len = 0;
self.offset = len;
None
}
}
pub(super) fn take(&mut self) -> Option<(IoBuf, u64)> {
if self.is_empty() {
return None;
}
let len = std::mem::take(&mut self.len);
let offset = self.offset;
self.offset += len as u64;
let mut data = std::mem::take(&mut self.data);
Some((data.split_to(len), offset))
}
fn writable(&mut self, needed: usize) -> IoBufMut {
let current = std::mem::take(&mut self.data);
let source = match current.try_into_mut() {
Ok(mut writable) => {
writable.truncate(self.len);
if writable.capacity() >= needed {
return writable;
}
writable.freeze()
}
Err(shared) => shared,
};
let target = needed.max(self.capacity);
let mut grown = self.pool.alloc(target);
grown.put_slice(&source.as_ref()[..self.len]);
grown
}
pub(super) fn merge(&mut self, data: &[u8], offset: u64) -> bool {
let end_offset = offset
.checked_add(data.len() as u64)
.expect("end_offset overflow");
let can_merge_into_buffer =
offset >= self.offset && end_offset <= self.offset + self.capacity as u64;
if !can_merge_into_buffer {
return false;
}
let start = (offset - self.offset) as usize;
let end = start + data.len();
let mut writable = self.writable(end);
let prev = writable.len();
if end > prev {
writable.put_bytes(0, end - prev);
}
writable.as_mut()[start..end].copy_from_slice(data.as_ref());
self.len = writable.len();
self.data = writable.freeze();
true
}
pub(super) fn append(&mut self, data: &[u8]) -> bool {
let end = self.len + data.len();
let mut writable = self.writable(end);
writable.put_slice(data);
let over_capacity = writable.len() > self.capacity;
self.len = writable.len();
self.data = writable.freeze();
over_capacity
}
pub(super) fn drop_prefix(&mut self, len: usize) {
assert!(len <= self.len);
if len == 0 {
return;
}
let current_len = self.len;
if len == current_len {
self.len = 0;
return;
}
self.data = self.data.slice(len..current_len);
self.len = current_len - len;
}
pub(super) const fn clear(&mut self) {
self.len = 0;
}
}
impl AsRef<[u8]> for Buffer {
fn as_ref(&self) -> &[u8] {
&self.data.as_ref()[..self.len]
}
}
#[cfg(test)]
mod tests {
use super::*;
use prometheus_client::registry::Registry;
#[test]
fn test_tip_append() {
let mut registry = Registry::default();
let pool = crate::BufferPool::new(crate::BufferPoolConfig::for_storage(), &mut registry);
let mut buffer = Buffer::new(50, 100, pool);
assert_eq!(buffer.size(), 50);
assert!(buffer.is_empty());
assert!(buffer.take().is_none());
assert!(!buffer.append(&[1, 2, 3]));
assert_eq!(buffer.size(), 53);
assert!(!buffer.is_empty());
let taken = buffer.take().unwrap();
assert_eq!(taken.0.as_ref(), &[1, 2, 3]);
assert_eq!(taken.1, 50);
assert_eq!(buffer.size(), 53);
assert!(buffer.take().is_none());
let mut buf = vec![42; 100];
assert!(!buffer.append(&buf));
assert_eq!(buffer.size(), 153);
assert!(buffer.append(&[43]));
assert_eq!(buffer.size(), 154);
buf.push(43);
let taken = buffer.take().unwrap();
assert_eq!(taken.0.as_ref(), buf.as_slice());
assert_eq!(taken.1, 53);
}
#[test]
fn test_tip_resize() {
let mut registry = Registry::default();
let pool = crate::BufferPool::new(crate::BufferPoolConfig::for_storage(), &mut registry);
let mut buffer = Buffer::new(50, 100, pool);
buffer.append(&[1, 2, 3]);
assert_eq!(buffer.size(), 53);
let resized = buffer.resize(60).unwrap();
assert_eq!(resized.0.as_ref(), &[1, 2, 3]);
assert_eq!(resized.1, 50);
assert_eq!(buffer.size(), 60);
assert!(buffer.take().is_none());
buffer.append(&[4, 5, 6]);
assert_eq!(buffer.size(), 63);
assert!(buffer.resize(61).is_none());
assert_eq!(buffer.size(), 61);
let taken = buffer.take().unwrap();
assert_eq!(taken.0.as_ref(), &[4]);
assert_eq!(taken.1, 60);
assert_eq!(buffer.size(), 61);
buffer.append(&[7, 8, 9]);
assert!(buffer.resize(59).is_none());
assert_eq!(buffer.size(), 59);
assert!(buffer.take().is_none());
assert_eq!(buffer.size(), 59);
}
#[test]
fn test_tip_first_merge_from_empty() {
let mut registry = Registry::default();
let pool = crate::BufferPool::new(crate::BufferPoolConfig::for_storage(), &mut registry);
let mut buffer = Buffer::new(0, 16, pool);
assert!(buffer.data.is_empty());
assert!(buffer.merge(b"abc", 0));
assert_eq!(buffer.data.as_ref(), b"abc");
}
#[test]
fn test_tip_slice_uses_resolved_bounds() {
let mut registry = Registry::default();
let pool = crate::BufferPool::new(crate::BufferPoolConfig::for_storage(), &mut registry);
let mut buffer = Buffer::new(0, 16, pool);
buffer.append(b"stale");
let _ = buffer.take().expect("buffer should contain data");
assert!(buffer.slice(..).is_empty());
assert!(buffer.slice(0..).is_empty());
}
#[test]
fn test_tip_writable_copies_when_slice_is_live() {
let mut registry = Registry::default();
let pool = crate::BufferPool::new(crate::BufferPoolConfig::for_storage(), &mut registry);
let mut buffer = Buffer::new(0, 16, pool);
assert!(!buffer.append(b"abc"));
let snapshot = buffer.slice(..);
let mut writable = buffer.writable(6);
assert_eq!(writable.as_ref(), b"abc");
assert_ne!(writable.as_ref().as_ptr(), snapshot.as_ref().as_ptr());
writable.put_slice(b"def");
writable.as_mut()[0] = b'X';
assert_eq!(snapshot.as_ref(), b"abc");
assert_eq!(writable.as_ref(), b"Xbcdef");
}
#[test]
fn test_tip_from_preserves_seed_bytes_until_mutated() {
let mut registry = Registry::default();
let pool = crate::BufferPool::new(crate::BufferPoolConfig::for_storage(), &mut registry);
let mut buffer = Buffer::from(7, IoBuf::from(&b"abc"[..]), 16, pool);
assert_eq!(buffer.offset, 7);
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.as_ref(), b"abc");
assert!(!buffer.append(b"def"));
assert_eq!(buffer.as_ref(), b"abcdef");
}
}