use std::fs::File;
use std::io;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
/// Read-only view over the bytes of a file.
///
/// On Unix the bytes live in a private, read-only `mmap` mapping; on other
/// targets they live in a heap buffer filled by reading the whole file.
pub struct MmapRegion {
    /// Number of readable bytes starting at `ptr`.
    len: usize,
    /// Start of the mapping or buffer. May be null when `len == 0`.
    ptr: *const u8,
    /// Raw descriptor of the file the region was built from. It is only
    /// recorded here: nothing reads it back and `Drop` does not close it.
    /// NOTE(review): it can be stale once the caller's `File` is closed.
    #[cfg(unix)]
    _fd: i32,
}

// SAFETY: the region exclusively owns its mapping/buffer and never writes
// through `ptr`, so moving it to another thread is like moving a `Box<[u8]>`.
// Assumes the underlying file is not modified concurrently by another process.
unsafe impl Send for MmapRegion {}
// SAFETY: every access through `&MmapRegion` is a read of immutable bytes.
unsafe impl Sync for MmapRegion {}
impl MmapRegion {
    /// Maps `file` into memory as a read-only, private mapping.
    ///
    /// An empty file produces an empty region (null pointer, no mapping),
    /// because `mmap` does not accept a zero length.
    ///
    /// # Errors
    ///
    /// Returns an error if the file metadata cannot be read, if the file length
    /// does not fit in `usize` on this target, or if `mmap` fails (the OS error
    /// taken from `io::Error::last_os_error`).
    #[cfg(unix)]
    pub fn new(file: &File) -> io::Result<Self> {
        use std::ptr;
        let metadata = file.metadata()?;
        // `as usize` would silently truncate lengths above `usize::MAX` on
        // 32-bit targets and map only a prefix of the file.
        let len = usize::try_from(metadata.len()).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "file is too large to memory-map on this platform",
            )
        })?;
        if len == 0 {
            return Ok(Self {
                ptr: ptr::null(),
                len: 0,
                _fd: file.as_raw_fd(),
            });
        }
        // SAFETY: requests a new read-only private mapping of `len` bytes at a
        // kernel-chosen address from an open descriptor; the result is checked
        // against MAP_FAILED before it is used.
        let ptr = unsafe {
            libc::mmap(
                ptr::null_mut(),
                len,
                libc::PROT_READ,
                libc::MAP_PRIVATE,
                file.as_raw_fd(),
                0,
            )
        };
        if ptr == libc::MAP_FAILED {
            return Err(io::Error::last_os_error());
        }
        Ok(Self {
            ptr: ptr as *const u8,
            len,
            _fd: file.as_raw_fd(),
        })
    }

    /// Like [`MmapRegion::new`], then advises the kernel (`MADV_SEQUENTIAL`)
    /// that the mapping will be read front to back.
    ///
    /// # Errors
    ///
    /// Same as [`MmapRegion::new`]; a failing `madvise` is not reported.
    #[cfg(unix)]
    pub fn new_with_readahead(file: &File) -> io::Result<Self> {
        let region = Self::new(file)?;
        if region.len > 0 {
            // The advice is best-effort: a failure leaves the mapping fully
            // usable, so the return value is deliberately ignored.
            // SAFETY: `ptr..ptr + len` is exactly the mapping created by `new`.
            unsafe {
                libc::madvise(
                    region.ptr as *mut libc::c_void,
                    region.len,
                    libc::MADV_SEQUENTIAL,
                );
            }
        }
        Ok(region)
    }

    /// Reads the whole file into an owned heap buffer (non-Unix fallback).
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the file.
    #[cfg(not(unix))]
    pub fn new(file: &File) -> io::Result<Self> {
        use std::io::Read;
        let mut file = file;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        let len = buffer.len();
        // Ownership of the allocation moves into the region; `Drop` rebuilds
        // the boxed slice from `ptr`/`len` to free it.
        let ptr = Box::into_raw(buffer.into_boxed_slice()) as *const u8;
        Ok(Self { ptr, len })
    }

    /// Non-Unix fallback: identical to [`MmapRegion::new`] (no read-ahead hint).
    #[cfg(not(unix))]
    pub fn new_with_readahead(file: &File) -> io::Result<Self> {
        Self::new(file)
    }

    /// Number of bytes in the region.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the region holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Borrows the whole region as a byte slice.
    pub fn as_slice(&self) -> &[u8] {
        if self.ptr.is_null() || self.len == 0 {
            return &[];
        }
        // SAFETY: a non-null `ptr` with non-zero `len` points at `len` readable
        // bytes owned by this region until `Drop` runs.
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }

    /// Borrows the bytes in `range`.
    ///
    /// Returns `None` when `range.end` exceeds the region length or when the
    /// range is inverted (`start > end`); it never panics.
    pub fn slice(&self, range: Range<usize>) -> Option<&[u8]> {
        self.as_slice().get(range)
    }

    /// Hints the kernel (`MADV_WILLNEED`) that the bytes in `range` will be
    /// needed soon.
    ///
    /// The range is clipped to the region. Empty, inverted, or fully
    /// out-of-bounds ranges are ignored. The start is rounded down to a page
    /// boundary because `madvise` rejects unaligned addresses. This is
    /// best-effort: errors from `madvise` are ignored.
    #[cfg(unix)]
    pub fn prefetch(&self, range: Range<usize>) {
        let end = range.end.min(self.len);
        if self.ptr.is_null() || range.start >= end {
            return;
        }
        // The mapping base returned by `mmap` is page-aligned, so rounding the
        // offset down to a page multiple yields an aligned address.
        let page = Self::page_size();
        let aligned_start = range.start - range.start % page;
        // SAFETY: aligned_start <= range.start < end <= self.len, so the advised
        // span lies entirely inside the live mapping.
        unsafe {
            libc::madvise(
                self.ptr.add(aligned_start) as *mut libc::c_void,
                end - aligned_start,
                libc::MADV_WILLNEED,
            );
        }
    }

    /// Returns the system page size, falling back to 4096 if `sysconf` fails.
    #[cfg(unix)]
    fn page_size() -> usize {
        // SAFETY: `sysconf` has no memory-safety preconditions.
        let raw = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
        usize::try_from(raw)
            .ok()
            .filter(|&page| page > 0)
            .unwrap_or(4096)
    }

    /// Non-Unix fallback: the data is already in memory, so this is a no-op.
    #[cfg(not(unix))]
    pub fn prefetch(&self, _range: Range<usize>) {}
}
impl Drop for MmapRegion {
    /// Releases the mapping; empty regions never created one and are skipped.
    #[cfg(unix)]
    fn drop(&mut self) {
        if self.len == 0 || self.ptr.is_null() {
            return;
        }
        // SAFETY: a non-null pointer with a non-zero length only comes from a
        // successful `mmap` of exactly `len` bytes, and it is unmapped once, here.
        unsafe {
            libc::munmap(self.ptr as *mut libc::c_void, self.len);
        }
    }

