use bytes::{Bytes, BytesMut};
use std::alloc::{alloc, dealloc, Layout};
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
/// Alignment and rounding unit for buffer allocations; 64 bytes matches a
/// typical CPU cache line, so distinct buffers don't share a line.
const CACHE_LINE_SIZE: usize = 64;
/// Default capacity (256 KiB) used by the pool and the consumer's read buffer.
const DEFAULT_BUFFER_SIZE: usize = 256 * 1024;
/// A fixed-capacity, cache-line-aligned byte buffer with a lock-free append
/// cursor. Memory is allocated manually in `with_id` and freed in `Drop`.
#[derive(Debug)]
pub struct ZeroCopyBuffer {
    // Start of the manually allocated region (never null; see `with_id`).
    data: NonNull<u8>,
    // Usable size in bytes, rounded up to a multiple of CACHE_LINE_SIZE.
    capacity: usize,
    // Next free offset; advanced via CAS in `try_allocate`/`reserve`.
    write_pos: AtomicUsize,
    // Caller-supplied identifier (the pool assigns monotonically increasing ids).
    id: u64,
    // Layout used for `alloc`; kept so `Drop` can `dealloc` with the same one.
    layout: Layout,
}
// SAFETY: the raw pointer is owned exclusively by this struct, the backing
// allocation lives until Drop, and the write cursor is only mutated
// atomically. Writers receive disjoint `[offset, offset+len)` regions from
// `reserve`/`try_allocate`.
// NOTE(review): soundness across threads also relies on callers of the
// unsafe `get_mut_slice` never aliasing a region — confirm at call sites.
unsafe impl Send for ZeroCopyBuffer {}
unsafe impl Sync for ZeroCopyBuffer {}
impl ZeroCopyBuffer {
    /// Creates a buffer with at least `capacity` usable bytes and id 0.
    pub fn new(capacity: usize) -> Self {
        Self::with_id(capacity, 0)
    }

    /// Creates a buffer with at least `capacity` usable bytes and the given
    /// `id`.
    ///
    /// The requested capacity is clamped up to `CACHE_LINE_SIZE` and rounded
    /// to the next multiple of it, so the allocation is non-zero and aligned.
    ///
    /// # Panics
    /// Diverges via `handle_alloc_error` when the allocation fails.
    pub fn with_id(capacity: usize, id: u64) -> Self {
        let capacity = capacity.max(CACHE_LINE_SIZE);
        // Round up to a multiple of CACHE_LINE_SIZE (a power of two).
        let aligned_capacity = (capacity + CACHE_LINE_SIZE - 1) & !(CACHE_LINE_SIZE - 1);
        let layout = Layout::from_size_align(aligned_capacity, CACHE_LINE_SIZE)
            .expect("BUG: aligned_capacity is always >= CACHE_LINE_SIZE and a multiple of it");
        // SAFETY: `layout` has non-zero size (>= CACHE_LINE_SIZE); a null
        // return is handled before `new_unchecked`.
        let data = unsafe {
            let ptr = alloc(layout);
            if ptr.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            NonNull::new_unchecked(ptr)
        };
        Self {
            data,
            capacity: aligned_capacity,
            write_pos: AtomicUsize::new(0),
            id,
            layout,
        }
    }

    /// Atomically reserves `len` bytes and returns a writable handle to the
    /// reserved region, or `None` if there is not enough space left.
    pub fn reserve(self: &Arc<Self>, len: usize) -> Option<BufferSlice> {
        // Delegate to `try_allocate` so the CAS loop lives in exactly one
        // place instead of being duplicated in both methods.
        let offset = self.try_allocate(len)?;
        Some(BufferSlice::new(Arc::clone(self), offset, len))
    }

    /// Returns a mutable view of `[offset, offset + len)`.
    ///
    /// # Safety
    /// The caller must guarantee the range was reserved by exactly one
    /// writer and is not aliased, since this hands out `&mut` from `&self`.
    #[allow(clippy::mut_from_ref)]
    pub(crate) unsafe fn get_mut_slice(&self, offset: usize, len: usize) -> &mut [u8] {
        // Overflow-proof form of `offset + len <= capacity`: the naive sum
        // could wrap in release builds and defeat the bounds check.
        assert!(
            len <= self.capacity && offset <= self.capacity - len,
            "get_mut_slice out of bounds: offset={} len={} capacity={}",
            offset,
            len,
            self.capacity
        );
        std::slice::from_raw_parts_mut(self.data.as_ptr().add(offset), len)
    }

