1use std::fs::File;
59use std::io;
60use std::ops::Range;
61use std::path::Path;
62use std::sync::Arc;
63use std::sync::atomic::{AtomicU64, Ordering};
64
65#[cfg(unix)]
66use std::os::unix::io::AsRawFd;
67
/// Read-only view over the full contents of a file.
///
/// On Unix the constructors in this file back the view with a private, read-only
/// `mmap`; on other platforms the file is read into a heap buffer instead. The
/// `Drop` implementation releases whichever backing store was used.
pub struct MmapRegion {
    /// Number of readable bytes starting at `ptr`.
    len: usize,
    /// First byte of the mapped (or buffered) data; null for an empty file on Unix.
    ptr: *const u8,
    /// Raw descriptor of the file the region was created from. It is only recorded:
    /// nothing in this file reads it back, and the region does not own it.
    #[cfg(unix)]
    _fd: i32,
}

// SAFETY: the raw pointer is what prevents the automatic impls. The methods in this
// file only ever read through it (they hand out `&[u8]`), and the backing memory is
// released only in `Drop`, which requires exclusive access.
unsafe impl Sync for MmapRegion {}
unsafe impl Send for MmapRegion {}
83
84impl MmapRegion {
85 #[cfg(unix)]
87 pub fn new(file: &File) -> io::Result<Self> {
88 use std::ptr;
89
90 let metadata = file.metadata()?;
91 let len = metadata.len() as usize;
92
93 if len == 0 {
94 return Ok(Self {
95 ptr: ptr::null(),
96 len: 0,
97 _fd: file.as_raw_fd(),
98 });
99 }
100
101 let ptr = unsafe {
103 libc::mmap(
104 ptr::null_mut(),
105 len,
106 libc::PROT_READ,
107 libc::MAP_PRIVATE,
108 file.as_raw_fd(),
109 0,
110 )
111 };
112
113 if ptr == libc::MAP_FAILED {
114 return Err(io::Error::last_os_error());
115 }
116
117 Ok(Self {
118 ptr: ptr as *const u8,
119 len,
120 _fd: file.as_raw_fd(),
121 })
122 }
123
124 #[cfg(unix)]
126 pub fn new_with_readahead(file: &File) -> io::Result<Self> {
127 let region = Self::new(file)?;
128
129 if region.len > 0 {
130 unsafe {
132 libc::madvise(
133 region.ptr as *mut libc::c_void,
134 region.len,
135 libc::MADV_SEQUENTIAL,
136 );
137 }
138 }
139
140 Ok(region)
141 }
142
143 #[cfg(not(unix))]
145 pub fn new(file: &File) -> io::Result<Self> {
146 use std::io::Read;
147 let mut file = file;
148 let mut buffer = Vec::new();
149 file.read_to_end(&mut buffer)?;
150
151 let len = buffer.len();
152 let ptr = Box::into_raw(buffer.into_boxed_slice()) as *const u8;
153
154 Ok(Self { ptr, len })
155 }
156
157 #[cfg(not(unix))]
158 pub fn new_with_readahead(file: &File) -> io::Result<Self> {
159 Self::new(file)
160 }
161
162 pub fn len(&self) -> usize {
164 self.len
165 }
166
167 pub fn is_empty(&self) -> bool {
169 self.len == 0
170 }
171
172 pub fn as_slice(&self) -> &[u8] {
178 if self.ptr.is_null() || self.len == 0 {
179 return &[];
180 }
181 unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
183 }
184
185 pub fn slice(&self, range: Range<usize>) -> Option<&[u8]> {
187 if range.end > self.len {
188 return None;
189 }
190 Some(&self.as_slice()[range])
191 }
192
193 #[cfg(unix)]
195 pub fn prefetch(&self, range: Range<usize>) {
196 if range.start >= self.len || self.ptr.is_null() {
197 return;
198 }
199
200 let end = range.end.min(self.len);
201 let ptr = unsafe { self.ptr.add(range.start) };
202 let len = end - range.start;
203
204 unsafe {
205 libc::madvise(ptr as *mut libc::c_void, len, libc::MADV_WILLNEED);
206 }
207 }
208
209 #[cfg(not(unix))]
210 pub fn prefetch(&self, _range: Range<usize>) {
211 }
213}
214
215impl Drop for MmapRegion {
216 #[cfg(unix)]
217 fn drop(&mut self) {
218 if !self.ptr.is_null() && self.len > 0 {
219 unsafe {
220 libc::munmap(self.ptr as *mut libc::c_void, self.len);
221 }
222 }
223 }
224
225 #[cfg(not(unix))]
226 fn drop(&mut self) {
227 if !self.ptr.is_null() && self.len > 0 {
228 unsafe {
230 let slice = std::slice::from_raw_parts_mut(self.ptr as *mut u8, self.len);
231 drop(Box::from_raw(slice));
232 }
233 }
234 }
235}
236
237pub struct ZeroCopyIterator<'a> {
239 data: &'a [u8],
241 pos: usize,
243 chunk_size: usize,
245 stats: Arc<IteratorStats>,
247}
248
249impl<'a> ZeroCopyIterator<'a> {
250 pub fn new(data: &'a [u8], chunk_size: usize) -> Self {
252 Self {
253 data,
254 pos: 0,
255 chunk_size,
256 stats: Arc::new(IteratorStats::default()),
257 }
258 }
259
260 pub fn with_stats(data: &'a [u8], chunk_size: usize, stats: Arc<IteratorStats>) -> Self {
262 Self {
263 data,
264 pos: 0,
265 chunk_size,
266 stats,
267 }
268 }
269
270 pub fn remaining(&self) -> usize {
272 self.data.len().saturating_sub(self.pos)
273 }
274
275 pub fn seek(&mut self, pos: usize) -> bool {
277 if pos <= self.data.len() {
278 self.pos = pos;
279 true
280 } else {
281 false
282 }
283 }
284
285 pub fn stats(&self) -> &IteratorStats {
287 &self.stats
288 }
289}
290
291impl<'a> Iterator for ZeroCopyIterator<'a> {
292 type Item = &'a [u8];
293
294 fn next(&mut self) -> Option<Self::Item> {
295 if self.pos >= self.data.len() {
296 return None;
297 }
298
299 let start = self.pos;
300 let end = (start + self.chunk_size).min(self.data.len());
301 self.pos = end;
302
303 self.stats.chunks_read.fetch_add(1, Ordering::Relaxed);
304 self.stats
305 .bytes_read
306 .fetch_add((end - start) as u64, Ordering::Relaxed);
307
308 Some(&self.data[start..end])
309 }
310}
311
/// Atomic counters describing iteration activity.
///
/// The fields are atomics, so they can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct IteratorStats {
    /// Number of chunks recorded.
    pub chunks_read: AtomicU64,
    /// Number of bytes recorded.
    pub bytes_read: AtomicU64,
    /// Number of seeks recorded.
    pub seeks: AtomicU64,
}

