use bytes::{Bytes, BytesMut};
use super::error::RedexError;
/// Upper bound, in bytes, on the live buffer of a single `HeapSegment` (3 GiB).
///
/// `HeapSegment::append` and `HeapSegment::append_many` compare the would-be
/// buffer length against this value and return `RedexError::PayloadTooLarge`
/// instead of growing past it.
pub(super) const MAX_SEGMENT_BYTES: usize = 3 * 1024 * 1024 * 1024;
/// An in-memory segment: one contiguous byte buffer addressed by logical
/// `u64` offsets.
///
/// The byte stored at index `i` of `buf` has logical offset `base_offset + i`.
#[derive(Debug)]
pub struct HeapSegment {
// Bytes currently held by the segment; `read` hands out slices of this buffer.
buf: Bytes,
// Logical offset of the first byte of `buf`. Starts at 0 and is advanced by
// `evict_prefix_to` when a prefix of the buffer is dropped.
base_offset: u64,
}
impl HeapSegment {
    /// Creates an empty segment whose base offset is 0.
    pub fn new() -> Self {
        Self {
            buf: Bytes::new(),
            base_offset: 0,
        }
    }

    /// Creates an empty segment (base offset 0) whose buffer starts out as a
    /// frozen `BytesMut::with_capacity(capacity)`.
    ///
    /// NOTE(review): whether the reserved capacity is still available to the
    /// first append depends on how `bytes` round-trips `freeze` /
    /// `try_into_mut` — confirm against the pinned `bytes` version.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            buf: BytesMut::with_capacity(capacity).freeze(),
            base_offset: 0,
        }
    }

    /// Wraps an already-populated byte vector as a segment based at offset 0.
    #[cfg(feature = "redex-disk")]
    pub(super) fn from_existing(buf: Vec<u8>) -> Self {
        Self {
            buf: Bytes::from(buf),
            base_offset: 0,
        }
    }

    /// Moves the buffer out of `self` and returns it in mutable form.
    ///
    /// `self.buf` is left as an empty `Bytes`; the caller must store the
    /// (frozen) result back. If `try_into_mut` succeeds the existing buffer is
    /// returned as-is. Otherwise the contents are copied into a new `BytesMut`
    /// sized for the current length plus `additional` bytes.
    fn take_mut_or_copy(&mut self, additional: usize) -> BytesMut {
        match std::mem::take(&mut self.buf).try_into_mut() {
            Ok(m) => m,
            Err(bytes) => {
                let mut m = BytesMut::with_capacity(bytes.len() + additional);
                m.extend_from_slice(&bytes);
                m
            }
        }
    }

    /// Checks that `incoming` more bytes fit under `MAX_SEGMENT_BYTES`.
    ///
    /// On failure the error reports `incoming` as `size` and the remaining
    /// headroom as `max`.
    fn ensure_room(&self, incoming: usize) -> Result<(), RedexError> {
        let live = self.buf.len();
        if live.saturating_add(incoming) > MAX_SEGMENT_BYTES {
            return Err(RedexError::PayloadTooLarge {
                size: incoming,
                max: MAX_SEGMENT_BYTES.saturating_sub(live),
            });
        }
        Ok(())
    }

    /// Appends `payload` to the end of the segment.
    ///
    /// Returns the logical offset at which the payload's first byte was
    /// stored (`base_offset` plus the buffer length before the append).
    ///
    /// # Errors
    ///
    /// Returns `RedexError::PayloadTooLarge` — leaving the segment untouched —
    /// if the append would grow the buffer past `MAX_SEGMENT_BYTES`.
    pub fn append(&mut self, payload: &[u8]) -> Result<u64, RedexError> {
        self.ensure_room(payload.len())?;
        let offset = self.base_offset + self.buf.len() as u64;
        let mut m = self.take_mut_or_copy(payload.len());
        m.extend_from_slice(payload);
        self.buf = m.freeze();
        Ok(offset)
    }

    /// Appends every payload in order, back to back, as one operation.
    ///
    /// Returns the logical offset of the first payload's first byte. For an
    /// empty batch (or a batch of empty payloads) this is simply the current
    /// end of the segment and the buffer is not touched.
    ///
    /// # Errors
    ///
    /// Returns `RedexError::PayloadTooLarge` — leaving the segment untouched —
    /// if the combined payload size would grow the buffer past
    /// `MAX_SEGMENT_BYTES`.
    pub fn append_many(&mut self, payloads: &[Bytes]) -> Result<u64, RedexError> {
        // Saturate rather than `sum()`: a plain sum panics on overflow in debug
        // builds and wraps in release builds, and a wrapped total could sneak
        // past the capacity check below. A saturated total is always rejected.
        let total = payloads
            .iter()
            .fold(0usize, |acc, p| acc.saturating_add(p.len()));
        self.ensure_room(total)?;
        let first = self.base_offset + self.buf.len() as u64;
        if total == 0 {
            // Nothing to write; skip taking (and possibly copying) the buffer.
            return Ok(first);
        }
        let mut m = self.take_mut_or_copy(total);
        // The in-place branch of `take_mut_or_copy` does not reserve, so make
        // room for the whole batch up front instead of growing per payload.
        m.reserve(total);
        for p in payloads {
            m.extend_from_slice(p);
        }
        self.buf = m.freeze();
        Ok(first)
    }

    /// Returns the `len` bytes starting at logical `offset`.
    ///
    /// Returns `None` if `offset` lies before `base_offset` or if any part of
    /// the requested range falls beyond the end of the buffer. The result is
    /// produced with `Bytes::slice` on the segment's buffer.
    pub fn read(&self, offset: u64, len: u32) -> Option<Bytes> {
        // Bounds-check in u64 so an out-of-range offset can never be truncated
        // into a valid index on targets where `usize` is narrower than 64 bits.
        let rel = offset.checked_sub(self.base_offset)?;
        let end = rel.checked_add(u64::from(len))?;
        if end > self.buf.len() as u64 {
            return None;
        }
        // `rel <= end <= buf.len()`, so both values fit in `usize`.
        Some(self.buf.slice(rel as usize..end as usize))
    }

    /// Number of bytes currently held in the buffer.
    pub fn live_bytes(&self) -> usize {
        self.buf.len()
    }

    /// Logical offset of the first byte currently held in the buffer.
    pub fn base_offset(&self) -> u64 {
        self.base_offset
    }

    /// Test-only: overwrites `base_offset` without touching the buffer.
    #[cfg(test)]
    pub(super) fn force_base_offset(&mut self, base: u64) {
        self.base_offset = base;
    }

    /// Resets `base_offset` to 0 without touching the buffer.
    #[cfg(feature = "redex-disk")]
    pub(super) fn rebase_to_zero(&mut self) {
        self.base_offset = 0;
    }

    /// Test-only: runs `f` over the buffer contents as a mutable byte slice
    /// and stores the (possibly modified) bytes back.
    #[cfg(test)]
    pub(super) fn with_bytes_for_test_mut<F>(&mut self, f: F)
    where
        F: FnOnce(&mut [u8]),
    {
        let mut m = self.take_mut_or_copy(0);
        f(&mut m);
        self.buf = m.freeze();
    }

    /// Drops the bytes whose logical offsets are below `new_base`.
    ///
    /// Returns the number of bytes removed. A `new_base` at or below the
    /// current `base_offset` is a no-op returning 0. A `new_base` beyond the
    /// end of the buffer is clamped: everything is evicted and `base_offset`
    /// advances only by the number of bytes that were actually held.
    pub fn evict_prefix_to(&mut self, new_base: u64) -> u64 {
        if new_base <= self.base_offset {
            return 0;
        }
        // Clamp in u64 before narrowing so a huge `new_base` cannot be
        // truncated into a small delta on targets with a narrow `usize`.
        let live = self.buf.len() as u64;
        let evicted = (new_base - self.base_offset).min(live);
        if evicted == 0 {
            // Buffer is already empty; nothing to drop.
            return 0;
        }
        // `evicted <= buf.len()`, so the narrowing cast is lossless.
        let cut = evicted as usize;
        // The surviving tail is copied into a fresh buffer rather than sliced.
        // NOTE(review): presumably so the evicted prefix's memory is released
        // instead of staying pinned by a sub-slice — confirm intent.
        let mut m = BytesMut::with_capacity(self.buf.len() - cut);
        m.extend_from_slice(&self.buf[cut..]);
        self.buf = m.freeze();
        self.base_offset += evicted;
        evicted
    }
}
impl Default for HeapSegment {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sequential appends are stored back to back and read back intact.
    #[test]
    fn test_append_and_read() {
        let mut segment = HeapSegment::new();
        let hello_at = segment.append(b"hello").unwrap();
        let world_at = segment.append(b"world").unwrap();
        assert_eq!(hello_at, 0);
        assert_eq!(world_at, 5);

        let hello = segment.read(hello_at, 5).unwrap();
        assert_eq!(&hello[..], b"hello");
        let world = segment.read(world_at, 5).unwrap();
        assert_eq!(&world[..], b"world");
    }