    /// Returns a shared view of `[offset, offset + len)`.
    ///
    /// # Panics
    /// Panics if the range extends past the current write position; only
    /// already-reserved bytes are exposed.
    pub fn get_slice(&self, offset: usize, len: usize) -> &[u8] {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        // Overflow-proof form of `offset + len <= write_pos`.
        assert!(
            len <= write_pos && offset <= write_pos - len,
            "get_slice out of bounds: offset={} len={} write_pos={}",
            offset,
            len,
            write_pos
        );
        // SAFETY: the range is within the reserved (hence allocated) region.
        unsafe { std::slice::from_raw_parts(self.data.as_ptr().add(offset), len) }
    }

    /// Number of bytes reserved so far.
    pub fn len(&self) -> usize {
        self.write_pos.load(Ordering::Acquire)
    }

    /// True when nothing has been reserved yet.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Bytes still available for reservation.
    pub fn remaining(&self) -> usize {
        self.capacity - self.len()
    }

    /// Total usable capacity in bytes (cache-line rounded).
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Identifier assigned at construction.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Rewinds the write cursor so the buffer can be reused. Callers must
    /// ensure no outstanding slices still reference the old contents.
    pub(crate) fn reset(&self) -> bool {
        self.write_pos.store(0, Ordering::Release);
        true
    }

    /// Atomically claims `len` bytes and returns the starting offset, or
    /// `None` when the buffer lacks space.
    pub fn try_allocate(&self, len: usize) -> Option<usize> {
        loop {
            let current = self.write_pos.load(Ordering::Acquire);
            // `checked_add` guards against wrap-around for pathological
            // `len` values, which would otherwise pass the capacity check.
            let new_pos = current.checked_add(len)?;
            if new_pos > self.capacity {
                return None;
            }
            if self
                .write_pos
                .compare_exchange_weak(current, new_pos, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return Some(current);
            }
            std::hint::spin_loop();
        }
    }

    /// Freezes the written prefix into a `Bytes` that borrows this buffer's
    /// memory: the embedded `Arc` keeps the buffer alive for as long as the
    /// returned `Bytes` (or any slice of it) exists.
    pub fn freeze(self: &Arc<Self>) -> Bytes {
        let len = self.len();
        if len == 0 {
            return Bytes::new();
        }
        let owner = OwnedBufferSlice {
            buffer: Arc::clone(self),
            len,
        };
        Bytes::from_owner(owner)
    }
}
/// Owner handed to `Bytes::from_owner` by `ZeroCopyBuffer::freeze`: keeps
/// the buffer alive (via the `Arc`) for as long as the `Bytes` exists.
struct OwnedBufferSlice {
    buffer: Arc<ZeroCopyBuffer>,
    // Snapshot of the buffer's written length at freeze time.
    len: usize,
}
// SAFETY: `Arc<ZeroCopyBuffer>` is Send + Sync (ZeroCopyBuffer declares
// both) and `usize` trivially is, so these impls are sound.
// NOTE(review): they are also redundant — the compiler would auto-derive
// Send/Sync for this composition.
unsafe impl Send for OwnedBufferSlice {}
unsafe impl Sync for OwnedBufferSlice {}
impl AsRef<[u8]> for OwnedBufferSlice {
    fn as_ref(&self) -> &[u8] {
        // Exposes only the bytes written before the freeze; `get_slice`
        // asserts against the current write position.
        self.buffer.get_slice(0, self.len)
    }
}
impl Drop for ZeroCopyBuffer {
    fn drop(&mut self) {
        // SAFETY: `data` was allocated in `with_id` with exactly this
        // `layout` and is freed only here, once, when the last owner drops.
        unsafe {
            dealloc(self.data.as_ptr(), self.layout);
        }
    }
}
/// A reserved, writable region of a `ZeroCopyBuffer`.
#[derive(Debug)]
pub struct BufferSlice {
    buffer: Arc<ZeroCopyBuffer>,
    offset: usize,
    len: usize,
}

impl BufferSlice {
    /// Wraps an already-reserved `[offset, offset + len)` region.
    pub(crate) fn new(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
        Self { buffer, offset, len }
    }

    /// Read-only view of the region.
    pub fn as_bytes(&self) -> &[u8] {
        self.buffer.get_slice(self.offset, self.len)
    }

    /// Mutable view of the region.
    ///
    /// # Safety
    /// The caller must ensure no other reader or writer touches this same
    /// region while the returned slice is alive.
    pub unsafe fn as_mut_bytes(&mut self) -> &mut [u8] {
        self.buffer.get_mut_slice(self.offset, self.len)
    }