impl IteratorStats {
    /// Copies the current counter values into a plain-integer snapshot.
    ///
    /// Each counter is read with a separate relaxed load, so the three values are
    /// not captured as one atomic unit.
    pub fn snapshot(&self) -> IteratorStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let chunks_read = read(&self.chunks_read);
        let bytes_read = read(&self.bytes_read);
        let seeks = read(&self.seeks);

        IteratorStatsSnapshot {
            chunks_read,
            bytes_read,
            seeks,
        }
    }
}

/// Point-in-time copy of the values held by [`IteratorStats`].
#[derive(Debug, Clone)]
pub struct IteratorStatsSnapshot {
    /// Value of `chunks_read` when the snapshot was taken.
    pub chunks_read: u64,
    /// Value of `bytes_read` when the snapshot was taken.
    pub bytes_read: u64,
    /// Value of `seeks` when the snapshot was taken.
    pub seeks: u64,
}
336
337pub struct BlockIterator<'a> {
339 inner: ZeroCopyIterator<'a>,
341 block_index: usize,
343}
344
345impl<'a> BlockIterator<'a> {
346 pub fn new(data: &'a [u8], block_size: usize) -> Self {
347 Self {
348 inner: ZeroCopyIterator::new(data, block_size),
349 block_index: 0,
350 }
351 }
352
353 pub fn block_index(&self) -> usize {
355 self.block_index
356 }
357
358 pub fn skip_to_block(&mut self, index: usize) -> bool {
360 let pos = index * self.inner.chunk_size;
361 if self.inner.seek(pos) {
362 self.block_index = index;
363 true
364 } else {
365 false
366 }
367 }
368}
369
370impl<'a> Iterator for BlockIterator<'a> {
371 type Item = (usize, &'a [u8]);
372
373 fn next(&mut self) -> Option<Self::Item> {
374 let block = self.inner.next()?;
375 let index = self.block_index;
376 self.block_index += 1;
377 Some((index, block))
378 }
379}
380
381pub struct FilteredScan<'a, F>
383where
384 F: Fn(&[u8]) -> bool,
385{
386 inner: ZeroCopyIterator<'a>,
387 predicate: F,
388}
389
390impl<'a, F> FilteredScan<'a, F>
391where
392 F: Fn(&[u8]) -> bool,
393{
394 pub fn new(data: &'a [u8], chunk_size: usize, predicate: F) -> Self {
395 Self {
396 inner: ZeroCopyIterator::new(data, chunk_size),
397 predicate,
398 }
399 }
400}
401
402impl<'a, F> Iterator for FilteredScan<'a, F>
403where
404 F: Fn(&[u8]) -> bool,
405{
406 type Item = &'a [u8];
407
408 fn next(&mut self) -> Option<Self::Item> {
409 loop {
410 let chunk = self.inner.next()?;
411 if (self.predicate)(chunk) {
412 return Some(chunk);
413 }
414 }
415 }
416}
417
/// Tuning knobs for a parallel scan.
///
/// NOTE(review): nothing in this file consumes these values, so the field
/// descriptions below are inferred from the names — confirm against the caller.
#[derive(Debug, Clone)]
pub struct ParallelScanConfig {
    /// Presumably the number of concurrent readers. Defaults to 4.
    pub num_readers: usize,
    /// Presumably the chunk size in bytes. Defaults to 64 * 1024.
    pub chunk_size: usize,
    /// Presumably how many chunks to prefetch ahead of the reader. Defaults to 2.
    pub prefetch_distance: usize,
}

impl Default for ParallelScanConfig {
    /// Returns the default configuration: 4 readers, 64 KiB chunks, prefetch
    /// distance 2.
    fn default() -> Self {
        const KIB: usize = 1024;

        Self {
            prefetch_distance: 2,
            chunk_size: 64 * KIB,
            num_readers: 4,
        }
    }
}
438
/// Splits `0..total_len` into contiguous, equally sized ranges (the last one may
/// be shorter) and iterates over them in order.
pub struct RangeScanner {
    /// Exclusive upper bound of the span being partitioned.
    total_len: usize,
    /// Length of every range except possibly the last.
    range_size: usize,
    /// Index of the range the iterator will yield next.
    current: usize,
    /// Number of non-empty ranges in the partition.
    total_ranges: usize,
}

impl RangeScanner {
    /// Partitions `total_len` into at most `num_ranges` ranges.
    ///
    /// `num_ranges == 0` is treated as 1. Because the range size is rounded up, the
    /// partition can contain fewer ranges than requested (for example 10 items
    /// split 6 ways gives five ranges of 2). An empty span yields no ranges.
    pub fn new(total_len: usize, num_ranges: usize) -> Self {
        let range_size = total_len.div_ceil(num_ranges.max(1));
        // `range_size` is 0 only when `total_len` is 0, so the division is guarded.
        let total_ranges = if total_len > 0 {
            total_len.div_ceil(range_size)
        } else {
            0
        };

        Self {
            total_len,
            range_size,
            current: 0,
            total_ranges,
        }
    }

    /// Returns the `index`-th range, or `None` when `index` is out of bounds.
    pub fn range(&self, index: usize) -> Option<Range<usize>> {
        if index >= self.total_ranges {
            return None;
        }

        // `index < total_ranges` implies `index * range_size < total_len`, so the
        // product cannot overflow.
        let start = index * self.range_size;
        // The end of the last range, however, can exceed `usize::MAX` before it is
        // clamped when `total_len` is close to the maximum; saturate instead of
        // overflowing.
        let end = start.saturating_add(self.range_size).min(self.total_len);

        Some(start..end)
    }

    /// Number of ranges in the partition.
    pub fn total_ranges(&self) -> usize {
        self.total_ranges
    }
}

impl Iterator for RangeScanner {
    type Item = Range<usize>;

    /// Yields the next range in ascending order.
    fn next(&mut self) -> Option<Self::Item> {
        let range = self.range(self.current)?;
        self.current += 1;
        Some(range)
    }

    /// Exact number of ranges still to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.total_ranges.saturating_sub(self.current);
        (left, Some(left))
    }
}
496
497pub fn open_for_scan(path: impl AsRef<Path>) -> io::Result<MmapRegion> {
499 let file = File::open(path)?;
500 MmapRegion::new_with_readahead(&file)
501}
502
503pub fn scan_file(path: impl AsRef<Path>, chunk_size: usize) -> io::Result<(MmapRegion, usize)> {
505 let region = open_for_scan(path)?;
506 Ok((region, chunk_size))
507}
508
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `data` to a fresh temporary file and flushes it.
    fn create_test_file(data: &[u8]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(data).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Writes `data` to a temporary file and maps it with `MmapRegion::new`.
    ///
    /// The temporary file is returned as well so it outlives the region in the test.
    fn map_bytes(data: &[u8]) -> (NamedTempFile, MmapRegion) {
        let tmp = create_test_file(data);
        let region = MmapRegion::new(&File::open(tmp.path()).unwrap()).unwrap();
        (tmp, region)
    }

    #[test]
    fn test_mmap_region_basic() {
        let expected = b"Hello, World! This is test data for mmap.";
        let (_tmp, region) = map_bytes(expected);

        assert_eq!(region.len(), expected.len());
        assert_eq!(region.as_slice(), expected);
    }

    #[test]
    fn test_mmap_empty_file() {
        let (_tmp, region) = map_bytes(b"");

        assert!(region.is_empty());
        assert_eq!(region.as_slice(), &[] as &[u8]);
    }

    #[test]
    fn test_mmap_slice() {
        let (_tmp, region) = map_bytes(b"0123456789ABCDEF");

        assert_eq!(region.slice(0..4), Some(&b"0123"[..]));
        assert_eq!(region.slice(4..8), Some(&b"4567"[..]));
        assert_eq!(region.slice(0..100), None);
    }

    #[test]
    fn test_zero_copy_iterator() {
        let (_tmp, region) = map_bytes(b"AAAABBBBCCCCDDDD");

        let chunks: Vec<&[u8]> = ZeroCopyIterator::new(region.as_slice(), 4).collect();
        let expected: Vec<&[u8]> = vec![&b"AAAA"[..], &b"BBBB"[..], &b"CCCC"[..], &b"DDDD"[..]];

        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks, expected);
    }

    #[test]
    fn test_iterator_uneven_chunks() {
        let (_tmp, region) = map_bytes(b"AAABBBCC");

        let chunks: Vec<&[u8]> = ZeroCopyIterator::new(region.as_slice(), 3).collect();
        let expected: Vec<&[u8]> = vec![&b"AAA"[..], &b"BBB"[..], &b"CC"[..]];

        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks, expected);
    }

    #[test]
    fn test_iterator_stats() {
        let (_tmp, region) = map_bytes(b"AAAABBBBCCCC");

        // Share the counters up front so they can be inspected after the iterator
        // has been consumed.
        let stats = Arc::new(IteratorStats::default());
        let iter = ZeroCopyIterator::with_stats(region.as_slice(), 4, Arc::clone(&stats));
        let _chunks: Vec<_> = iter.collect();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.chunks_read, 3);
        assert_eq!(snapshot.bytes_read, 12);
    }

    #[test]
    fn test_iterator_seek() {
        let (_tmp, region) = map_bytes(b"AAAABBBBCCCCDDDD");
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        assert!(iter.seek(8));
        assert_eq!(iter.next(), Some(&b"CCCC"[..]));

        assert!(!iter.seek(100));
    }

    #[test]
    fn test_block_iterator() {
        let (_tmp, region) = map_bytes(b"BLK1BLK2BLK3");

        let blocks: Vec<(usize, &[u8])> = BlockIterator::new(region.as_slice(), 4).collect();
        let expected: Vec<(usize, &[u8])> =
            vec![(0, &b"BLK1"[..]), (1, &b"BLK2"[..]), (2, &b"BLK3"[..])];

        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks, expected);
    }

    #[test]
    fn test_block_iterator_skip() {
        let (_tmp, region) = map_bytes(b"BLK1BLK2BLK3BLK4");
        let mut iter = BlockIterator::new(region.as_slice(), 4);

        assert!(iter.skip_to_block(2));
        assert_eq!(iter.next(), Some((2, &b"BLK3"[..])));
    }

    #[test]
    fn test_filtered_scan() {
        let (_tmp, region) = map_bytes(b"ABCDXXXXYYYY1234");

        // Keep only the chunks that contain neither marker byte.
        let scan = FilteredScan::new(region.as_slice(), 4, |chunk| {
            !chunk.contains(&b'X') && !chunk.contains(&b'Y')
        });
        let matching: Vec<&[u8]> = scan.collect();
        let expected: Vec<&[u8]> = vec![&b"ABCD"[..], &b"1234"[..]];

        assert_eq!(matching.len(), 2);
        assert_eq!(matching, expected);
    }

    #[test]
    fn test_range_scanner() {
        let scanner = RangeScanner::new(100, 4);
        assert_eq!(scanner.total_ranges(), 4);

        let ranges: Vec<Range<usize>> = scanner.collect();
        let expected: Vec<Range<usize>> = vec![0..25, 25..50, 50..75, 75..100];

        assert_eq!(ranges.len(), 4);
        assert_eq!(ranges, expected);
    }

    #[test]
    fn test_range_scanner_uneven() {
        let ranges: Vec<Range<usize>> = RangeScanner::new(10, 3).collect();

        assert_eq!(ranges.len(), 3);
        assert!(ranges.last().unwrap().end == 10);
    }

    #[test]
    fn test_range_scanner_empty() {
        let scanner = RangeScanner::new(0, 4);
        assert_eq!(scanner.total_ranges(), 0);

        let ranges: Vec<Range<usize>> = scanner.collect();
        assert!(ranges.is_empty());
    }

    #[test]
    fn test_parallel_scan_config() {
        let config = ParallelScanConfig::default();

        assert!(config.num_readers > 0);
        assert!(config.chunk_size > 0);
    }

    #[test]
    fn test_remaining_bytes() {
        let (_tmp, region) = map_bytes(b"AAAABBBBCCCC");
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        // Each `next` consumes one 4-byte chunk.
        for expected in [12usize, 8, 4] {
            assert_eq!(iter.remaining(), expected);
            iter.next();
        }
        assert_eq!(iter.remaining(), 0);
    }

    #[test]
    fn test_mmap_with_readahead() {
        let expected = b"Test data for readahead mmap";
        let tmp = create_test_file(expected);

        let region = MmapRegion::new_with_readahead(&File::open(tmp.path()).unwrap()).unwrap();

        assert_eq!(region.len(), expected.len());
        assert_eq!(region.as_slice(), expected);
    }

    #[test]
    fn test_prefetch() {
        let zeros = vec![0u8; 1024 * 1024];
        let (_tmp, region) = map_bytes(&zeros);

        // Only checks that the hints can be issued without crashing.
        region.prefetch(0..65536);
        region.prefetch(65536..131072);
    }
}