// sochdb_core/zero_copy.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Zero-Copy Iterators - mmap-based Scans
16//!
17//! This module provides memory-mapped file access for efficient,
18//! zero-copy iteration over large data files.
19//!
20//! # Design
21//!
22//! ```text
23//! ┌─────────────────────────────────────────────────────────────────┐
24//! │                    Zero-Copy Iterator                           │
25//! │                                                                 │
26//! │  Application                    OS/Kernel                       │
27//! │  ┌────────────┐                 ┌────────────────────┐         │
28//! │  │  Iterator  │                 │   Page Cache       │         │
29//! │  │            │                 │  ┌──────────────┐  │         │
30//! │  │  &[u8] ────┼─────────────────┼─▶│ Data Pages  │  │         │
31//! │  │            │    mmap         │  └──────────────┘  │         │
32//! │  └────────────┘                 │         ↑         │         │
33//! │        │                        │         │         │         │
34//! │        │ No copy!               │    ┌────┴────┐    │         │
35//! │        ▼                        │    │  Disk   │    │         │
36//! │  Process data                   │    └─────────┘    │         │
37//! │  in-place                       └────────────────────┘         │
38//! │                                                                 │
39//! │  Benefits:                                                      │
40//! │  • No buffer copies                                             │
41//! │  • OS manages page faults                                       │
42//! │  • Efficient for sequential scans                               │
43//! │  • Read-ahead by OS                                             │
44//! └─────────────────────────────────────────────────────────────────┘
45//! ```
46//!
47//! # Safety
48//!
49//! Memory-mapped regions can become invalid if the underlying file is
50//! modified. This implementation provides:
51//! - Read-only mappings by default
52//! - Length validation
53//! - Guard types for safe access
54
55use std::fs::File;
56use std::io;
57use std::ops::Range;
58use std::path::Path;
59use std::sync::Arc;
60use std::sync::atomic::{AtomicU64, Ordering};
61
62#[cfg(unix)]
63use std::os::unix::io::AsRawFd;
64
/// Memory-mapped region
///
/// Owns a read-only view of a file's bytes: an `mmap`ed region on Unix, or a
/// leaked heap buffer on other platforms. The backing memory is released in
/// `Drop`.
pub struct MmapRegion {
    /// Pointer to mapped memory (null when the mapped file was empty)
    ptr: *const u8,
    /// Length of mapped region in bytes
    len: usize,
    /// File descriptor (for cleanup; note `Drop` does not close it — the
    /// `File` passed to `new` remains the owner of the descriptor)
    #[cfg(unix)]
    _fd: i32,
}
75
// Safety: MmapRegion is Send/Sync because we only allow read-only access
// and the underlying mapping is immutable (PROT_READ + MAP_PRIVATE on Unix,
// a never-mutated leaked buffer elsewhere).
// NOTE(review): this relies on the backing file not being truncated while
// mapped — on Unix a concurrent truncate can still fault on access; confirm
// callers uphold that.
unsafe impl Send for MmapRegion {}
unsafe impl Sync for MmapRegion {}
80
81impl MmapRegion {
82    /// Create a new memory-mapped region for a file
83    #[cfg(unix)]
84    pub fn new(file: &File) -> io::Result<Self> {
85        use std::ptr;
86
87        let metadata = file.metadata()?;
88        let len = metadata.len() as usize;
89
90        if len == 0 {
91            return Ok(Self {
92                ptr: ptr::null(),
93                len: 0,
94                _fd: file.as_raw_fd(),
95            });
96        }
97
98        // SAFETY: We're creating a read-only mapping of a valid file
99        let ptr = unsafe {
100            libc::mmap(
101                ptr::null_mut(),
102                len,
103                libc::PROT_READ,
104                libc::MAP_PRIVATE,
105                file.as_raw_fd(),
106                0,
107            )
108        };
109
110        if ptr == libc::MAP_FAILED {
111            return Err(io::Error::last_os_error());
112        }
113
114        Ok(Self {
115            ptr: ptr as *const u8,
116            len,
117            _fd: file.as_raw_fd(),
118        })
119    }
120
121    /// Create with advisory read-ahead hint
122    #[cfg(unix)]
123    pub fn new_with_readahead(file: &File) -> io::Result<Self> {
124        let region = Self::new(file)?;
125
126        if region.len > 0 {
127            // Advise kernel for sequential access
128            unsafe {
129                libc::madvise(
130                    region.ptr as *mut libc::c_void,
131                    region.len,
132                    libc::MADV_SEQUENTIAL,
133                );
134            }
135        }
136
137        Ok(region)
138    }
139
140    /// Fallback for non-Unix systems (reads entire file into memory)
141    #[cfg(not(unix))]
142    pub fn new(file: &File) -> io::Result<Self> {
143        use std::io::Read;
144        let mut file = file;
145        let mut buffer = Vec::new();
146        file.read_to_end(&mut buffer)?;
147
148        let len = buffer.len();
149        let ptr = Box::into_raw(buffer.into_boxed_slice()) as *const u8;
150
151        Ok(Self { ptr, len })
152    }
153
154    #[cfg(not(unix))]
155    pub fn new_with_readahead(file: &File) -> io::Result<Self> {
156        Self::new(file)
157    }
158
159    /// Get the length of the mapped region
160    pub fn len(&self) -> usize {
161        self.len
162    }
163
164    /// Check if empty
165    pub fn is_empty(&self) -> bool {
166        self.len == 0
167    }
168
169    /// Get a slice of the mapped region
170    ///
171    /// # Safety
172    /// The returned slice is only valid as long as the MmapRegion exists
173    /// and the underlying file is not modified.
174    pub fn as_slice(&self) -> &[u8] {
175        if self.ptr.is_null() || self.len == 0 {
176            return &[];
177        }
178        // SAFETY: ptr is valid for len bytes, read-only, and properly aligned
179        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
180    }
181
182    /// Get a subslice of the mapped region
183    pub fn slice(&self, range: Range<usize>) -> Option<&[u8]> {
184        if range.end > self.len {
185            return None;
186        }
187        Some(&self.as_slice()[range])
188    }
189
190    /// Prefetch a range of data
191    #[cfg(unix)]
192    pub fn prefetch(&self, range: Range<usize>) {
193        if range.start >= self.len || self.ptr.is_null() {
194            return;
195        }
196
197        let end = range.end.min(self.len);
198        let ptr = unsafe { self.ptr.add(range.start) };
199        let len = end - range.start;
200
201        unsafe {
202            libc::madvise(ptr as *mut libc::c_void, len, libc::MADV_WILLNEED);
203        }
204    }
205
206    #[cfg(not(unix))]
207    pub fn prefetch(&self, _range: Range<usize>) {
208        // No-op on non-Unix
209    }
210}
211
impl Drop for MmapRegion {
    #[cfg(unix)]
    fn drop(&mut self) {
        // Only real mappings are unmapped; empty files were represented by a
        // null pointer and zero length, which munmap would reject.
        if !self.ptr.is_null() && self.len > 0 {
            // SAFETY: ptr/len describe exactly the region returned by mmap in
            // `new`, and it is unmapped at most once (here). Any error from
            // munmap is unreportable in Drop and is intentionally ignored.
            unsafe {
                libc::munmap(self.ptr as *mut libc::c_void, self.len);
            }
        }
    }