    /// Reads that start or end past the buffer yield `None`.
    #[test]
    fn test_read_out_of_range_returns_none() {
        let mut segment = HeapSegment::new();
        segment.append(b"abc").unwrap();
        // Start offset beyond the end of the data.
        assert!(segment.read(10, 3).is_none());
        // Valid start, but the length runs off the end.
        assert!(segment.read(0, 10).is_none());
    }

    /// Evicting a prefix advances the base and keeps later data readable.
    #[test]
    fn test_evict_prefix() {
        let mut segment = HeapSegment::new();
        segment.append(b"aaaa").unwrap();
        let second_at = segment.append(b"bbbb").unwrap();
        assert_eq!(second_at, 4);

        let dropped = segment.evict_prefix_to(4);
        assert_eq!(dropped, 4);
        assert_eq!(segment.base_offset(), 4);
        assert_eq!(segment.live_bytes(), 4);

        // The evicted range is gone; the surviving payload keeps its offset.
        assert!(segment.read(0, 4).is_none());
        let survivor = segment.read(second_at, 4).unwrap();
        assert_eq!(&survivor[..], b"bbbb");
    }

    /// A target at or below the current base evicts nothing.
    #[test]
    fn test_evict_below_base_is_noop() {
        let mut segment = HeapSegment::new();
        segment.append(b"abc").unwrap();
        // Clamped to the 3 live bytes, so the base ends up at 3.
        segment.evict_prefix_to(10);
        assert_eq!(segment.base_offset(), 3);
        let dropped = segment.evict_prefix_to(1);
        assert_eq!(dropped, 0);
    }