    /// Frees the heap buffer created by the non-Unix constructor.
    #[cfg(not(unix))]
    fn drop(&mut self) {
        if self.len == 0 || self.ptr.is_null() {
            return;
        }
        let owned: *mut [u8] = std::ptr::slice_from_raw_parts_mut(self.ptr as *mut u8, self.len);
        // SAFETY: `ptr`/`len` came from `Box::into_raw` on a boxed `[u8]` of
        // this length and are reclaimed exactly once.
        unsafe {
            drop(Box::from_raw(owned));
        }
    }
}
/// Iterator over consecutive, non-overlapping chunks of a borrowed byte slice.
///
/// Every chunk is `chunk_size` bytes long except possibly the last, which holds
/// whatever remains. Chunks borrow from `data`, so no bytes are copied. Each
/// yielded chunk is counted in the shared [`IteratorStats`].
pub struct ZeroCopyIterator<'a> {
    data: &'a [u8],
    pos: usize,
    /// Always at least 1 (enforced by the constructors).
    chunk_size: usize,
    stats: Arc<IteratorStats>,
}

impl<'a> ZeroCopyIterator<'a> {
    /// Creates an iterator over `data` with its own, fresh statistics.
    ///
    /// A `chunk_size` of 0 is treated as 1; otherwise iteration would yield
    /// empty chunks forever without advancing.
    pub fn new(data: &'a [u8], chunk_size: usize) -> Self {
        Self::with_stats(data, chunk_size, Arc::new(IteratorStats::default()))
    }