    #[cfg(not(unix))]
    fn drop(&mut self) {
        if !self.ptr.is_null() && self.len > 0 {
            // Reconstruct and drop the boxed slice
            // SAFETY: ptr/len came from Box::into_raw(buffer.into_boxed_slice())
            // in the non-Unix `new`, so rebuilding the Box here frees exactly
            // that allocation, exactly once.
            unsafe {
                let slice = std::slice::from_raw_parts_mut(self.ptr as *mut u8, self.len);
                drop(Box::from_raw(slice));
            }
        }
    }
}
233
/// Zero-copy iterator over chunks of mapped memory
///
/// Yields non-overlapping `&[u8]` chunks borrowed directly from the mapped
/// region — no bytes are copied during iteration.
pub struct ZeroCopyIterator<'a> {
    /// The mapped region (borrowed for the iterator's lifetime)
    data: &'a [u8],
    /// Current position (byte offset into `data`, kept <= data.len())
    pos: usize,
    /// Chunk size (each `next()` yields at most this many bytes)
    chunk_size: usize,
    /// Statistics (Arc-shared so callers can observe counters after the
    /// iterator has been consumed)
    stats: Arc<IteratorStats>,
}
245
246impl<'a> ZeroCopyIterator<'a> {
247    /// Create new iterator over mapped region
248    pub fn new(data: &'a [u8], chunk_size: usize) -> Self {
249        Self {
250            data,
251            pos: 0,
252            chunk_size,
253            stats: Arc::new(IteratorStats::default()),
254        }
255    }
256
257    /// Create with shared statistics
258    pub fn with_stats(data: &'a [u8], chunk_size: usize, stats: Arc<IteratorStats>) -> Self {
259        Self {
260            data,
261            pos: 0,
262            chunk_size,
263            stats,
264        }
265    }
266
267    /// Get remaining bytes
268    pub fn remaining(&self) -> usize {
269        self.data.len().saturating_sub(self.pos)
270    }
271
272    /// Seek to position
273    pub fn seek(&mut self, pos: usize) -> bool {
274        if pos <= self.data.len() {
275            self.pos = pos;
276            true
277        } else {
278            false
279        }
280    }
281
282    /// Get statistics
283    pub fn stats(&self) -> &IteratorStats {
284        &self.stats
285    }
286}
287
288impl<'a> Iterator for ZeroCopyIterator<'a> {
289    type Item = &'a [u8];
290
291    fn next(&mut self) -> Option<Self::Item> {
292        if self.pos >= self.data.len() {
293            return None;
294        }
295
296        let start = self.pos;
297        let end = (start + self.chunk_size).min(self.data.len());
298        self.pos = end;
299
300        self.stats.chunks_read.fetch_add(1, Ordering::Relaxed);
301        self.stats
302            .bytes_read
303            .fetch_add((end - start) as u64, Ordering::Relaxed);
304
305        Some(&self.data[start..end])
306    }
307}
308
/// Iterator statistics
///
/// Atomic counters updated with relaxed ordering — values are monotonic
/// totals, not a consistent point-in-time view; use `snapshot` for reading.
#[derive(Debug, Default)]
pub struct IteratorStats {
    /// Number of chunks yielded
    pub chunks_read: AtomicU64,
    /// Total bytes yielded across all chunks
    pub bytes_read: AtomicU64,
    /// Number of seek operations
    pub seeks: AtomicU64,
}
316
317impl IteratorStats {
318    pub fn snapshot(&self) -> IteratorStatsSnapshot {
319        IteratorStatsSnapshot {
320            chunks_read: self.chunks_read.load(Ordering::Relaxed),
321            bytes_read: self.bytes_read.load(Ordering::Relaxed),
322            seeks: self.seeks.load(Ordering::Relaxed),
323        }
324    }
325}
326
/// Plain-value copy of `IteratorStats` taken by `IteratorStats::snapshot`.
#[derive(Debug, Clone)]
pub struct IteratorStatsSnapshot {
    /// Number of chunks yielded at snapshot time
    pub chunks_read: u64,
    /// Total bytes yielded at snapshot time
    pub bytes_read: u64,
    /// Number of seeks at snapshot time
    pub seeks: u64,
}
333
/// Block-aware iterator that parses block headers
///
/// NOTE(review): despite the name, no header parsing happens here — this
/// wraps `ZeroCopyIterator` with fixed-size blocks and tracks block indices.
pub struct BlockIterator<'a> {
    /// Raw data iterator (chunk size == block size)
    inner: ZeroCopyIterator<'a>,
    /// Current block index (index of the next block to be yielded)
    block_index: usize,
}
341
342impl<'a> BlockIterator<'a> {
343    pub fn new(data: &'a [u8], block_size: usize) -> Self {
344        Self {
345            inner: ZeroCopyIterator::new(data, block_size),
346            block_index: 0,
347        }
348    }
349
350    /// Get current block index
351    pub fn block_index(&self) -> usize {
352        self.block_index
353    }
354
355    /// Skip to specific block
356    pub fn skip_to_block(&mut self, index: usize) -> bool {
357        let pos = index * self.inner.chunk_size;
358        if self.inner.seek(pos) {
359            self.block_index = index;
360            true
361        } else {
362            false
363        }
364    }
365}
366
367impl<'a> Iterator for BlockIterator<'a> {
368    type Item = (usize, &'a [u8]);
369
370    fn next(&mut self) -> Option<Self::Item> {
371        let block = self.inner.next()?;
372        let index = self.block_index;
373        self.block_index += 1;
374        Some((index, block))
375    }
376}
377
/// Scanned region with optional filtering
///
/// Wraps a `ZeroCopyIterator` and yields only the chunks for which
/// `predicate` returns `true`.
pub struct FilteredScan<'a, F>
where
    F: Fn(&[u8]) -> bool,
{
    /// Underlying chunk iterator
    inner: ZeroCopyIterator<'a>,
    /// Keep-chunk predicate, applied to each whole chunk
    predicate: F,
}
386
387impl<'a, F> FilteredScan<'a, F>
388where
389    F: Fn(&[u8]) -> bool,
390{
391    pub fn new(data: &'a [u8], chunk_size: usize, predicate: F) -> Self {
392        Self {
393            inner: ZeroCopyIterator::new(data, chunk_size),
394            predicate,
395        }
396    }
397}
398
399impl<'a, F> Iterator for FilteredScan<'a, F>
400where
401    F: Fn(&[u8]) -> bool,
402{
403    type Item = &'a [u8];
404
405    fn next(&mut self) -> Option<Self::Item> {
406        loop {
407            let chunk = self.inner.next()?;
408            if (self.predicate)(chunk) {
409                return Some(chunk);
410            }
411        }
412    }
413}
414
/// Parallel scan configuration
///
/// NOTE(review): this type is only constructed and read in this module — the
/// parallel-reader machinery that would consume it is not visible here.
#[derive(Debug, Clone)]
pub struct ParallelScanConfig {
    /// Number of parallel readers
    pub num_readers: usize,
    /// Chunk size per reader (bytes)
    pub chunk_size: usize,
    /// Prefetch distance (in chunks)
    pub prefetch_distance: usize,
}
425
426impl Default for ParallelScanConfig {
427    fn default() -> Self {
428        Self {
429            num_readers: 4,
430            chunk_size: 64 * 1024, // 64KB
431            prefetch_distance: 2,
432        }
433    }
434}
435
/// Range scanner for parallel processing
///
/// Splits `[0, total_len)` into equal-sized half-open ranges (the last range
/// may be shorter) and hands them out by index or by iteration.
pub struct RangeScanner {
    /// Total length in bytes being divided
    total_len: usize,
    /// Range size (bytes per range, ceiling division)
    range_size: usize,
    /// Current range index (iteration cursor)
    current: usize,
    /// Total ranges produced by this scanner
    total_ranges: usize,
}
447
448impl RangeScanner {
449    /// Create scanner that divides data into N ranges
450    pub fn new(total_len: usize, num_ranges: usize) -> Self {
451        let range_size = total_len.div_ceil(num_ranges.max(1));
452        let total_ranges = if total_len > 0 {
453            total_len.div_ceil(range_size)
454        } else {
455            0
456        };
457
458        Self {
459            total_len,
460            range_size,
461            current: 0,
462            total_ranges,
463        }
464    }
465
466    /// Get range for a specific index
467    pub fn range(&self, index: usize) -> Option<Range<usize>> {
468        if index >= self.total_ranges {
469            return None;
470        }
471
472        let start = index * self.range_size;
473        let end = ((index + 1) * self.range_size).min(self.total_len);
474
475        Some(start..end)
476    }
477
478    /// Get total number of ranges
479    pub fn total_ranges(&self) -> usize {
480        self.total_ranges
481    }
482}
483
484impl Iterator for RangeScanner {
485    type Item = Range<usize>;
486
487    fn next(&mut self) -> Option<Self::Item> {
488        let range = self.range(self.current)?;
489        self.current += 1;
490        Some(range)
491    }
492}
493
494/// Open a file for zero-copy scanning
495pub fn open_for_scan(path: impl AsRef<Path>) -> io::Result<MmapRegion> {
496    let file = File::open(path)?;
497    MmapRegion::new_with_readahead(&file)
498}
499
500/// Create an iterator over a file
501pub fn scan_file(path: impl AsRef<Path>, chunk_size: usize) -> io::Result<(MmapRegion, usize)> {
502    let region = open_for_scan(path)?;
503    Ok((region, chunk_size))
504}
505
// Unit tests: each maps a small temp file and checks slicing, iteration,
// stats, filtering, or range splitting against hand-computed expectations.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Write `data` to a fresh temp file, flushed so mmap observes the bytes.
    fn create_test_file(data: &[u8]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(data).unwrap();
        file.flush().unwrap();
        file
    }

    #[test]
    fn test_mmap_region_basic() {
        let data = b"Hello, World! This is test data for mmap.";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    // Empty files map to a null region; accessors must still be safe.
    #[test]
    fn test_mmap_empty_file() {
        let file = create_test_file(b"");

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        assert!(region.is_empty());
        assert_eq!(region.as_slice(), &[] as &[u8]);
    }

    #[test]
    fn test_mmap_slice() {
        let data = b"0123456789ABCDEF";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        assert_eq!(region.slice(0..4), Some(&b"0123"[..]));
        assert_eq!(region.slice(4..8), Some(&b"4567"[..]));
        // Out-of-bounds ranges yield None rather than panicking.
        assert_eq!(region.slice(0..100), None);
    }

    #[test]
    fn test_zero_copy_iterator() {
        let data = b"AAAABBBBCCCCDDDD";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = ZeroCopyIterator::new(region.as_slice(), 4);

        let chunks: Vec<_> = iter.collect();

        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks[0], b"AAAA");
        assert_eq!(chunks[1], b"BBBB");
        assert_eq!(chunks[2], b"CCCC");
        assert_eq!(chunks[3], b"DDDD");
    }

    // 8 bytes / chunk_size 3: the final chunk is the 2-byte remainder.
    #[test]
    fn test_iterator_uneven_chunks() {
        let data = b"AAABBBCC";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = ZeroCopyIterator::new(region.as_slice(), 3);

        let chunks: Vec<_> = iter.collect();

        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0], b"AAA");
        assert_eq!(chunks[1], b"BBB");
        assert_eq!(chunks[2], b"CC");
    }

    // Stats handle is cloned before the iterator is consumed, so counters
    // remain readable after collect().
    #[test]
    fn test_iterator_stats() {
        let data = b"AAAABBBBCCCC";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = ZeroCopyIterator::new(region.as_slice(), 4);
        let stats = Arc::clone(&iter.stats);

        let _chunks: Vec<_> = iter.collect();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.chunks_read, 3);
        assert_eq!(snapshot.bytes_read, 12);
    }

    #[test]
    fn test_iterator_seek() {
        let data = b"AAAABBBBCCCCDDDD";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        // Seek to the third chunk boundary, then read from there.
        assert!(iter.seek(8));
        assert_eq!(iter.next(), Some(&b"CCCC"[..]));

        // Seeking past the end is rejected.
        assert!(!iter.seek(100));
    }

    #[test]
    fn test_block_iterator() {
        let data = b"BLK1BLK2BLK3";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = BlockIterator::new(region.as_slice(), 4);

        let blocks: Vec<_> = iter.collect();

        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0], (0, &b"BLK1"[..]));
        assert_eq!(blocks[1], (1, &b"BLK2"[..]));
        assert_eq!(blocks[2], (2, &b"BLK3"[..]));
    }

    #[test]
    fn test_block_iterator_skip() {
        let data = b"BLK1BLK2BLK3BLK4";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let mut iter = BlockIterator::new(region.as_slice(), 4);

        // After skipping, both the yielded index and bytes reflect block 2.
        assert!(iter.skip_to_block(2));
        assert_eq!(iter.next(), Some((2, &b"BLK3"[..])));
    }

    #[test]
    fn test_filtered_scan() {
        let data = b"ABCDXXXXYYYY1234";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        // Filter for chunks that don't contain 'X' or 'Y'
        let scan = FilteredScan::new(region.as_slice(), 4, |chunk| {
            !chunk.contains(&b'X') && !chunk.contains(&b'Y')
        });

        let matching: Vec<_> = scan.collect();

        assert_eq!(matching.len(), 2);
        assert_eq!(matching[0], b"ABCD");
        assert_eq!(matching[1], b"1234");
    }

    // 100 bytes over 4 ranges divides evenly into 25-byte ranges.
    #[test]
    fn test_range_scanner() {
        let scanner = RangeScanner::new(100, 4);

        assert_eq!(scanner.total_ranges(), 4);

        let ranges: Vec<_> = scanner.collect();

        assert_eq!(ranges.len(), 4);
        assert_eq!(ranges[0], 0..25);
        assert_eq!(ranges[1], 25..50);
        assert_eq!(ranges[2], 50..75);
        assert_eq!(ranges[3], 75..100);
    }

    #[test]
    fn test_range_scanner_uneven() {
        let scanner = RangeScanner::new(10, 3);

        let ranges: Vec<_> = scanner.collect();

        // 10 / 3 = 4 (ceiling), so ranges are 0..4, 4..8, 8..10
        assert_eq!(ranges.len(), 3);
        assert!(ranges.last().unwrap().end == 10);
    }

    #[test]
    fn test_range_scanner_empty() {
        let scanner = RangeScanner::new(0, 4);

        assert_eq!(scanner.total_ranges(), 0);
        let ranges: Vec<_> = scanner.collect();
        assert!(ranges.is_empty());
    }

    #[test]
    fn test_parallel_scan_config() {
        let config = ParallelScanConfig::default();

        assert!(config.num_readers > 0);
        assert!(config.chunk_size > 0);
    }

    #[test]
    fn test_remaining_bytes() {
        let data = b"AAAABBBBCCCC";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        // remaining() drops by exactly one chunk per next().
        assert_eq!(iter.remaining(), 12);
        iter.next();
        assert_eq!(iter.remaining(), 8);
        iter.next();
        assert_eq!(iter.remaining(), 4);
        iter.next();
        assert_eq!(iter.remaining(), 0);
    }

    #[test]
    fn test_mmap_with_readahead() {
        let data = b"Test data for readahead mmap";
        let file = create_test_file(data);

        let region = MmapRegion::new_with_readahead(&File::open(file.path()).unwrap()).unwrap();

        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    // Prefetch is advisory: the test only checks it is safe to call.
    #[test]
    fn test_prefetch() {
        let data = vec![0u8; 1024 * 1024]; // 1MB
        let file = create_test_file(&data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        // Prefetch should not crash
        region.prefetch(0..65536);
        region.prefetch(65536..131072);
    }
}