    /// A target past the end evicts only what is actually held.
    #[test]
    fn test_evict_beyond_live_clamps() {
        let mut segment = HeapSegment::new();
        segment.append(b"xyz").unwrap();
        let dropped = segment.evict_prefix_to(100);
        assert_eq!(dropped, 3);
        assert_eq!(segment.base_offset(), 3);
        assert_eq!(segment.live_bytes(), 0);
    }

    /// A batch append lands right after existing data, payloads contiguous.
    #[test]
    fn test_append_many_basic() {
        let mut segment = HeapSegment::new();
        let prefix_at = segment.append(b"prefix").unwrap();
        assert_eq!(prefix_at, 0);

        let batch: Vec<Bytes> = vec![
            Bytes::from_static(b"alpha"),
            Bytes::from_static(b"beta"),
            Bytes::from_static(b"gamma"),
        ];
        let first = segment.append_many(&batch).unwrap();
        assert_eq!(first, 6, "first payload offset == prefix len");

        let alpha = segment.read(6, 5).unwrap();
        assert_eq!(&alpha[..], b"alpha");
        let beta = segment.read(11, 4).unwrap();
        assert_eq!(&beta[..], b"beta");
        let gamma = segment.read(15, 5).unwrap();
        assert_eq!(&gamma[..], b"gamma");
        assert_eq!(segment.live_bytes(), 6 + 5 + 4 + 5);
    }

    /// A batch that would exceed the cap is rejected and nothing is written.
    #[test]
    fn test_append_many_capacity_exceeded() {
        let mut segment = HeapSegment::new();
        // Three 1 GiB appends fill the segment exactly to MAX_SEGMENT_BYTES.
        let one_gib = vec![0u8; 1024 * 1024 * 1024];
        for _ in 0..3 {
            segment.append(&one_gib).unwrap();
        }

        let batch: Vec<Bytes> = vec![Bytes::from_static(b"x"), Bytes::from_static(b"y")];
        let outcome = segment.append_many(&batch);
        assert!(matches!(outcome, Err(RedexError::PayloadTooLarge { .. })));
        assert_eq!(segment.live_bytes(), MAX_SEGMENT_BYTES);
    }

    /// `read` must hand out views into the segment buffer, not copies.
    #[test]
    fn read_returns_zero_copy_slice_of_underlying_buffer() {
        let mut segment = HeapSegment::new();
        let payload = b"the quick brown fox jumps over the lazy dog";
        segment.append(payload).unwrap();
        let buf_ptr = segment.buf.as_ptr();

        // Full-range read: same starting address as the buffer itself.
        let whole = segment.read(0, payload.len() as u32).unwrap();
        assert_eq!(
            whole.as_ptr(),
            buf_ptr,
            "read(0, len) must return a zero-copy slice of the segment buffer",
        );
        assert_eq!(&whole[..], payload);

        // Sub-range read: address is exactly 4 bytes into the buffer.
        let sub = segment.read(4, 5).unwrap();
        let sub_offset = (sub.as_ptr() as usize).wrapping_sub(buf_ptr as usize);
        assert_eq!(
            sub_offset, 4,
            "sub-range read must be a slice into the original buffer at offset 4; \
             got offset {sub_offset} (a regression here means read deep-copies)",
        );
        assert_eq!(&sub[..], b"quick");
    }

    /// Sanity check for the pointer-distance technique used by the zero-copy
    /// test: a freshly allocated copy must not look like an in-buffer slice.
    #[test]
    fn read_zero_copy_pin_detects_deep_copy_via_wrapping_sub() {
        let mut segment = HeapSegment::new();
        let payload = b"the quick brown fox jumps over the lazy dog";
        segment.append(payload).unwrap();
        let buf_ptr = segment.buf.as_ptr();

        let fake_deep_copy = bytes::Bytes::copy_from_slice(&payload[4..9]);
        assert_eq!(&fake_deep_copy[..], b"quick");

        let fake_offset = (fake_deep_copy.as_ptr() as usize).wrapping_sub(buf_ptr as usize);
        assert_ne!(
            fake_offset, 4,
            "wrapping_sub form must distinguish a fresh allocation \
             from a same-allocation sub-slice — if these collide \
             the zero-copy pin would no longer detect a deep-copy \
             regression",
        );
        let _well_defined: usize = fake_offset;
    }

    /// An empty batch reports the current end offset and changes nothing.
    #[test]
    fn test_append_many_empty_returns_current_end() {
        let mut segment = HeapSegment::new();
        segment.append(b"xyz").unwrap();
        let end = segment.append_many(&[]).unwrap();
        assert_eq!(end, 3);
        assert_eq!(segment.live_bytes(), 3);
    }
}