    /// Copies as much of `data` as fits into the region and returns the
    /// number of bytes copied (input longer than the region is truncated).
    pub fn write(&mut self, data: &[u8]) -> usize {
        let count = self.len.min(data.len());
        // SAFETY: we hold `&mut self`, so nothing else writes this region.
        let dst = unsafe { self.as_mut_bytes() };
        dst[..count].copy_from_slice(&data[..count]);
        count
    }

    /// Length of the region in bytes.
    pub fn len(&self) -> usize {
        self.len
    }

    /// True for a zero-length region.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Starting offset of the region within the backing buffer.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Cheap, shareable reference to the same region.
    pub fn share(&self) -> BufferRef {
        BufferRef::Slice {
            buffer: Arc::clone(&self.buffer),
            offset: self.offset,
            len: self.len,
        }
    }

    /// Copies the region into an owned `Bytes`.
    pub fn to_bytes(&self) -> Bytes {
        Bytes::copy_from_slice(self.as_bytes())
    }
}

impl Deref for BufferSlice {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_bytes()
    }
}

impl AsRef<[u8]> for BufferSlice {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
/// A bounded pool of reusable `ZeroCopyBuffer`s backed by a crossbeam channel.
pub struct ZeroCopyBufferPool {
    // Sender side of the free list; `release` pushes reusable buffers here.
    free_buffers: crossbeam_channel::Sender<Arc<ZeroCopyBuffer>>,
    // Receiver side of the free list; `acquire` pops buffers from here.
    buffer_receiver: crossbeam_channel::Receiver<Arc<ZeroCopyBuffer>>,
    // Capacity requested for every buffer this pool creates.
    buffer_size: usize,
    // Monotonic id source for newly created buffers.
    next_id: AtomicU64,
    // Buffers created and still accounted to this pool.
    total_created: AtomicU64,
    // Buffers currently handed out via `acquire`.
    in_use: AtomicU64,
    // Hard cap on `total_created`; `acquire` returns None beyond this.
    max_buffers: u64,
}
impl ZeroCopyBufferPool {
    /// Creates a pool of `initial_count` buffers of `buffer_size` bytes,
    /// allowed to grow up to four times the initial count.
    pub fn new(buffer_size: usize, initial_count: usize) -> Self {
        Self::with_max_buffers(buffer_size, initial_count, (initial_count * 4) as u64)
    }

    /// Creates a pool with an explicit cap on the total buffer count.
    /// The cap is never lower than `initial_count`.
    pub fn with_max_buffers(buffer_size: usize, initial_count: usize, max_buffers: u64) -> Self {
        // Free list holds twice the initial count so released buffers are
        // rarely dropped for lack of space. `.max(1)` avoids a zero-capacity
        // (rendezvous) channel when initial_count == 0, which would make
        // `try_send` always fail and recycling impossible.
        let (tx, rx) = crossbeam_channel::bounded((initial_count * 2).max(1));
        let pool = Self {
            free_buffers: tx,
            buffer_receiver: rx,
            buffer_size,
            next_id: AtomicU64::new(0),
            total_created: AtomicU64::new(0),
            in_use: AtomicU64::new(0),
            max_buffers: max_buffers.max(initial_count as u64),
        };
        for _ in 0..initial_count {
            let id = pool.next_id.fetch_add(1, Ordering::Relaxed);
            let buffer = Arc::new(ZeroCopyBuffer::with_id(buffer_size, id));
            pool.total_created.fetch_add(1, Ordering::Relaxed);
            let _ = pool.free_buffers.try_send(buffer);
        }
        pool
    }

    /// Takes a buffer from the free list, or creates one if the pool is
    /// below `max_buffers`. Returns `None` when the pool is exhausted.
    pub fn acquire(&self) -> Option<Arc<ZeroCopyBuffer>> {
        match self.buffer_receiver.try_recv() {
            Ok(buffer) => {
                // Only rewind the cursor when no outstanding reference
                // (e.g. a frozen `Bytes`) could still read old contents.
                if Arc::strong_count(&buffer) == 1 {
                    buffer.reset();
                }
                self.in_use.fetch_add(1, Ordering::Relaxed);
                Some(buffer)
            }
            Err(_) => {
                // Optimistically claim a creation slot, then roll back when
                // past the cap; keeps the limit check race-free.
                let prev = self.total_created.fetch_add(1, Ordering::Relaxed);
                if prev >= self.max_buffers {
                    self.total_created.fetch_sub(1, Ordering::Relaxed);
                    tracing::warn!(
                        total_created = prev,
                        max_buffers = self.max_buffers,
                        in_use = self.in_use.load(Ordering::Relaxed),
                        "Buffer pool exhausted — apply back-pressure"
                    );
                    return None;
                }
                let id = self.next_id.fetch_add(1, Ordering::Relaxed);
                let buffer = Arc::new(ZeroCopyBuffer::with_id(self.buffer_size, id));
                self.in_use.fetch_add(1, Ordering::Relaxed);
                Some(buffer)
            }
        }
    }