    /// Creates an iterator that records into a caller-supplied `stats`, so
    /// several iterators can share one set of counters.
    ///
    /// A `chunk_size` of 0 is treated as 1.
    pub fn with_stats(data: &'a [u8], chunk_size: usize, stats: Arc<IteratorStats>) -> Self {
        Self {
            data,
            pos: 0,
            chunk_size: chunk_size.max(1),
            stats,
        }
    }

    /// Number of bytes not yet yielded.
    pub fn remaining(&self) -> usize {
        self.data.len().saturating_sub(self.pos)
    }

    /// Moves the cursor to byte offset `pos`.
    ///
    /// Returns `false` (and leaves the cursor unchanged) when `pos` is past the
    /// end of the data. Seeking exactly to the end is allowed. Successful
    /// seeks are counted in `stats().seeks`.
    pub fn seek(&mut self, pos: usize) -> bool {
        if pos > self.data.len() {
            return false;
        }
        self.pos = pos;
        self.stats.seeks.fetch_add(1, Ordering::Relaxed);
        true
    }

    /// Counters accumulated by this iterator (possibly shared with others).
    pub fn stats(&self) -> &IteratorStats {
        &self.stats
    }
}

impl<'a> Iterator for ZeroCopyIterator<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }
        let start = self.pos;
        // saturating_add: a huge chunk_size must clamp to the end of the data
        // instead of overflowing.
        let end = start.saturating_add(self.chunk_size).min(self.data.len());
        self.pos = end;
        self.stats.chunks_read.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_read
            .fetch_add((end - start) as u64, Ordering::Relaxed);
        Some(&self.data[start..end])
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let chunks_left = self.remaining().div_ceil(self.chunk_size);
        (chunks_left, Some(chunks_left))
    }
}
/// Atomic counters describing the work done by a [`ZeroCopyIterator`].
///
/// Shared through an `Arc`, so several iterators (or threads) can update the
/// same counters.
#[derive(Debug, Default)]
pub struct IteratorStats {
    /// Number of chunks yielded.
    pub chunks_read: AtomicU64,
    /// Total number of bytes across all yielded chunks.
    pub bytes_read: AtomicU64,
    /// Number of seeks, as reported by the iterator owning these stats.
    pub seeks: AtomicU64,
}

