1use std::fs::File;
56use std::io;
57use std::ops::Range;
58use std::path::Path;
59use std::sync::Arc;
60use std::sync::atomic::{AtomicU64, Ordering};
61
62#[cfg(unix)]
63use std::os::unix::io::AsRawFd;
64
/// A read-only view of a file's bytes.
///
/// On Unix the bytes come from a private, read-only `mmap` of the file; on Unix an empty
/// file is represented by a null `ptr` with `len == 0`. On other targets the whole file is
/// read into a heap-allocated boxed slice whose ownership is held through `ptr`.
pub struct MmapRegion {
    /// Number of bytes reachable from `ptr`.
    len: usize,
    /// First byte of the region (the mapping base on Unix); null for an empty Unix region.
    ptr: *const u8,
    /// Raw descriptor of the file the region was created from. It is only recorded: this
    /// type never reads, duplicates, or closes it, so it may already be closed.
    #[cfg(unix)]
    _fd: i32,
}
75
// SAFETY: `MmapRegion` is the sole owner of the memory behind `ptr` (a private, read-only
// mapping on Unix; a boxed slice converted with `Box::into_raw` elsewhere) and that memory is
// released only in `Drop`, so transferring the owner to another thread is sound.
unsafe impl Send for MmapRegion {}
// SAFETY: every `&self` method (`len`, `as_slice`, `slice`, `prefetch`) only reads the memory
// or passes advice to the kernel; nothing mutates it, so shared access from several threads
// cannot race.
unsafe impl Sync for MmapRegion {}
80
81impl MmapRegion {
82 #[cfg(unix)]
84 pub fn new(file: &File) -> io::Result<Self> {
85 use std::ptr;
86
87 let metadata = file.metadata()?;
88 let len = metadata.len() as usize;
89
90 if len == 0 {
91 return Ok(Self {
92 ptr: ptr::null(),
93 len: 0,
94 _fd: file.as_raw_fd(),
95 });
96 }
97
98 let ptr = unsafe {
100 libc::mmap(
101 ptr::null_mut(),
102 len,
103 libc::PROT_READ,
104 libc::MAP_PRIVATE,
105 file.as_raw_fd(),
106 0,
107 )
108 };
109
110 if ptr == libc::MAP_FAILED {
111 return Err(io::Error::last_os_error());
112 }
113
114 Ok(Self {
115 ptr: ptr as *const u8,
116 len,
117 _fd: file.as_raw_fd(),
118 })
119 }
120
121 #[cfg(unix)]
123 pub fn new_with_readahead(file: &File) -> io::Result<Self> {
124 let region = Self::new(file)?;
125
126 if region.len > 0 {
127 unsafe {
129 libc::madvise(
130 region.ptr as *mut libc::c_void,
131 region.len,
132 libc::MADV_SEQUENTIAL,
133 );
134 }
135 }
136
137 Ok(region)
138 }
139
140 #[cfg(not(unix))]
142 pub fn new(file: &File) -> io::Result<Self> {
143 use std::io::Read;
144 let mut file = file;
145 let mut buffer = Vec::new();
146 file.read_to_end(&mut buffer)?;
147
148 let len = buffer.len();
149 let ptr = Box::into_raw(buffer.into_boxed_slice()) as *const u8;
150
151 Ok(Self { ptr, len })
152 }
153
154 #[cfg(not(unix))]
155 pub fn new_with_readahead(file: &File) -> io::Result<Self> {
156 Self::new(file)
157 }
158
159 pub fn len(&self) -> usize {
161 self.len
162 }
163
164 pub fn is_empty(&self) -> bool {
166 self.len == 0
167 }
168
169 pub fn as_slice(&self) -> &[u8] {
175 if self.ptr.is_null() || self.len == 0 {
176 return &[];
177 }
178 unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
180 }
181
182 pub fn slice(&self, range: Range<usize>) -> Option<&[u8]> {
184 if range.end > self.len {
185 return None;
186 }
187 Some(&self.as_slice()[range])
188 }
189
190 #[cfg(unix)]
192 pub fn prefetch(&self, range: Range<usize>) {
193 if range.start >= self.len || self.ptr.is_null() {
194 return;
195 }
196
197 let end = range.end.min(self.len);
198 let ptr = unsafe { self.ptr.add(range.start) };
199 let len = end - range.start;
200
201 unsafe {
202 libc::madvise(ptr as *mut libc::c_void, len, libc::MADV_WILLNEED);
203 }
204 }
205
206 #[cfg(not(unix))]
207 pub fn prefetch(&self, _range: Range<usize>) {
208 }
210}
211
212impl Drop for MmapRegion {
213 #[cfg(unix)]
214 fn drop(&mut self) {
215 if !self.ptr.is_null() && self.len > 0 {
216 unsafe {
217 libc::munmap(self.ptr as *mut libc::c_void, self.len);
218 }
219 }
220 }
221
222 #[cfg(not(unix))]
223 fn drop(&mut self) {
224 if !self.ptr.is_null() && self.len > 0 {
225 unsafe {
227 let slice = std::slice::from_raw_parts_mut(self.ptr as *mut u8, self.len);
228 drop(Box::from_raw(slice));
229 }
230 }
231 }
232}
233
234pub struct ZeroCopyIterator<'a> {
236 data: &'a [u8],
238 pos: usize,
240 chunk_size: usize,
242 stats: Arc<IteratorStats>,
244}
245
246impl<'a> ZeroCopyIterator<'a> {
247 pub fn new(data: &'a [u8], chunk_size: usize) -> Self {
249 Self {
250 data,
251 pos: 0,
252 chunk_size,
253 stats: Arc::new(IteratorStats::default()),
254 }
255 }
256
257 pub fn with_stats(data: &'a [u8], chunk_size: usize, stats: Arc<IteratorStats>) -> Self {
259 Self {
260 data,
261 pos: 0,
262 chunk_size,
263 stats,
264 }
265 }
266
267 pub fn remaining(&self) -> usize {
269 self.data.len().saturating_sub(self.pos)
270 }
271
272 pub fn seek(&mut self, pos: usize) -> bool {
274 if pos <= self.data.len() {
275 self.pos = pos;
276 true
277 } else {
278 false
279 }
280 }
281
282 pub fn stats(&self) -> &IteratorStats {
284 &self.stats
285 }
286}
287
288impl<'a> Iterator for ZeroCopyIterator<'a> {
289 type Item = &'a [u8];
290
291 fn next(&mut self) -> Option<Self::Item> {
292 if self.pos >= self.data.len() {
293 return None;
294 }
295
296 let start = self.pos;
297 let end = (start + self.chunk_size).min(self.data.len());
298 self.pos = end;
299
300 self.stats.chunks_read.fetch_add(1, Ordering::Relaxed);
301 self.stats
302 .bytes_read
303 .fetch_add((end - start) as u64, Ordering::Relaxed);
304
305 Some(&self.data[start..end])
306 }
307}
308
/// Shared, lock-free counters describing the work done by a [`ZeroCopyIterator`].
///
/// All counters are updated and read with `Ordering::Relaxed`; they are independent
/// tallies, not a consistent point-in-time view.
#[derive(Debug, Default)]
pub struct IteratorStats {
    /// Number of chunks yielded.
    pub chunks_read: AtomicU64,
    /// Total bytes across all yielded chunks.
    pub bytes_read: AtomicU64,
    /// Seek counter.
    pub seeks: AtomicU64,
}