    /// Returns a buffer to the pool.
    ///
    /// The buffer is recycled only when the caller held the last reference;
    /// otherwise (or when the free list is full) it is dropped and its
    /// creation slot is freed so a replacement can be made later — without
    /// the decrement, every dropped buffer would permanently shrink the
    /// pool's effective capacity.
    pub fn release(&self, buffer: Arc<ZeroCopyBuffer>) {
        self.in_use.fetch_sub(1, Ordering::Relaxed);
        if Arc::strong_count(&buffer) == 1 {
            buffer.reset();
            if self.free_buffers.try_send(buffer).is_err() {
                // Free list full: the buffer is dropped here; free its slot.
                self.total_created.fetch_sub(1, Ordering::Relaxed);
            }
        } else {
            // Still referenced elsewhere (e.g. a frozen `Bytes`): it cannot
            // be recycled, so free its creation slot now. The memory itself
            // is reclaimed when the last reference drops.
            self.total_created.fetch_sub(1, Ordering::Relaxed);
        }
    }

    /// Point-in-time snapshot of the pool's counters.
    pub fn stats(&self) -> BufferPoolStats {
        BufferPoolStats {
            buffer_size: self.buffer_size,
            total_created: self.total_created.load(Ordering::Relaxed),
            in_use: self.in_use.load(Ordering::Relaxed),
            available: self.buffer_receiver.len() as u64,
        }
    }
}
/// Point-in-time snapshot of pool counters (see `ZeroCopyBufferPool::stats`).
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    // Capacity requested for each pooled buffer, in bytes.
    pub buffer_size: usize,
    // Buffers created by the pool and still accounted for.
    pub total_created: u64,
    // Buffers currently acquired and not yet released.
    pub in_use: u64,
    // Buffers sitting in the free list, ready to acquire.
    pub available: u64,
}
/// A message assembled by `ZeroCopyProducer`; payloads are `BufferRef`s so
/// they can point at shared buffer memory instead of owned copies.
#[derive(Debug)]
pub struct ZeroCopyMessage {
    // Interned topic name, shared across messages for the same topic.
    pub topic: Arc<str>,
    pub partition: u32,
    // Optional message key.
    pub key: Option<BufferRef>,
    // Message payload.
    pub value: BufferRef,
    // (header name, header value) pairs; empty as created by the producer.
    pub headers: Vec<(Arc<str>, BufferRef)>,
    // Milliseconds since the Unix epoch at creation time.
    pub timestamp: i64,
}
/// A byte payload in one of three representations: copied inline (small
/// payloads), shared `Bytes`, or a view into a pooled buffer.
#[derive(Debug, Clone)]
pub enum BufferRef {
    // Small payload stored by value; cloning copies the fixed array.
    Inline(SmallVec),
    // Reference-counted external bytes.
    External(Bytes),
    // Region of a buffer kept alive by the Arc.
    Slice {
        buffer: Arc<ZeroCopyBuffer>,
        offset: usize,
        len: usize,
    },
}
impl BufferRef {
    /// Largest payload stored inline. Must not exceed `SmallVec`'s fixed
    /// 64-byte capacity, otherwise `SmallVec::from_slice` would silently
    /// truncate; previously this threshold was a magic `64` repeated in
    /// three constructors.
    const INLINE_MAX: usize = 64;

    /// Copies `data` inline when small, otherwise into an owned `Bytes`.
    pub fn from_bytes(data: &[u8]) -> Self {
        if data.len() <= Self::INLINE_MAX {
            BufferRef::Inline(SmallVec::from_slice(data))
        } else {
            BufferRef::External(Bytes::copy_from_slice(data))
        }
    }

    /// Wraps already-owned `Bytes`, copying inline when small enough.
    pub fn from_external(data: Bytes) -> Self {
        if data.len() <= Self::INLINE_MAX {
            BufferRef::Inline(SmallVec::from_slice(&data))
        } else {
            BufferRef::External(data)
        }
    }

