use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Tuning knobs for a spillable buffer: starting allocation, hard size
/// ceiling, the point at which contents move to disk, and where the
/// spill file lives.
#[derive(Debug, Clone)]
pub struct LargeBufferConfig {
    /// Bytes pre-allocated for the in-memory storage.
    pub initial_capacity: usize,
    /// Hard ceiling on total buffered bytes; writes past this fail.
    pub max_capacity: usize,
    /// Size above which contents spill to a temp file; `0` disables spilling.
    pub spill_threshold: usize,
    /// Directory for spill files; `None` means `std::env::temp_dir()`.
    pub temp_dir: Option<PathBuf>,
}
impl Default for LargeBufferConfig {
    /// 64 KiB initial capacity, 1 GiB maximum, 64 MiB spill threshold,
    /// and the system temp directory.
    fn default() -> Self {
        const KIB: usize = 1024;
        Self {
            initial_capacity: 64 * KIB,
            max_capacity: KIB * KIB * KIB,
            spill_threshold: 64 * KIB * KIB,
            temp_dir: None,
        }
    }
}
impl LargeBufferConfig {
    /// Builds a config with the given `max_capacity` and default everything else.
    #[must_use]
    pub fn new(max_capacity: usize) -> Self {
        let mut config = Self::default();
        config.max_capacity = max_capacity;
        config
    }
    /// Builder-style setter for the initial in-memory capacity.
    #[must_use]
    pub const fn initial_capacity(self, capacity: usize) -> Self {
        let mut updated = self;
        updated.initial_capacity = capacity;
        updated
    }
    /// Builder-style setter for the spill threshold (`0` disables spilling).
    #[must_use]
    pub const fn spill_threshold(self, threshold: usize) -> Self {
        let mut updated = self;
        updated.spill_threshold = threshold;
        updated
    }
    /// Builder-style setter for the spill-file directory.
    #[must_use]
    pub fn temp_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.temp_dir = Some(dir.into());
        self
    }
}
/// Fixed-capacity byte ring that overwrites the oldest data when full.
#[derive(Debug)]
pub struct RingBuffer {
    /// Backing storage; always exactly `capacity` bytes long.
    data: Vec<u8>,
    capacity: usize,
    /// Index of the next write.
    head: usize,
    /// Index of the oldest stored byte.
    tail: usize,
    /// Distinguishes "full" from "empty" (both have `head == tail`).
    full: bool,
}
impl RingBuffer {
    /// Creates a ring that retains at most the last `capacity` bytes written.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            data: vec![0u8; capacity],
            capacity,
            head: 0,
            tail: 0,
            full: false,
        }
    }
    /// Number of bytes currently stored.
    #[must_use]
    pub const fn len(&self) -> usize {
        if self.full {
            self.capacity
        } else if self.head >= self.tail {
            self.head - self.tail
        } else {
            // head wrapped past the end of the storage
            self.capacity - self.tail + self.head
        }
    }
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        !self.full && self.head == self.tail
    }
    #[must_use]
    pub const fn is_full(&self) -> bool {
        self.full
    }
    /// Maximum number of bytes the ring can hold.
    #[must_use]
    pub const fn capacity(&self) -> usize {
        self.capacity
    }
    /// Appends `data`, silently overwriting the oldest bytes once full.
    pub fn write(&mut self, data: &[u8]) {
        // BUG FIX: a zero-capacity ring previously panicked here
        // (index out of bounds and `% 0`); treat writes as a no-op instead.
        if self.capacity == 0 {
            return;
        }
        // Only the final `capacity` bytes of a single oversized write can
        // survive, so skip the prefix that would be overwritten anyway.
        let data = if data.len() > self.capacity {
            &data[data.len() - self.capacity..]
        } else {
            data
        };
        for &byte in data {
            self.data[self.head] = byte;
            self.head = (self.head + 1) % self.capacity;
            if self.full {
                // Overwrote the oldest byte: the tail moves with the head.
                self.tail = (self.tail + 1) % self.capacity;
            }
            if self.head == self.tail {
                self.full = true;
            }
        }
    }
    /// Copies the stored bytes out in arrival order (oldest first).
    #[must_use]
    pub fn read_all(&self) -> Vec<u8> {
        let len = self.len();
        let mut result = Vec::with_capacity(len);
        if len == 0 {
            return result;
        }
        if self.head > self.tail {
            // Contiguous region.
            result.extend_from_slice(&self.data[self.tail..self.head]);
        } else {
            // Wrapped (or full): tail..end, then start..head.
            result.extend_from_slice(&self.data[self.tail..]);
            result.extend_from_slice(&self.data[..self.head]);
        }
        result
    }
    /// Contents as a `String`, replacing invalid UTF-8 lossily.
    #[must_use]
    pub fn as_string(&self) -> String {
        String::from_utf8_lossy(&self.read_all()).into_owned()
    }
    /// Discards all stored bytes (the backing storage is left as-is).
    pub const fn clear(&mut self) {
        self.head = 0;
        self.tail = 0;
        self.full = false;
    }
    /// Returns the last `n` stored bytes (or everything, when `n >= len`).
    #[must_use]
    pub fn tail_bytes(&self, n: usize) -> Vec<u8> {
        let len = self.len();
        if n >= len {
            return self.read_all();
        }
        let all = self.read_all();
        all[len - n..].to_vec()
    }
}
/// Where a spill buffer's bytes currently live.
enum Storage {
    /// Contents held entirely in memory (the pre-spill state).
    Memory(Vec<u8>),
    /// Contents spilled to a temp file. `path` is retained so the file can
    /// be deleted on drop; `size` tracks the total bytes written to it.
    File {
        file: std::fs::File,
        path: PathBuf,
        size: usize,
    },
}
/// Growable byte buffer that starts in memory and spills its contents to a
/// temporary file once the configured `spill_threshold` is exceeded.
pub struct SpillBuffer {
    // Current backing store: memory until the first spill, then a file.
    storage: Storage,
    // Capacity limits and spill settings supplied at construction.
    config: LargeBufferConfig,
    // Total bytes written so far; mirrors the storage's logical length.
    write_pos: usize,
    // Set once the buffer has spilled; never reset (clear keeps the file).
    spilled: bool,
}
impl SpillBuffer {
#[must_use]
pub fn new() -> Self {
Self::with_config(LargeBufferConfig::default())
}
#[must_use]
pub fn with_config(config: LargeBufferConfig) -> Self {
Self {
storage: Storage::Memory(Vec::with_capacity(config.initial_capacity)),
config,
write_pos: 0,
spilled: false,
}
}
#[must_use]
pub const fn is_spilled(&self) -> bool {
self.spilled
}
#[must_use]
pub const fn len(&self) -> usize {
self.write_pos
}
#[must_use]
pub const fn is_empty(&self) -> bool {
self.write_pos == 0
}
pub fn write(&mut self, data: &[u8]) -> io::Result<()> {
let new_size = self.write_pos + data.len();
if !self.spilled
&& self.config.spill_threshold > 0
&& new_size > self.config.spill_threshold
{
self.spill_to_disk()?;
}
if new_size > self.config.max_capacity {
return Err(io::Error::new(
io::ErrorKind::StorageFull,
"Buffer exceeded maximum capacity",
));
}
match &mut self.storage {
Storage::Memory(vec) => {
vec.extend_from_slice(data);
self.write_pos = vec.len();
}
Storage::File { file, size, .. } => {
file.seek(SeekFrom::End(0))?;
file.write_all(data)?;
*size += data.len();
self.write_pos = *size;
}
}
Ok(())
}
fn spill_to_disk(&mut self) -> io::Result<()> {
if self.spilled {
return Ok(());
}
let temp_dir = self
.config
.temp_dir
.as_ref()
.map_or_else(std::env::temp_dir, std::clone::Clone::clone);
let path = temp_dir.join(format!("rust_expect_buffer_{}", std::process::id()));
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)?;
if let Storage::Memory(vec) = &self.storage {
file.write_all(vec)?;
}
let size = self.write_pos;
self.storage = Storage::File { file, path, size };
self.spilled = true;
Ok(())
}
pub fn read_all(&mut self) -> io::Result<Vec<u8>> {
match &mut self.storage {
Storage::Memory(vec) => Ok(vec.clone()),
Storage::File { file, size, .. } => {
file.seek(SeekFrom::Start(0))?;
let mut data = vec![0u8; *size];
file.read_exact(&mut data)?;
Ok(data)
}
}
}
pub fn as_string(&mut self) -> io::Result<String> {
Ok(String::from_utf8_lossy(&self.read_all()?).into_owned())
}
pub fn clear(&mut self) -> io::Result<()> {
match &mut self.storage {
Storage::Memory(vec) => {
vec.clear();
}
Storage::File { file, size, .. } => {
file.set_len(0)?;
*size = 0;
}
}
self.write_pos = 0;
Ok(())
}
}
impl Default for SpillBuffer {
fn default() -> Self {
Self::new()
}
}
impl Drop for SpillBuffer {
    /// Best-effort removal of the spill file; failures are deliberately
    /// ignored because a panic inside `drop` is worse than a leaked temp file.
    fn drop(&mut self) {
        match &self.storage {
            Storage::Memory(_) => {}
            Storage::File { path, .. } => {
                // Removal can fail (already deleted, permissions); ignore it.
                drop(std::fs::remove_file(path));
            }
        }
    }
}
impl std::fmt::Debug for SpillBuffer {
    /// Summarizes the buffer (length, spill state, cap) without dumping
    /// its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("SpillBuffer");
        dbg.field("len", &self.write_pos);
        dbg.field("spilled", &self.spilled);
        dbg.field("max_capacity", &self.config.max_capacity);
        dbg.finish()
    }
}
/// Thread-safe size counter shareable across threads without locking.
#[derive(Debug, Default)]
pub struct AtomicBufferSize {
    size: AtomicUsize,
}
impl AtomicBufferSize {
    /// Every access is an independent counter update with no need to order
    /// other memory around it, so `Relaxed` is used throughout.
    const ORDER: Ordering = Ordering::Relaxed;