impl IteratorStats {
    /// Copies the current value of every counter into a plain [`IteratorStatsSnapshot`].
    pub fn snapshot(&self) -> IteratorStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        IteratorStatsSnapshot {
            chunks_read: read(&self.chunks_read),
            bytes_read: read(&self.bytes_read),
            seeks: read(&self.seeks),
        }
    }
}

/// Plain-value copy of [`IteratorStats`] taken by [`IteratorStats::snapshot`].
#[derive(Debug, Clone)]
pub struct IteratorStatsSnapshot {
    /// Value of `IteratorStats::chunks_read` when the snapshot was taken.
    pub chunks_read: u64,
    /// Value of `IteratorStats::bytes_read` when the snapshot was taken.
    pub bytes_read: u64,
    /// Value of `IteratorStats::seeks` when the snapshot was taken.
    pub seeks: u64,
}
333
334pub struct BlockIterator<'a> {
336 inner: ZeroCopyIterator<'a>,
338 block_index: usize,
340}
341
342impl<'a> BlockIterator<'a> {
343 pub fn new(data: &'a [u8], block_size: usize) -> Self {
344 Self {
345 inner: ZeroCopyIterator::new(data, block_size),
346 block_index: 0,
347 }
348 }
349
350 pub fn block_index(&self) -> usize {
352 self.block_index
353 }
354
355 pub fn skip_to_block(&mut self, index: usize) -> bool {
357 let pos = index * self.inner.chunk_size;
358 if self.inner.seek(pos) {
359 self.block_index = index;
360 true
361 } else {
362 false
363 }
364 }
365}
366
367impl<'a> Iterator for BlockIterator<'a> {
368 type Item = (usize, &'a [u8]);
369
370 fn next(&mut self) -> Option<Self::Item> {
371 let block = self.inner.next()?;
372 let index = self.block_index;
373 self.block_index += 1;
374 Some((index, block))
375 }
376}
377
378pub struct FilteredScan<'a, F>
380where
381 F: Fn(&[u8]) -> bool,
382{
383 inner: ZeroCopyIterator<'a>,
384 predicate: F,
385}
386
387impl<'a, F> FilteredScan<'a, F>
388where
389 F: Fn(&[u8]) -> bool,
390{
391 pub fn new(data: &'a [u8], chunk_size: usize, predicate: F) -> Self {
392 Self {
393 inner: ZeroCopyIterator::new(data, chunk_size),
394 predicate,
395 }
396 }
397}
398
399impl<'a, F> Iterator for FilteredScan<'a, F>
400where
401 F: Fn(&[u8]) -> bool,
402{
403 type Item = &'a [u8];
404
405 fn next(&mut self) -> Option<Self::Item> {
406 loop {
407 let chunk = self.inner.next()?;
408 if (self.predicate)(chunk) {
409 return Some(chunk);
410 }
411 }
412 }
413}
414
/// Tuning parameters for a parallel scan.
///
/// Defaults: 4 readers, 64 KiB chunks, prefetch distance 2.
#[derive(Debug, Clone)]
pub struct ParallelScanConfig {
    /// Number of readers (default 4).
    pub num_readers: usize,
    /// Chunk size in bytes (default 64 KiB).
    pub chunk_size: usize,
    /// Prefetch distance (default 2); presumably measured in chunks — confirm with callers.
    pub prefetch_distance: usize,
}

impl Default for ParallelScanConfig {
    fn default() -> Self {
        const DEFAULT_READERS: usize = 4;
        const DEFAULT_CHUNK_BYTES: usize = 64 * 1024;
        const DEFAULT_PREFETCH_DISTANCE: usize = 2;

        ParallelScanConfig {
            num_readers: DEFAULT_READERS,
            chunk_size: DEFAULT_CHUNK_BYTES,
            prefetch_distance: DEFAULT_PREFETCH_DISTANCE,
        }
    }
}
435
/// Splits `0..total_len` into contiguous, non-overlapping, equally sized ranges (the last one
/// may be shorter) and iterates over them in order.
///
/// Every range is `ceil(total_len / num_ranges)` long, so fewer ranges than requested are
/// produced when `total_len` is too small to fill them all (e.g. `total_len = 3,
/// num_ranges = 10` yields three 1-byte ranges).
pub struct RangeScanner {
    /// Length of the whole span being split.
    total_len: usize,
    /// Length of every range except possibly the last; 0 only when `total_len` is 0.
    range_size: usize,
    /// Index of the next range the iterator yields.
    current: usize,
    /// Number of ranges; 0 when `total_len` is 0.
    total_ranges: usize,
}

impl RangeScanner {
    /// Creates a scanner over `0..total_len` split into at most `num_ranges` ranges.
    ///
    /// A `num_ranges` of 0 is treated as 1.
    pub fn new(total_len: usize, num_ranges: usize) -> Self {
        let range_size = total_len.div_ceil(num_ranges.max(1));
        // `range_size` is 0 exactly when `total_len` is 0, so only divide when non-empty.
        let total_ranges = if total_len > 0 {
            total_len.div_ceil(range_size)
        } else {
            0
        };

        Self {
            total_len,
            range_size,
            current: 0,
            total_ranges,
        }
    }

    /// The `index`-th range, or `None` when `index >= total_ranges()`.
    pub fn range(&self, index: usize) -> Option<Range<usize>> {
        if index >= self.total_ranges {
            return None;
        }

        // `index < ceil(total_len / range_size)` implies `start < total_len`: no overflow.
        let start = index * self.range_size;
        // `(index + 1) * range_size` can exceed `usize::MAX` for the last range when
        // `total_len` is close to `usize::MAX`; saturate before clamping to `total_len`.
        let end = start.saturating_add(self.range_size).min(self.total_len);

        Some(start..end)
    }

    /// Total number of ranges this scanner produces.
    pub fn total_ranges(&self) -> usize {
        self.total_ranges
    }
}

impl Iterator for RangeScanner {
    type Item = Range<usize>;

    fn next(&mut self) -> Option<Self::Item> {
        let range = self.range(self.current)?;
        self.current += 1;
        Some(range)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.total_ranges.saturating_sub(self.current);
        (left, Some(left))
    }
}

impl ExactSizeIterator for RangeScanner {}
493
494pub fn open_for_scan(path: impl AsRef<Path>) -> io::Result<MmapRegion> {
496 let file = File::open(path)?;
497 MmapRegion::new_with_readahead(&file)
498}
499
500pub fn scan_file(path: impl AsRef<Path>, chunk_size: usize) -> io::Result<(MmapRegion, usize)> {
502 let region = open_for_scan(path)?;
503 Ok((region, chunk_size))
504}
505
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `data` to a new temporary file. The returned handle owns the file, so callers
    /// keep it alive for as long as the file must exist on disk.
    fn create_test_file(data: &[u8]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(data).unwrap();
        file.flush().unwrap();
        file
    }

    /// Reopens the temporary file by path and maps it with `MmapRegion::new`.
    fn map_plain(file: &NamedTempFile) -> MmapRegion {
        let handle = File::open(file.path()).unwrap();
        MmapRegion::new(&handle).unwrap()
    }

    #[test]
    fn test_mmap_region_basic() {
        let data = b"Hello, World! This is test data for mmap.";
        let file = create_test_file(data);
        let region = map_plain(&file);

        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    #[test]
    fn test_mmap_empty_file() {
        let file = create_test_file(b"");
        let region = map_plain(&file);

        assert!(region.is_empty());
        assert_eq!(region.as_slice(), &[] as &[u8]);
    }

    #[test]
    fn test_mmap_slice() {
        let file = create_test_file(b"0123456789ABCDEF");
        let region = map_plain(&file);

        assert_eq!(region.slice(0..4), Some(&b"0123"[..]));
        assert_eq!(region.slice(4..8), Some(&b"4567"[..]));
        assert_eq!(region.slice(0..100), None);
    }

    #[test]
    fn test_zero_copy_iterator() {
        let file = create_test_file(b"AAAABBBBCCCCDDDD");
        let region = map_plain(&file);

        let chunks: Vec<&[u8]> = ZeroCopyIterator::new(region.as_slice(), 4).collect();
        let expected: [&[u8]; 4] = [&b"AAAA"[..], &b"BBBB"[..], &b"CCCC"[..], &b"DDDD"[..]];
        assert_eq!(chunks, expected);
    }

    #[test]
    fn test_iterator_uneven_chunks() {
        let file = create_test_file(b"AAABBBCC");
        let region = map_plain(&file);

        let chunks: Vec<&[u8]> = ZeroCopyIterator::new(region.as_slice(), 3).collect();
        let expected: [&[u8]; 3] = [&b"AAA"[..], &b"BBB"[..], &b"CC"[..]];
        assert_eq!(chunks, expected);
    }

    #[test]
    fn test_iterator_stats() {
        let file = create_test_file(b"AAAABBBBCCCC");
        let region = map_plain(&file);

        let iter = ZeroCopyIterator::new(region.as_slice(), 4);
        let stats = Arc::clone(&iter.stats);
        for _chunk in iter {}

        let snapshot = stats.snapshot();
        assert_eq!((snapshot.chunks_read, snapshot.bytes_read), (3, 12));
    }

    #[test]
    fn test_iterator_seek() {
        let file = create_test_file(b"AAAABBBBCCCCDDDD");
        let region = map_plain(&file);
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        assert!(iter.seek(8));
        assert_eq!(iter.next(), Some(&b"CCCC"[..]));
        assert!(!iter.seek(100));
    }

    #[test]
    fn test_block_iterator() {
        let file = create_test_file(b"BLK1BLK2BLK3");
        let region = map_plain(&file);

        let blocks: Vec<(usize, &[u8])> = BlockIterator::new(region.as_slice(), 4).collect();
        let expected: [(usize, &[u8]); 3] =
            [(0, &b"BLK1"[..]), (1, &b"BLK2"[..]), (2, &b"BLK3"[..])];
        assert_eq!(blocks, expected);
    }

    #[test]
    fn test_block_iterator_skip() {
        let file = create_test_file(b"BLK1BLK2BLK3BLK4");
        let region = map_plain(&file);
        let mut iter = BlockIterator::new(region.as_slice(), 4);

        assert!(iter.skip_to_block(2));
        assert_eq!(iter.next(), Some((2, &b"BLK3"[..])));
    }

    #[test]
    fn test_filtered_scan() {
        let file = create_test_file(b"ABCDXXXXYYYY1234");
        let region = map_plain(&file);

        // Keep only chunks containing neither 'X' nor 'Y'.
        let scan = FilteredScan::new(region.as_slice(), 4, |chunk| {
            !chunk.iter().any(|&byte| byte == b'X' || byte == b'Y')
        });
        let matching: Vec<&[u8]> = scan.collect();

        let expected: [&[u8]; 2] = [&b"ABCD"[..], &b"1234"[..]];
        assert_eq!(matching, expected);
    }

    #[test]
    fn test_range_scanner() {
        let scanner = RangeScanner::new(100, 4);
        assert_eq!(scanner.total_ranges(), 4);

        let ranges: Vec<Range<usize>> = scanner.collect();
        assert_eq!(ranges, vec![0..25, 25..50, 50..75, 75..100]);
    }

    #[test]
    fn test_range_scanner_uneven() {
        let ranges: Vec<Range<usize>> = RangeScanner::new(10, 3).collect();

        assert_eq!(ranges.len(), 3);
        // The final range must reach the end of the span.
        assert_eq!(ranges.last().map(|range| range.end), Some(10));
    }

    #[test]
    fn test_range_scanner_empty() {
        let scanner = RangeScanner::new(0, 4);
        assert_eq!(scanner.total_ranges(), 0);

        let ranges: Vec<Range<usize>> = scanner.collect();
        assert!(ranges.is_empty());
    }

    #[test]
    fn test_parallel_scan_config() {
        let config = ParallelScanConfig::default();

        assert!(config.num_readers > 0);
        assert!(config.chunk_size > 0);
    }

    #[test]
    fn test_remaining_bytes() {
        let file = create_test_file(b"AAAABBBBCCCC");
        let region = map_plain(&file);
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        for expected in [12, 8, 4] {
            assert_eq!(iter.remaining(), expected);
            iter.next();
        }
        assert_eq!(iter.remaining(), 0);
    }

    #[test]
    fn test_mmap_with_readahead() {
        let data = b"Test data for readahead mmap";
        let file = create_test_file(data);

        let region = MmapRegion::new_with_readahead(&File::open(file.path()).unwrap()).unwrap();

        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    #[test]
    fn test_prefetch() {
        let data = vec![0u8; 1024 * 1024];
        let file = create_test_file(&data);
        let region = map_plain(&file);

        // Two adjacent 64 KiB windows; prefetch is only a hint and must simply not panic.
        for window in [0..65536, 65536..131072] {
            region.prefetch(window);
        }
    }
}