    /// References a region of a pooled buffer; small regions are copied
    /// inline so tiny payloads do not pin the buffer.
    ///
    /// # Panics
    /// Panics (via `get_slice`) if the inline copy would read past the
    /// buffer's current write position.
    pub fn from_slice(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
        if len <= Self::INLINE_MAX {
            let data = buffer.get_slice(offset, len);
            BufferRef::Inline(SmallVec::from_slice(data))
        } else {
            BufferRef::Slice {
                buffer,
                offset,
                len,
            }
        }
    }

    /// Borrows the payload bytes regardless of representation.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            BufferRef::Inline(sv) => sv.as_slice(),
            BufferRef::External(b) => b,
            BufferRef::Slice {
                buffer,
                offset,
                len,
            } => buffer.get_slice(*offset, *len),
        }
    }

    /// Payload length in bytes.
    pub fn len(&self) -> usize {
        match self {
            BufferRef::Inline(sv) => sv.len(),
            BufferRef::External(b) => b.len(),
            BufferRef::Slice { len, .. } => *len,
        }
    }

    /// True for a zero-length payload.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Converts to `Bytes`; cheap clone for `External`, a copy otherwise.
    pub fn to_bytes(&self) -> Bytes {
        match self {
            BufferRef::Inline(sv) => Bytes::copy_from_slice(sv.as_slice()),
            BufferRef::External(b) => b.clone(),
            BufferRef::Slice {
                buffer,
                offset,
                len,
            } => Bytes::copy_from_slice(buffer.get_slice(*offset, *len)),
        }
    }
}

impl AsRef<[u8]> for BufferRef {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
/// Inline capacity of `SmallVec` in bytes. Kept well below `u8::MAX` so the
/// `len: u8` field can represent every valid length; previously the value
/// was a magic `64` duplicated in four places.
const SMALL_VEC_CAPACITY: usize = 64;

/// A fixed-capacity inline byte vector for small payloads: no heap
/// allocation, stored entirely by value.
#[derive(Debug, Clone)]
pub struct SmallVec {
    data: [u8; SMALL_VEC_CAPACITY],
    len: u8,
}

impl SmallVec {
    /// Creates an empty vector.
    pub fn new() -> Self {
        Self {
            data: [0u8; SMALL_VEC_CAPACITY],
            len: 0,
        }
    }

    /// Copies up to `SMALL_VEC_CAPACITY` bytes from `slice`; longer input
    /// is silently truncated.
    pub fn from_slice(slice: &[u8]) -> Self {
        let len = slice.len().min(SMALL_VEC_CAPACITY);
        let mut sv = Self::new();
        sv.data[..len].copy_from_slice(&slice[..len]);
        // Cast is lossless: len <= SMALL_VEC_CAPACITY <= u8::MAX.
        sv.len = len as u8;
        sv
    }

    /// The stored bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len as usize]
    }

    /// Number of stored bytes.
    pub fn len(&self) -> usize {
        self.len as usize
    }

    /// True when nothing is stored.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}

impl Default for SmallVec {
    /// Same as `new`: an empty vector.
    fn default() -> Self {
        Self::new()
    }
}
/// Builds `ZeroCopyMessage`s and hands out buffer space from a shared pool.
pub struct ZeroCopyProducer {
    // Pool supplying ZeroCopyBuffers for `allocate`.
    buffer_pool: Arc<ZeroCopyBufferPool>,
    // Buffer currently being filled by `allocate`; replaced when full.
    current_buffer: parking_lot::Mutex<Option<Arc<ZeroCopyBuffer>>>,
    // topic name -> interned Arc<str>, so messages share one allocation.
    topic_cache: dashmap::DashMap<String, Arc<str>>,
    stats: ProducerStats,
}
impl ZeroCopyProducer {
    /// Creates a producer drawing buffers from `buffer_pool`.
    pub fn new(buffer_pool: Arc<ZeroCopyBufferPool>) -> Self {
        Self {
            buffer_pool,
            current_buffer: parking_lot::Mutex::new(None),
            topic_cache: dashmap::DashMap::new(),
            stats: ProducerStats::new(),
        }
    }

    /// Creates a producer over a freshly built default pool
    /// (16 buffers of `DEFAULT_BUFFER_SIZE`).
    pub fn with_defaults() -> Self {
        let pool = Arc::new(ZeroCopyBufferPool::new(DEFAULT_BUFFER_SIZE, 16));
        Self::new(pool)
    }

