#![allow(dead_code)]
use std::fmt;
#[derive(Debug, Clone)]
pub struct SpliceConfig {
pub pipe_buffer_size: usize,
pub max_transfer_size: usize,
pub non_blocking: bool,
pub hint_more: bool,
}
impl Default for SpliceConfig {
fn default() -> Self {
Self {
pipe_buffer_size: 64 * 1024,
max_transfer_size: 1024 * 1024,
non_blocking: false,
hint_more: false,
}
}
}
impl SpliceConfig {
#[must_use]
pub fn for_media() -> Self {
Self {
pipe_buffer_size: 256 * 1024,
max_transfer_size: 4 * 1024 * 1024,
non_blocking: true,
hint_more: true,
}
}
#[must_use]
pub fn for_metadata() -> Self {
Self {
pipe_buffer_size: 4096,
max_transfer_size: 64 * 1024,
non_blocking: false,
hint_more: false,
}
}
#[must_use]
pub fn is_valid(&self) -> bool {
self.pipe_buffer_size > 0
&& self.max_transfer_size > 0
&& self.max_transfer_size >= self.pipe_buffer_size
}
}
#[derive(Debug, Clone)]
pub struct SpliceResult {
pub bytes_transferred: u64,
pub splice_calls: u64,
pub completed: bool,
pub zero_copy: bool,
}
impl SpliceResult {
#[must_use]
pub fn new() -> Self {
Self {
bytes_transferred: 0,
splice_calls: 0,
completed: false,
zero_copy: false,
}
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn avg_bytes_per_call(&self) -> f64 {
if self.splice_calls == 0 {
return 0.0;
}
self.bytes_transferred as f64 / self.splice_calls as f64
}
}
impl Default for SpliceResult {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for SpliceResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"transferred {} bytes in {} calls (zero_copy={}, completed={})",
self.bytes_transferred, self.splice_calls, self.zero_copy, self.completed
)
}
}
#[derive(Clone)]
pub struct PipeBuffer {
data: Vec<u8>,
head: usize,
tail: usize,
full: bool,
}
impl PipeBuffer {
#[must_use]
pub fn new(capacity: usize) -> Self {
Self {
data: vec![0u8; capacity],
head: 0,
tail: 0,
full: false,
}
}
#[must_use]
pub fn capacity(&self) -> usize {
self.data.len()
}
#[must_use]
pub fn available(&self) -> usize {
if self.full {
self.data.len()
} else if self.tail >= self.head {
self.tail - self.head
} else {
self.data.len() - self.head + self.tail
}
}
#[must_use]
pub fn free_space(&self) -> usize {
self.capacity() - self.available()
}
#[must_use]
pub fn is_empty(&self) -> bool {
!self.full && self.head == self.tail
}
#[must_use]
pub fn is_full(&self) -> bool {
self.full
}
pub fn write(&mut self, src: &[u8]) -> usize {
if self.full {
return 0;
}
let cap = self.data.len();
let mut written = 0;
for &byte in src {
if self.full {
break;
}
self.data[self.tail] = byte;
self.tail = (self.tail + 1) % cap;
if self.tail == self.head {
self.full = true;
}
written += 1;
}
written
}
pub fn read(&mut self, dst: &mut [u8]) -> usize {
if self.is_empty() {
return 0;
}
let cap = self.data.len();
let mut count = 0;
for slot in dst.iter_mut() {
if !self.full && self.head == self.tail {
break;
}
*slot = self.data[self.head];
self.head = (self.head + 1) % cap;
self.full = false;
count += 1;
}
count
}
pub fn clear(&mut self) {
self.head = 0;
self.tail = 0;
self.full = false;
}
}
impl fmt::Debug for PipeBuffer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PipeBuffer")
.field("head", &self.head)
.field("tail", &self.tail)
.field("full", &self.full)
.field("capacity", &self.capacity())
.field("available", &self.available())
.field("free_space", &self.free_space())
.finish_non_exhaustive()
}
}
pub struct SplicePipe {
buffer: PipeBuffer,
config: SpliceConfig,
result: SpliceResult,
}
impl SplicePipe {
#[must_use]
pub fn new(config: SpliceConfig) -> Self {
let buffer = PipeBuffer::new(config.pipe_buffer_size);
Self {
buffer,
config,
result: SpliceResult::new(),
}
}
#[must_use]
pub fn with_defaults() -> Self {
Self::new(SpliceConfig::default())
}
#[allow(clippy::cast_possible_truncation)]
pub fn transfer<R: std::io::Read, W: std::io::Write>(
&mut self,
reader: &mut R,
writer: &mut W,
limit: u64,
) -> std::io::Result<SpliceResult> {
let mut total: u64 = 0;
let mut calls: u64 = 0;
loop {
if total >= limit {
break;
}
let remaining = (limit - total).min(self.buffer.free_space() as u64) as usize;
if remaining == 0 && self.buffer.is_empty() {
break;
}
if remaining > 0 && !self.buffer.is_full() {
let mut tmp = vec![0u8; remaining];
let n = reader.read(&mut tmp)?;
if n == 0 {
if self.buffer.is_empty() {
self.result.completed = true;
break;
}
} else {
self.buffer.write(&tmp[..n]);
}
}
let avail = self.buffer.available();
if avail > 0 {
let mut tmp = vec![0u8; avail];
let read_count = self.buffer.read(&mut tmp);
writer.write_all(&tmp[..read_count])?;
total += read_count as u64;
calls += 1;
}
}
let result = SpliceResult {
bytes_transferred: total,
splice_calls: calls,
completed: true,
zero_copy: false,
};
self.result = result.clone();
Ok(result)
}
#[must_use]
pub fn cumulative_result(&self) -> &SpliceResult {
&self.result
}
#[must_use]
pub fn config(&self) -> &SpliceConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_splice_config_default() {
let cfg = SpliceConfig::default();
assert_eq!(cfg.pipe_buffer_size, 64 * 1024);
assert!(!cfg.non_blocking);
assert!(cfg.is_valid());
}
#[test]
fn test_splice_config_for_media() {
let cfg = SpliceConfig::for_media();
assert_eq!(cfg.pipe_buffer_size, 256 * 1024);
assert!(cfg.non_blocking);
assert!(cfg.hint_more);
assert!(cfg.is_valid());
}
#[test]
fn test_splice_config_for_metadata() {
let cfg = SpliceConfig::for_metadata();
assert_eq!(cfg.pipe_buffer_size, 4096);
assert!(!cfg.non_blocking);
assert!(cfg.is_valid());
}
#[test]
fn test_splice_result_default() {
let r = SpliceResult::new();
assert_eq!(r.bytes_transferred, 0);
assert_eq!(r.splice_calls, 0);
assert!(!r.completed);
assert!((r.avg_bytes_per_call() - 0.0).abs() < f64::EPSILON);
}
#[test]
fn test_splice_result_display() {
let r = SpliceResult {
bytes_transferred: 1024,
splice_calls: 2,
completed: true,
zero_copy: false,
};
let s = r.to_string();
assert!(s.contains("1024"));
assert!(s.contains("2 calls"));
}
#[test]
fn test_pipe_buffer_new() {
let buf = PipeBuffer::new(128);
assert_eq!(buf.capacity(), 128);
assert_eq!(buf.available(), 0);
assert_eq!(buf.free_space(), 128);
assert!(buf.is_empty());
assert!(!buf.is_full());
}
#[test]
fn test_pipe_buffer_write_and_read() {
let mut buf = PipeBuffer::new(16);
let written = buf.write(b"hello");
assert_eq!(written, 5);
assert_eq!(buf.available(), 5);
let mut out = [0u8; 16];
let read = buf.read(&mut out);
assert_eq!(read, 5);
assert_eq!(&out[..5], b"hello");
assert!(buf.is_empty());
}
#[test]
fn test_pipe_buffer_full() {
let mut buf = PipeBuffer::new(4);
let written = buf.write(b"ABCDEF");
assert_eq!(written, 4);
assert!(buf.is_full());
assert_eq!(buf.free_space(), 0);
let w2 = buf.write(b"X");
assert_eq!(w2, 0);
}
#[test]
fn test_pipe_buffer_wrap_around() {
let mut buf = PipeBuffer::new(8);
buf.write(b"12345"); let mut out = [0u8; 3];
buf.read(&mut out); assert_eq!(&out, b"123");
buf.write(b"ABCDEF"); assert_eq!(buf.available(), 8);
assert!(buf.is_full());
}
#[test]
fn test_pipe_buffer_clear() {
let mut buf = PipeBuffer::new(32);
buf.write(b"data");
assert!(!buf.is_empty());
buf.clear();
assert!(buf.is_empty());
assert_eq!(buf.available(), 0);
}
#[test]
fn test_splice_pipe_transfer() {
let data = b"The quick brown fox jumps over the lazy dog";
let mut reader = Cursor::new(data.to_vec());
let mut writer: Vec<u8> = Vec::new();
let mut pipe = SplicePipe::with_defaults();
let result = pipe
.transfer(&mut reader, &mut writer, data.len() as u64)
.expect("operation should succeed");
assert_eq!(result.bytes_transferred, data.len() as u64);
assert!(result.completed);
assert_eq!(&writer, data);
}
#[test]
fn test_splice_pipe_transfer_with_limit() {
let data = vec![0xABu8; 1000];
let mut reader = Cursor::new(data);
let mut writer: Vec<u8> = Vec::new();
let mut pipe = SplicePipe::with_defaults();
let result = pipe
.transfer(&mut reader, &mut writer, 500)
.expect("transfer should succeed");
assert_eq!(result.bytes_transferred, 500);
assert_eq!(writer.len(), 500);
}
#[test]
fn test_splice_pipe_cumulative_result() {
let data = b"short";
let mut reader = Cursor::new(data.to_vec());
let mut writer: Vec<u8> = Vec::new();
let mut pipe = SplicePipe::with_defaults();
pipe.transfer(&mut reader, &mut writer, 100)
.expect("transfer should succeed");
let cum = pipe.cumulative_result();
assert!(cum.bytes_transferred > 0);
}
}