impl IteratorStats {
    /// Copies the current counter values into a plain [`IteratorStatsSnapshot`].
    ///
    /// Each field is loaded independently with `Relaxed` ordering, so under
    /// concurrent updates the three values need not describe one instant.
    pub fn snapshot(&self) -> IteratorStatsSnapshot {
        IteratorStatsSnapshot {
            chunks_read: self.chunks_read.load(Ordering::Relaxed),
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            seeks: self.seeks.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of [`IteratorStats`] taken by [`IteratorStats::snapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct IteratorStatsSnapshot {
    /// Number of chunks yielded.
    pub chunks_read: u64,
    /// Total number of bytes across all yielded chunks.
    pub bytes_read: u64,
    /// Number of seeks recorded.
    pub seeks: u64,
}
/// Chunk iterator that pairs each fixed-size block with its zero-based index.
pub struct BlockIterator<'a> {
    inner: ZeroCopyIterator<'a>,
    block_index: usize,
}

impl<'a> BlockIterator<'a> {
    /// Iterates `data` in blocks of `block_size` bytes (the last may be shorter).
    pub fn new(data: &'a [u8], block_size: usize) -> Self {
        Self {
            inner: ZeroCopyIterator::new(data, block_size),
            block_index: 0,
        }
    }

    /// Index that the next yielded block will carry.
    pub fn block_index(&self) -> usize {
        self.block_index
    }

    /// Repositions the iterator so the next block yielded is block `index`.
    ///
    /// Returns `false` (leaving the position unchanged) when the block's start
    /// offset lies past the end of the data or does not fit in `usize`.
    /// Skipping to the block just past the last one is allowed and yields no
    /// further blocks.
    pub fn skip_to_block(&mut self, index: usize) -> bool {
        // checked_mul: an absurd index must be rejected rather than overflow
        // (a panic in debug builds, a wrapped bogus offset in release builds).
        let Some(pos) = index.checked_mul(self.inner.chunk_size) else {
            return false;
        };
        if !self.inner.seek(pos) {
            return false;
        }
        self.block_index = index;
        true
    }
}

impl<'a> Iterator for BlockIterator<'a> {
    type Item = (usize, &'a [u8]);

    fn next(&mut self) -> Option<Self::Item> {
        let block = self.inner.next()?;
        let index = self.block_index;
        self.block_index += 1;
        Some((index, block))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exactly one item per inner chunk.
        self.inner.size_hint()
    }
}
/// Iterator over the fixed-size chunks of a byte slice that satisfy a predicate.
///
/// Chunks come from an inner [`ZeroCopyIterator`]; those for which `predicate`
/// returns `false` are skipped. Skipped chunks are still counted in the inner
/// iterator's statistics, since they are read before being tested.
pub struct FilteredScan<'a, F> {
    inner: ZeroCopyIterator<'a>,
    predicate: F,
}

impl<'a, F> FilteredScan<'a, F>
where
    F: Fn(&[u8]) -> bool,
{
    /// Scans `data` in `chunk_size`-byte chunks, keeping only the chunks for
    /// which `predicate` returns `true`.
    pub fn new(data: &'a [u8], chunk_size: usize, predicate: F) -> Self {
        Self {
            inner: ZeroCopyIterator::new(data, chunk_size),
            predicate,
        }
    }
}

impl<'a, F> Iterator for FilteredScan<'a, F>
where
    F: Fn(&[u8]) -> bool,
{
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        // Borrow the predicate on its own so `inner` can be borrowed mutably.
        let predicate = &self.predicate;
        self.inner.find(|&chunk| predicate(chunk))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Any chunk may be rejected, so only the upper bound carries over.
        (0, self.inner.size_hint().1)
    }
}
/// Tuning parameters for a parallel scan.
///
/// NOTE(review): nothing in this file reads these fields; their exact meaning
/// is defined by the consumer — confirm there.
#[derive(Debug, Clone)]
pub struct ParallelScanConfig {
    /// Number of concurrent readers.
    pub num_readers: usize,
    /// Chunk size, presumably in bytes (default 64 KiB).
    pub chunk_size: usize,
    /// How far ahead to prefetch, presumably measured in chunks.
    pub prefetch_distance: usize,
}

impl Default for ParallelScanConfig {
    /// Four readers, 64 KiB chunks, prefetch distance of two.
    fn default() -> Self {
        const DEFAULT_CHUNK_SIZE: usize = 1 << 16;
        ParallelScanConfig {
            num_readers: 4,
            chunk_size: DEFAULT_CHUNK_SIZE,
            prefetch_distance: 2,
        }
    }
}
/// Splits `0..total_len` into contiguous ranges of equal size (the last may be
/// shorter) and iterates over them in order.
///
/// The range size is `ceil(total_len / num_ranges)`. Because of that rounding,
/// fewer than `num_ranges` ranges may be produced (e.g. `new(10, 6)` yields 5).
pub struct RangeScanner {
    total_len: usize,
    range_size: usize,
    current: usize,
    total_ranges: usize,
}

impl RangeScanner {
    /// Plans the split of `total_len` into at most `num_ranges` ranges.
    ///
    /// A `num_ranges` of 0 is treated as 1. A `total_len` of 0 yields no ranges.
    pub fn new(total_len: usize, num_ranges: usize) -> Self {
        let range_size = total_len.div_ceil(num_ranges.max(1));
        // With total_len == 0 the range size is 0 too; there is nothing to split.
        let total_ranges = if total_len == 0 {
            0
        } else {
            total_len.div_ceil(range_size)
        };
        Self {
            total_len,
            range_size,
            current: 0,
            total_ranges,
        }
    }

    /// Returns range number `index`, or `None` when `index >= total_ranges()`.
    pub fn range(&self, index: usize) -> Option<Range<usize>> {
        if index >= self.total_ranges {
            return None;
        }
        // For a valid index, start < total_len, so this product cannot overflow.
        let start = index * self.range_size;
        // start + range_size can exceed usize::MAX when total_len is close to
        // it; saturate before clamping to the end.
        let end = start.saturating_add(self.range_size).min(self.total_len);
        Some(start..end)
    }

    /// Total number of ranges this scanner produces.
    pub fn total_ranges(&self) -> usize {
        self.total_ranges
    }
}

impl Iterator for RangeScanner {
    type Item = Range<usize>;

    fn next(&mut self) -> Option<Self::Item> {
        let range = self.range(self.current)?;
        self.current += 1;
        Some(range)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.total_ranges.saturating_sub(self.current);
        (left, Some(left))
    }
}

impl ExactSizeIterator for RangeScanner {}
/// Opens `path` read-only and maps it via `MmapRegion::new_with_readahead`.
///
/// # Errors
///
/// Propagates failures from opening the file or from creating the region.
pub fn open_for_scan(path: impl AsRef<Path>) -> io::Result<MmapRegion> {
    File::open(path).and_then(|handle| MmapRegion::new_with_readahead(&handle))
}
/// Maps `path` for scanning and returns the region together with the
/// caller-supplied `chunk_size`, unchanged. No scanning happens here.
///
/// # Errors
///
/// Propagates any error from `open_for_scan`.
pub fn scan_file(path: impl AsRef<Path>, chunk_size: usize) -> io::Result<(MmapRegion, usize)> {
    open_for_scan(path).map(|region| (region, chunk_size))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `data` into a new temporary file and flushes it, so a later
    /// `File::open` of the same path sees every byte.
    fn create_test_file(data: &[u8]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(data).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Reopens the temporary file by path and maps it with `MmapRegion::new`.
    fn map_file(tmp: &NamedTempFile) -> MmapRegion {
        let handle = File::open(tmp.path()).unwrap();
        MmapRegion::new(&handle).unwrap()
    }

    #[test]
    fn test_mmap_region_basic() {
        let data = b"Hello, World! This is test data for mmap.";
        let tmp = create_test_file(data);
        let region = map_file(&tmp);
        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    #[test]
    fn test_mmap_empty_file() {
        let tmp = create_test_file(b"");
        let region = map_file(&tmp);
        assert!(region.is_empty());
        assert_eq!(region.as_slice(), &[] as &[u8]);
    }

    #[test]
    fn test_mmap_slice() {
        let tmp = create_test_file(b"0123456789ABCDEF");
        let region = map_file(&tmp);
        assert_eq!(region.slice(0..4), Some(&b"0123"[..]));
        assert_eq!(region.slice(4..8), Some(&b"4567"[..]));
        assert_eq!(region.slice(0..100), None);
    }

    #[test]
    fn test_zero_copy_iterator() {
        let tmp = create_test_file(b"AAAABBBBCCCCDDDD");
        let region = map_file(&tmp);
        let chunks: Vec<_> = ZeroCopyIterator::new(region.as_slice(), 4).collect();
        assert_eq!(chunks, [&b"AAAA"[..], &b"BBBB"[..], &b"CCCC"[..], &b"DDDD"[..]]);
    }

    #[test]
    fn test_iterator_uneven_chunks() {
        let tmp = create_test_file(b"AAABBBCC");
        let region = map_file(&tmp);
        let chunks: Vec<_> = ZeroCopyIterator::new(region.as_slice(), 3).collect();
        assert_eq!(chunks, [&b"AAA"[..], &b"BBB"[..], &b"CC"[..]]);
    }

    #[test]
    fn test_iterator_stats() {
        let tmp = create_test_file(b"AAAABBBBCCCC");
        let region = map_file(&tmp);
        let iter = ZeroCopyIterator::new(region.as_slice(), 4);
        let stats = Arc::clone(&iter.stats);
        // Drain the iterator; only the side effect on `stats` matters here.
        for _ in iter {}
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.chunks_read, 3);
        assert_eq!(snapshot.bytes_read, 12);
    }

    #[test]
    fn test_iterator_seek() {
        let tmp = create_test_file(b"AAAABBBBCCCCDDDD");
        let region = map_file(&tmp);
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);
        assert!(iter.seek(8));
        assert_eq!(iter.next(), Some(&b"CCCC"[..]));
        assert!(!iter.seek(100));
    }

    #[test]
    fn test_block_iterator() {
        let tmp = create_test_file(b"BLK1BLK2BLK3");
        let region = map_file(&tmp);
        let blocks: Vec<_> = BlockIterator::new(region.as_slice(), 4).collect();
        assert_eq!(blocks.len(), 3);
        for (i, expected) in [&b"BLK1"[..], &b"BLK2"[..], &b"BLK3"[..]].into_iter().enumerate() {
            assert_eq!(blocks[i], (i, expected));
        }
    }

    #[test]
    fn test_block_iterator_skip() {
        let tmp = create_test_file(b"BLK1BLK2BLK3BLK4");
        let region = map_file(&tmp);
        let mut iter = BlockIterator::new(region.as_slice(), 4);
        assert!(iter.skip_to_block(2));
        assert_eq!(iter.next(), Some((2, &b"BLK3"[..])));
    }

    #[test]
    fn test_filtered_scan() {
        let tmp = create_test_file(b"ABCDXXXXYYYY1234");
        let region = map_file(&tmp);
        let no_filler = |chunk: &[u8]| chunk.iter().all(|&byte| byte != b'X' && byte != b'Y');
        let matching: Vec<_> = FilteredScan::new(region.as_slice(), 4, no_filler).collect();
        assert_eq!(matching, [&b"ABCD"[..], &b"1234"[..]]);
    }

    #[test]
    fn test_range_scanner() {
        let scanner = RangeScanner::new(100, 4);
        assert_eq!(scanner.total_ranges(), 4);
        let ranges: Vec<_> = scanner.collect();
        assert_eq!(ranges.len(), 4);
        assert_eq!(ranges[0], 0..25);
        assert_eq!(ranges[1], 25..50);
        assert_eq!(ranges[2], 50..75);
        assert_eq!(ranges[3], 75..100);
    }

    #[test]
    fn test_range_scanner_uneven() {
        let ranges: Vec<_> = RangeScanner::new(10, 3).collect();
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.last().map(|r| r.end), Some(10));
    }

    #[test]
    fn test_range_scanner_empty() {
        let scanner = RangeScanner::new(0, 4);
        assert_eq!(scanner.total_ranges(), 0);
        assert_eq!(scanner.count(), 0);
    }

    #[test]
    fn test_parallel_scan_config() {
        let config = ParallelScanConfig::default();
        assert_ne!(config.num_readers, 0);
        assert_ne!(config.chunk_size, 0);
    }

    #[test]
    fn test_remaining_bytes() {
        let tmp = create_test_file(b"AAAABBBBCCCC");
        let region = map_file(&tmp);
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);
        assert_eq!(iter.remaining(), 12);
        for expected in [8, 4, 0] {
            iter.next();
            assert_eq!(iter.remaining(), expected);
        }
    }

    #[test]
    fn test_mmap_with_readahead() {
        let data = b"Test data for readahead mmap";
        let tmp = create_test_file(data);
        let handle = File::open(tmp.path()).unwrap();
        let region = MmapRegion::new_with_readahead(&handle).unwrap();
        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    #[test]
    fn test_prefetch() {
        // 1 MiB of zeros: large enough to span many pages.
        let data = vec![0u8; 1024 * 1024];
        let tmp = create_test_file(&data);
        let region = map_file(&tmp);
        region.prefetch(0..65536);
        region.prefetch(65536..131072);
    }
}