    /// Wall-clock time in milliseconds since the Unix epoch (0 when the
    /// clock reads before the epoch). Shared by both message constructors,
    /// which previously duplicated this expression.
    fn now_millis() -> i64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64
    }

    /// Returns a shared `Arc<str>` for `topic`, reusing a cached one so all
    /// messages for the same topic share a single allocation.
    fn intern_topic(&self, topic: &str) -> Arc<str> {
        if let Some(interned) = self.topic_cache.get(topic) {
            return interned.clone();
        }
        // `entry` makes the miss path atomic: two racing threads converge
        // on one interned Arc, whereas the old get-then-insert could cache
        // two different Arcs for the same topic and break Arc::ptr_eq
        // comparisons between their messages.
        self.topic_cache
            .entry(topic.to_string())
            .or_insert_with(|| Arc::from(topic))
            .value()
            .clone()
    }

    /// Builds a message by copying `key`/`value` (small payloads inline).
    pub fn create_message(
        &self,
        topic: &str,
        partition: u32,
        key: Option<&[u8]>,
        value: &[u8],
    ) -> ZeroCopyMessage {
        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_written
            .fetch_add(value.len() as u64, Ordering::Relaxed);
        ZeroCopyMessage {
            topic: self.intern_topic(topic),
            partition,
            key: key.map(BufferRef::from_bytes),
            value: BufferRef::from_bytes(value),
            headers: Vec::new(),
            timestamp: Self::now_millis(),
        }
    }

    /// Builds a message from already-owned `Bytes` without copying large
    /// payloads.
    pub fn create_message_from_bytes(
        &self,
        topic: &str,
        partition: u32,
        key: Option<Bytes>,
        value: Bytes,
    ) -> ZeroCopyMessage {
        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_written
            .fetch_add(value.len() as u64, Ordering::Relaxed);
        ZeroCopyMessage {
            topic: self.intern_topic(topic),
            partition,
            key: key.map(BufferRef::from_external),
            value: BufferRef::from_external(value),
            headers: Vec::new(),
            timestamp: Self::now_millis(),
        }
    }

    /// Reserves `size` bytes in the producer's current buffer, rolling over
    /// to a fresh pool buffer when the current one is full. Returns the
    /// buffer plus starting offset, or `None` when the pool is exhausted or
    /// `size` cannot fit even an empty buffer.
    pub fn allocate(&self, size: usize) -> Option<(Arc<ZeroCopyBuffer>, usize)> {
        let mut guard = self.current_buffer.lock();
        if let Some(ref buffer) = *guard {
            if let Some(offset) = buffer.try_allocate(size) {
                return Some((buffer.clone(), offset));
            }
        }
        let buffer = self.buffer_pool.acquire()?;
        if let Some(offset) = buffer.try_allocate(size) {
            // Hand the exhausted buffer back to the pool; silently dropping
            // the replaced Arc would leak the pool's `in_use` count.
            if let Some(old) = guard.replace(Arc::clone(&buffer)) {
                self.buffer_pool.release(old);
            }
            return Some((buffer, offset));
        }
        // Even an empty buffer cannot fit `size`: return it so the pool's
        // accounting stays balanced instead of leaking the acquisition.
        self.buffer_pool.release(buffer);
        None
    }

    /// Snapshot of producer counters plus the underlying pool's stats.
    pub fn stats(&self) -> ProducerStatsSnapshot {
        ProducerStatsSnapshot {
            messages_created: self.stats.messages_created.load(Ordering::Relaxed),
            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
            buffer_pool: self.buffer_pool.stats(),
        }
    }
}
/// Internal producer counters (monotonic, relaxed atomics).
struct ProducerStats {
    // Messages built via create_message / create_message_from_bytes.
    messages_created: AtomicU64,
    // Total value-payload bytes across created messages.
    bytes_written: AtomicU64,
}
impl ProducerStats {
    // All counters start at zero.
    fn new() -> Self {
        Self {
            messages_created: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
        }
    }
}
/// Point-in-time copy of producer counters (see `ZeroCopyProducer::stats`).
#[derive(Debug, Clone)]
pub struct ProducerStatsSnapshot {
    pub messages_created: u64,
    pub bytes_written: u64,
    // Stats of the pool the producer draws buffers from.
    pub buffer_pool: BufferPoolStats,
}
/// Parses length-prefixed message frames and tracks consumption counters.
pub struct ZeroCopyConsumer {
    // Scratch buffer reused by `buffer_data` to stage incoming bytes.
    read_buffer: parking_lot::Mutex<BytesMut>,
    stats: ConsumerStats,
}
impl ZeroCopyConsumer {
    /// Creates a consumer with a pre-sized scratch read buffer and zeroed
    /// counters.
    pub fn new() -> Self {
        Self {
            read_buffer: parking_lot::Mutex::new(BytesMut::with_capacity(DEFAULT_BUFFER_SIZE)),
            stats: ConsumerStats::new(),
        }
    }