    /// Creates a counter starting at zero.
    #[must_use]
    pub const fn new() -> Self {
        Self { size: AtomicUsize::new(0) }
    }
    /// Current value.
    #[must_use]
    pub fn get(&self) -> usize {
        self.size.load(Self::ORDER)
    }
    /// Increments by `n` (wraps on overflow, per `fetch_add`).
    pub fn add(&self, n: usize) {
        self.size.fetch_add(n, Self::ORDER);
    }
    /// Decrements by `n` (wraps on underflow, per `fetch_sub`).
    pub fn sub(&self, n: usize) {
        self.size.fetch_sub(n, Self::ORDER);
    }
    /// Overwrites the value with `n`.
    pub fn set(&self, n: usize) {
        self.size.store(n, Self::ORDER);
    }
    /// Resets the value to zero.
    pub fn reset(&self) {
        self.set(0);
    }
}
#[cfg(unix)]
#[must_use]
/// Allocates a zeroed buffer whose *length* is rounded up to a whole number
/// of pages.
///
/// NOTE(review): despite the name, `Vec` gives no guarantee about the
/// *address* alignment of the allocation — only the length is page-sized.
/// Callers needing a page-aligned pointer must use a raw aligned allocation.
pub fn allocate_page_aligned(size: usize) -> Vec<u8> {
    // `next_multiple_of` panics on overflow rather than silently wrapping,
    // unlike the previous `(size + page - 1) & !(page - 1)` in release mode.
    // `page_size()` never returns 0 (it falls back to 4096), so the
    // division inside `next_multiple_of` cannot trap on zero.
    vec![0u8; size.next_multiple_of(page_size())]
}
#[cfg(unix)]
#[must_use]
#[allow(unsafe_code)]
/// Returns the OS page size in bytes, falling back to 4096 when `sysconf`
/// reports an error or a nonsensical value.
pub fn page_size() -> usize {
    // SAFETY: `sysconf` has no memory-safety preconditions; it only reads a
    // process-wide configuration value and is safe for any valid name.
    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    // sysconf returns -1 on error; treat 0 or negative as "unknown".
    if size <= 0 {
4096 } else {
size as usize
    }
}
#[cfg(windows)]
#[must_use]
/// Returns the assumed page size on Windows.
/// NOTE(review): hard-coded to 4096 rather than queried from the OS
/// (e.g. via `GetSystemInfo`) — confirm this is acceptable for all
/// supported Windows targets.
pub fn page_size() -> usize {
4096 }
#[cfg(windows)]
#[must_use]
/// Allocates a zeroed buffer whose *length* is rounded up to a whole number
/// of pages, matching the Unix variant's behavior. (The allocation address
/// itself is not guaranteed to be page-aligned — see the Unix variant.)
pub fn allocate_page_aligned(size: usize) -> Vec<u8> {
    // CONSISTENCY FIX: previously this returned exactly `size` bytes while
    // the Unix version rounded up to a page multiple; callers now get the
    // same "at least `size`, page-multiple length" contract on both.
    vec![0u8; size.next_multiple_of(page_size())]
}
#[cfg(test)]
mod tests {
    use super::*;
    // Write below capacity: contents come back verbatim, length matches.
    #[test]
    fn ring_buffer_basic() {
        let mut buf = RingBuffer::new(10);
        assert!(buf.is_empty());
        assert_eq!(buf.capacity(), 10);
        buf.write(b"hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(buf.as_string(), "hello");
    }
    // Filling past capacity overwrites the oldest bytes; the newest survive.
    #[test]
    fn ring_buffer_wrap() {
        let mut buf = RingBuffer::new(10);
        buf.write(b"12345678"); assert_eq!(buf.len(), 8);
        buf.write(b"ABCD"); assert_eq!(buf.len(), 10); assert!(buf.is_full());
        let content = buf.as_string();
        assert_eq!(content.len(), 10);
        assert!(content.ends_with("ABCD"));
    }
    // tail_bytes returns the last n bytes, or everything when n >= len.
    #[test]
    fn ring_buffer_tail_bytes() {
        let mut buf = RingBuffer::new(20);
        buf.write(b"hello world");
        let tail = buf.tail_bytes(5);
        assert_eq!(tail, b"world");
        let tail = buf.tail_bytes(100);
        assert_eq!(tail, b"hello world");
    }
    // clear resets the ring to the empty state.
    #[test]
    fn ring_buffer_clear() {
        let mut buf = RingBuffer::new(10);
        buf.write(b"hello");
        buf.clear();
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
    }
    // spill_threshold == 0 disables spilling: data stays in memory.
    #[test]
    fn spill_buffer_memory() {
        let config = LargeBufferConfig::new(1024 * 1024).spill_threshold(0);
        let mut buf = SpillBuffer::with_config(config);
        buf.write(b"hello world").unwrap();
        assert!(!buf.is_spilled());
        assert_eq!(buf.len(), 11);
        assert_eq!(buf.as_string().unwrap(), "hello world");
    }
    // Crossing the threshold (5 + 8 > 10) triggers a spill to disk, and the
    // full contents remain readable afterwards.
    #[test]
    fn spill_buffer_spill() {
        let config = LargeBufferConfig::new(1024 * 1024).spill_threshold(10);
        let mut buf = SpillBuffer::with_config(config);
        buf.write(b"hello").unwrap();
        assert!(!buf.is_spilled());
        buf.write(b"world!!!").unwrap();
        assert!(buf.is_spilled());
        assert_eq!(buf.as_string().unwrap(), "helloworld!!!");
    }
    // add/sub/set/reset round-trip on the atomic counter.
    #[test]
    fn atomic_buffer_size() {
        let size = AtomicBufferSize::new();
        assert_eq!(size.get(), 0);
        size.add(100);
        assert_eq!(size.get(), 100);
        size.sub(30);
        assert_eq!(size.get(), 70);
        size.set(500);
        assert_eq!(size.get(), 500);
        size.reset();
        assert_eq!(size.get(), 0);
    }
    // The allocation is at least the requested size; page size is sane.
    #[test]
    fn page_aligned_allocation() {
        let buf = allocate_page_aligned(1000);
        assert!(buf.len() >= 1000);
        let page = page_size();
        assert!(page >= 4096);
    }
}