    /// Splits `data` into length-prefixed frames and parses each one.
    ///
    /// Frame layout: 4-byte big-endian body length, then the body (8-byte
    /// offset, 8-byte timestamp, remaining bytes = value). Parsing stops at
    /// the first incomplete frame; `bytes_read` counts consumed input.
    pub fn parse_messages(&self, data: Bytes) -> Vec<ConsumedMessage> {
        let total = data.len();
        let mut parsed = Vec::new();
        let mut cursor = 0;
        while cursor < total {
            // Need at least the 4-byte prefix plus the 16-byte body header.
            if cursor + 20 > total {
                break;
            }
            let body_len = u32::from_be_bytes([
                data[cursor],
                data[cursor + 1],
                data[cursor + 2],
                data[cursor + 3],
            ]) as usize;
            let body_end = cursor + 4 + body_len;
            if body_end > total {
                break;
            }
            let body = data.slice(cursor + 4..body_end);
            if let Some(message) = self.parse_single_message(body) {
                self.stats.messages_consumed.fetch_add(1, Ordering::Relaxed);
                parsed.push(message);
            }
            cursor = body_end;
        }
        self.stats
            .bytes_read
            .fetch_add(cursor as u64, Ordering::Relaxed);
        parsed
    }

    /// Parses one frame body; `None` when it is shorter than the 16-byte
    /// (offset + timestamp) header. The value is a zero-copy sub-slice.
    fn parse_single_message(&self, data: Bytes) -> Option<ConsumedMessage> {
        if data.len() < 16 {
            return None;
        }
        let mut field = [0u8; 8];
        field.copy_from_slice(&data[0..8]);
        let msg_offset = u64::from_be_bytes(field);
        field.copy_from_slice(&data[8..16]);
        let timestamp = i64::from_be_bytes(field);
        Some(ConsumedMessage {
            offset: msg_offset,
            timestamp,
            key: None,
            value: data.slice(16..),
        })
    }

    /// Point-in-time snapshot of consumption counters.
    pub fn stats(&self) -> ConsumerStatsSnapshot {
        ConsumerStatsSnapshot {
            messages_consumed: self.stats.messages_consumed.load(Ordering::Relaxed),
            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
        }
    }

    /// Stages `data` in the internal scratch buffer and returns it as an
    /// immutable `Bytes`.
    pub fn buffer_data(&self, data: &[u8]) -> Bytes {
        let mut scratch = self.read_buffer.lock();
        scratch.clear();
        scratch.extend_from_slice(data);
        scratch.clone().freeze()
    }
}
impl Default for ZeroCopyConsumer {
    // Same as `new`: empty counters and a default-sized scratch buffer.
    fn default() -> Self {
        Self::new()
    }
}
/// Internal consumer counters (monotonic, relaxed atomics).
struct ConsumerStats {
    // Frames successfully parsed into ConsumedMessage values.
    messages_consumed: AtomicU64,
    // Input bytes consumed by parse_messages (including frame prefixes).
    bytes_read: AtomicU64,
}
impl ConsumerStats {
    // All counters start at zero.
    fn new() -> Self {
        Self {
            messages_consumed: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }
}
/// Point-in-time copy of consumer counters (see `ZeroCopyConsumer::stats`).
#[derive(Debug, Clone)]
pub struct ConsumerStatsSnapshot {
    pub messages_consumed: u64,
    pub bytes_read: u64,
}
/// A message decoded from the wire by `ZeroCopyConsumer`.
#[derive(Debug, Clone)]
pub struct ConsumedMessage {
    pub offset: u64,
    pub timestamp: i64,
    pub key: Option<Bytes>,
    pub value: Bytes,
}

impl ConsumedMessage {
    /// The value as UTF-8 text, or `None` when it is not valid UTF-8.
    pub fn value_str(&self) -> Option<&str> {
        std::str::from_utf8(&self.value).ok()
    }

    /// The key as UTF-8 text, or `None` when absent or not valid UTF-8.
    pub fn key_str(&self) -> Option<&str> {
        match &self.key {
            Some(key) => std::str::from_utf8(key).ok(),
            None => None,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Reserving advances the buffer's cursor by exactly the reserved length.
    #[test]
    fn test_zero_copy_buffer_basic() {
        let buffer = Arc::new(ZeroCopyBuffer::new(1024));
        assert_eq!(buffer.len(), 0);
        assert!(buffer.remaining() >= 1024);
        let slice = buffer.reserve(100).unwrap();
        assert_eq!(slice.len(), 100);
        assert_eq!(buffer.len(), 100);
    }

    // Bytes written through a BufferSlice can be read back from it.
    #[test]
    fn test_zero_copy_buffer_write() {
        let buffer = Arc::new(ZeroCopyBuffer::new(1024));
        let mut slice = buffer.reserve(11).unwrap();
        slice.write(b"Hello World");
        assert_eq!(slice.as_bytes(), b"Hello World");
    }

    // acquire/release round-trips keep the pool's in_use counter balanced.
    #[test]
    fn test_buffer_pool() {
        let pool = ZeroCopyBufferPool::new(1024, 4);
        let stats = pool.stats();
        assert_eq!(stats.total_created, 4);
        assert_eq!(stats.available, 4);
        let b1 = pool.acquire().unwrap();
        let b2 = pool.acquire().unwrap();
        let stats = pool.stats();
        assert_eq!(stats.in_use, 2);
        pool.release(b1);
        pool.release(b2);
        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
    }

    // Payloads at or under the inline threshold use the Inline variant.
    #[test]
    fn test_buffer_ref_inline() {
        let small_data = b"small";
        let buf_ref = BufferRef::from_bytes(small_data);
        match buf_ref {
            BufferRef::Inline(_) => {}
            _ => panic!("Expected inline storage for small data"),
        }
        assert_eq!(buf_ref.as_bytes(), small_data);
    }

    // Payloads over the inline threshold use the External variant.
    #[test]
    fn test_buffer_ref_external() {
        let large_data = vec![0u8; 100];
        let buf_ref = BufferRef::from_bytes(&large_data);
        match buf_ref {
            BufferRef::External(_) => {}
            _ => panic!("Expected external storage for large data"),
        }
        assert_eq!(buf_ref.len(), 100);
    }

    // create_message copies key/value and bumps the producer counters.
    #[test]
    fn test_zero_copy_producer() {
        let producer = ZeroCopyProducer::with_defaults();
        let msg = producer.create_message("test-topic", 0, Some(b"key1"), b"value1");
        assert_eq!(&*msg.topic, "test-topic");
        assert_eq!(msg.partition, 0);
        assert_eq!(msg.key.unwrap().as_bytes(), b"key1");
        assert_eq!(msg.value.as_bytes(), b"value1");
        let stats = producer.stats();
        assert_eq!(stats.messages_created, 1);
    }

    // A hand-built frame (4-byte len, 8-byte offset, 8-byte timestamp,
    // 5-byte value) parses into one message with its fields decoded.
    #[test]
    fn test_zero_copy_consumer() {
        let consumer = ZeroCopyConsumer::new();
        let mut data = BytesMut::new();
        data.extend_from_slice(&21u32.to_be_bytes());
        data.extend_from_slice(&42u64.to_be_bytes());
        data.extend_from_slice(&1234567890i64.to_be_bytes());
        data.extend_from_slice(b"hello");
        let messages = consumer.parse_messages(data.freeze());
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].offset, 42);
        assert_eq!(messages[0].timestamp, 1234567890);
        assert_eq!(&messages[0].value[..], b"hello");
    }

    // SmallVec round-trips bytes and reports the stored length.
    #[test]
    fn test_small_vec() {
        let sv = SmallVec::from_slice(b"test data");
        assert_eq!(sv.as_slice(), b"test data");
        assert_eq!(sv.len(), 9);
    }

    // Messages on the same topic share one interned Arc<str>;
    // different topics get different Arcs.
    #[test]
    fn test_topic_interning() {
        let producer = ZeroCopyProducer::with_defaults();
        let msg1 = producer.create_message("topic-a", 0, None, b"v1");
        let msg2 = producer.create_message("topic-a", 0, None, b"v2");
        let msg3 = producer.create_message("topic-b", 0, None, b"v3");
        assert!(Arc::ptr_eq(&msg1.topic, &msg2.topic));
        assert!(!Arc::ptr_eq(&msg1.topic, &msg3.topic));
    }
}