// sochdb_core/zero_copy.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Zero-Copy Iterators - mmap-based Scans
19//!
20//! This module provides memory-mapped file access for efficient,
21//! zero-copy iteration over large data files.
22//!
23//! # Design
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │                    Zero-Copy Iterator                           │
28//! │                                                                 │
29//! │  Application                    OS/Kernel                       │
30//! │  ┌────────────┐                 ┌────────────────────┐         │
31//! │  │  Iterator  │                 │   Page Cache       │         │
32//! │  │            │                 │  ┌──────────────┐  │         │
33//! │  │  &[u8] ────┼─────────────────┼─▶│ Data Pages  │  │         │
34//! │  │            │    mmap         │  └──────────────┘  │         │
35//! │  └────────────┘                 │         ↑         │         │
36//! │        │                        │         │         │         │
37//! │        │ No copy!               │    ┌────┴────┐    │         │
38//! │        ▼                        │    │  Disk   │    │         │
39//! │  Process data                   │    └─────────┘    │         │
40//! │  in-place                       └────────────────────┘         │
41//! │                                                                 │
42//! │  Benefits:                                                      │
43//! │  • No buffer copies                                             │
44//! │  • OS manages page faults                                       │
45//! │  • Efficient for sequential scans                               │
46//! │  • Read-ahead by OS                                             │
47//! └─────────────────────────────────────────────────────────────────┘
48//! ```
49//!
50//! # Safety
51//!
52//! Memory-mapped regions can become invalid if the underlying file is
53//! modified. This implementation provides:
54//! - Read-only mappings by default
55//! - Length validation
56//! - Guard types for safe access
57
58use std::fs::File;
59use std::io;
60use std::ops::Range;
61use std::path::Path;
62use std::sync::Arc;
63use std::sync::atomic::{AtomicU64, Ordering};
64
65#[cfg(unix)]
66use std::os::unix::io::AsRawFd;
67
/// Read-only memory-mapped region of a file.
///
/// On Unix this wraps an `mmap(2)` mapping; on other platforms the file
/// contents are copied into a heap buffer instead (see the
/// `cfg(not(unix))` constructor), so "zero-copy" only holds on Unix.
pub struct MmapRegion {
    /// Pointer to the start of the mapping; null when the file was empty.
    ptr: *const u8,
    /// Length of mapped region in bytes.
    len: usize,
    /// Raw file descriptor, retained for bookkeeping only — `Drop` unmaps
    /// with `munmap` and never closes this fd (the `File` owns it).
    #[cfg(unix)]
    _fd: i32,
}
78
// SAFETY: MmapRegion only ever exposes shared, read-only access to the
// mapping (`as_slice` returns `&[u8]`; no `&mut` path exists), so sending
// or sharing it across threads cannot cause data races within this process.
// NOTE(review): the mapping is PROT_READ/MAP_PRIVATE, so an external writer
// modifying the underlying file can still race with readers at the OS
// level — the module docs require the file not be modified while mapped.
unsafe impl Send for MmapRegion {}
unsafe impl Sync for MmapRegion {}
83
84impl MmapRegion {
85    /// Create a new memory-mapped region for a file
86    #[cfg(unix)]
87    pub fn new(file: &File) -> io::Result<Self> {
88        use std::ptr;
89
90        let metadata = file.metadata()?;
91        let len = metadata.len() as usize;
92
93        if len == 0 {
94            return Ok(Self {
95                ptr: ptr::null(),
96                len: 0,
97                _fd: file.as_raw_fd(),
98            });
99        }
100
101        // SAFETY: We're creating a read-only mapping of a valid file
102        let ptr = unsafe {
103            libc::mmap(
104                ptr::null_mut(),
105                len,
106                libc::PROT_READ,
107                libc::MAP_PRIVATE,
108                file.as_raw_fd(),
109                0,
110            )
111        };
112
113        if ptr == libc::MAP_FAILED {
114            return Err(io::Error::last_os_error());
115        }
116
117        Ok(Self {
118            ptr: ptr as *const u8,
119            len,
120            _fd: file.as_raw_fd(),
121        })
122    }
123
124    /// Create with advisory read-ahead hint
125    #[cfg(unix)]
126    pub fn new_with_readahead(file: &File) -> io::Result<Self> {
127        let region = Self::new(file)?;
128
129        if region.len > 0 {
130            // Advise kernel for sequential access
131            unsafe {
132                libc::madvise(
133                    region.ptr as *mut libc::c_void,
134                    region.len,
135                    libc::MADV_SEQUENTIAL,
136                );
137            }
138        }
139
140        Ok(region)
141    }
142
143    /// Fallback for non-Unix systems (reads entire file into memory)
144    #[cfg(not(unix))]
145    pub fn new(file: &File) -> io::Result<Self> {
146        use std::io::Read;
147        let mut file = file;
148        let mut buffer = Vec::new();
149        file.read_to_end(&mut buffer)?;
150
151        let len = buffer.len();
152        let ptr = Box::into_raw(buffer.into_boxed_slice()) as *const u8;
153
154        Ok(Self { ptr, len })
155    }
156
157    #[cfg(not(unix))]
158    pub fn new_with_readahead(file: &File) -> io::Result<Self> {
159        Self::new(file)
160    }
161
162    /// Get the length of the mapped region
163    pub fn len(&self) -> usize {
164        self.len
165    }
166
167    /// Check if empty
168    pub fn is_empty(&self) -> bool {
169        self.len == 0
170    }
171
172    /// Get a slice of the mapped region
173    ///
174    /// # Safety
175    /// The returned slice is only valid as long as the MmapRegion exists
176    /// and the underlying file is not modified.
177    pub fn as_slice(&self) -> &[u8] {
178        if self.ptr.is_null() || self.len == 0 {
179            return &[];
180        }
181        // SAFETY: ptr is valid for len bytes, read-only, and properly aligned
182        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
183    }
184
185    /// Get a subslice of the mapped region
186    pub fn slice(&self, range: Range<usize>) -> Option<&[u8]> {
187        if range.end > self.len {
188            return None;
189        }
190        Some(&self.as_slice()[range])
191    }
192
193    /// Prefetch a range of data
194    #[cfg(unix)]
195    pub fn prefetch(&self, range: Range<usize>) {
196        if range.start >= self.len || self.ptr.is_null() {
197            return;
198        }
199
200        let end = range.end.min(self.len);
201        let ptr = unsafe { self.ptr.add(range.start) };
202        let len = end - range.start;
203
204        unsafe {
205            libc::madvise(ptr as *mut libc::c_void, len, libc::MADV_WILLNEED);
206        }
207    }
208
209    #[cfg(not(unix))]
210    pub fn prefetch(&self, _range: Range<usize>) {
211        // No-op on non-Unix
212    }
213}
214
impl Drop for MmapRegion {
    /// Unmap the region. The file descriptor stored in `_fd` is NOT
    /// closed here; the `File` passed to the constructor still owns it.
    #[cfg(unix)]
    fn drop(&mut self) {
        if !self.ptr.is_null() && self.len > 0 {
            // SAFETY: ptr/len describe exactly the mapping created by
            // `mmap` in the constructor, and it is unmapped only once.
            unsafe {
                libc::munmap(self.ptr as *mut libc::c_void, self.len);
            }
        }
    }

    /// Free the heap buffer allocated by the non-Unix fallback constructor.
    #[cfg(not(unix))]
    fn drop(&mut self) {
        // Reconstruct and drop the boxed slice
        if !self.ptr.is_null() && self.len > 0 {
            // SAFETY: ptr/len came from `Box::into_raw` on a boxed slice
            // of exactly this length in the fallback constructor, so
            // rebuilding the Box here reclaims that same allocation.
            unsafe {
                let slice = std::slice::from_raw_parts_mut(self.ptr as *mut u8, self.len);
                drop(Box::from_raw(slice));
            }
        }
    }
}
236
/// Zero-copy iterator over fixed-size chunks of a borrowed byte slice.
///
/// Yields `&[u8]` windows of up to `chunk_size` bytes (the final chunk may
/// be shorter). No data is copied; every chunk borrows directly from `data`.
pub struct ZeroCopyIterator<'a> {
    /// The mapped region (or any borrowed byte slice).
    data: &'a [u8],
    /// Byte offset of the next chunk to yield.
    pos: usize,
    /// Maximum size of each yielded chunk, in bytes.
    chunk_size: usize,
    /// Shared counters updated as the iterator advances.
    stats: Arc<IteratorStats>,
}

impl<'a> ZeroCopyIterator<'a> {
    /// Create a new iterator over `data` yielding `chunk_size`-byte chunks,
    /// with its own private statistics.
    pub fn new(data: &'a [u8], chunk_size: usize) -> Self {
        Self::with_stats(data, chunk_size, Arc::new(IteratorStats::default()))
    }

    /// Create an iterator that records its activity into shared `stats`
    /// (useful for aggregating across several iterators).
    pub fn with_stats(data: &'a [u8], chunk_size: usize, stats: Arc<IteratorStats>) -> Self {
        Self {
            data,
            pos: 0,
            chunk_size,
            stats,
        }
    }

    /// Number of bytes not yet yielded.
    pub fn remaining(&self) -> usize {
        self.data.len().saturating_sub(self.pos)
    }

    /// Seek to an absolute byte position.
    ///
    /// Returns `false` (position unchanged) if `pos` lies past the end of
    /// the data. Successful seeks are recorded in the stats — previously
    /// the `seeks` counter existed but was never incremented.
    pub fn seek(&mut self, pos: usize) -> bool {
        if pos <= self.data.len() {
            self.pos = pos;
            self.stats.seeks.fetch_add(1, Ordering::Relaxed);
            true
        } else {
            false
        }
    }

    /// Access the iterator's statistics.
    pub fn stats(&self) -> &IteratorStats {
        &self.stats
    }
}

impl<'a> Iterator for ZeroCopyIterator<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }

        let start = self.pos;
        let end = (start + self.chunk_size).min(self.data.len());
        self.pos = end;

        self.stats.chunks_read.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_read
            .fetch_add((end - start) as u64, Ordering::Relaxed);

        Some(&self.data[start..end])
    }
}

/// Counters shared between iterators. All updates use relaxed ordering:
/// these are monotonic telemetry values, not synchronization primitives.
#[derive(Debug, Default)]
pub struct IteratorStats {
    /// Number of chunks yielded so far.
    pub chunks_read: AtomicU64,
    /// Total number of bytes yielded so far.
    pub bytes_read: AtomicU64,
    /// Number of successful `seek` calls.
    pub seeks: AtomicU64,
}

impl IteratorStats {
    /// Take a point-in-time copy of the counters as plain integers.
    pub fn snapshot(&self) -> IteratorStatsSnapshot {
        IteratorStatsSnapshot {
            chunks_read: self.chunks_read.load(Ordering::Relaxed),
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            seeks: self.seeks.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of [`IteratorStats`] at one moment in time.
#[derive(Debug, Clone)]
pub struct IteratorStatsSnapshot {
    pub chunks_read: u64,
    pub bytes_read: u64,
    pub seeks: u64,
}
336
337/// Block-aware iterator that parses block headers
338pub struct BlockIterator<'a> {
339    /// Raw data iterator
340    inner: ZeroCopyIterator<'a>,
341    /// Current block index
342    block_index: usize,
343}
344
345impl<'a> BlockIterator<'a> {
346    pub fn new(data: &'a [u8], block_size: usize) -> Self {
347        Self {
348            inner: ZeroCopyIterator::new(data, block_size),
349            block_index: 0,
350        }
351    }
352
353    /// Get current block index
354    pub fn block_index(&self) -> usize {
355        self.block_index
356    }
357
358    /// Skip to specific block
359    pub fn skip_to_block(&mut self, index: usize) -> bool {
360        let pos = index * self.inner.chunk_size;
361        if self.inner.seek(pos) {
362            self.block_index = index;
363            true
364        } else {
365            false
366        }
367    }
368}
369
370impl<'a> Iterator for BlockIterator<'a> {
371    type Item = (usize, &'a [u8]);
372
373    fn next(&mut self) -> Option<Self::Item> {
374        let block = self.inner.next()?;
375        let index = self.block_index;
376        self.block_index += 1;
377        Some((index, block))
378    }
379}
380
381/// Scanned region with optional filtering
382pub struct FilteredScan<'a, F>
383where
384    F: Fn(&[u8]) -> bool,
385{
386    inner: ZeroCopyIterator<'a>,
387    predicate: F,
388}
389
390impl<'a, F> FilteredScan<'a, F>
391where
392    F: Fn(&[u8]) -> bool,
393{
394    pub fn new(data: &'a [u8], chunk_size: usize, predicate: F) -> Self {
395        Self {
396            inner: ZeroCopyIterator::new(data, chunk_size),
397            predicate,
398        }
399    }
400}
401
402impl<'a, F> Iterator for FilteredScan<'a, F>
403where
404    F: Fn(&[u8]) -> bool,
405{
406    type Item = &'a [u8];
407
408    fn next(&mut self) -> Option<Self::Item> {
409        loop {
410            let chunk = self.inner.next()?;
411            if (self.predicate)(chunk) {
412                return Some(chunk);
413            }
414        }
415    }
416}
417
/// Tuning knobs for a parallel scan.
#[derive(Debug, Clone)]
pub struct ParallelScanConfig {
    /// How many readers operate concurrently.
    pub num_readers: usize,
    /// Bytes handed to each reader per chunk.
    pub chunk_size: usize,
    /// How many chunks ahead to prefetch.
    pub prefetch_distance: usize,
}

impl Default for ParallelScanConfig {
    /// Defaults: 4 readers, 64 KiB chunks, prefetching 2 chunks ahead.
    fn default() -> Self {
        ParallelScanConfig {
            num_readers: 4,
            chunk_size: 1 << 16, // 64 KiB
            prefetch_distance: 2,
        }
    }
}
438
/// Divides a byte span of `total_len` into contiguous, near-equal ranges
/// so independent workers can each process one range.
pub struct RangeScanner {
    /// Total number of bytes being divided.
    total_len: usize,
    /// Size of every range except possibly the last (which may be shorter).
    range_size: usize,
    /// Index of the next range the iterator will yield.
    current: usize,
    /// Total count of ranges produced.
    total_ranges: usize,
}

impl RangeScanner {
    /// Create a scanner dividing `total_len` bytes into (at most)
    /// `num_ranges` ranges; `num_ranges == 0` is treated as 1.
    pub fn new(total_len: usize, num_ranges: usize) -> Self {
        let range_size = total_len.div_ceil(num_ranges.max(1));
        let total_ranges = match total_len {
            0 => 0,
            n => n.div_ceil(range_size),
        };

        RangeScanner {
            total_len,
            range_size,
            current: 0,
            total_ranges,
        }
    }

    /// The byte range for index `index`, or `None` if out of bounds.
    pub fn range(&self, index: usize) -> Option<Range<usize>> {
        if index >= self.total_ranges {
            return None;
        }

        let begin = index * self.range_size;
        let finish = (begin + self.range_size).min(self.total_len);
        Some(begin..finish)
    }

    /// Total number of ranges this scanner yields.
    pub fn total_ranges(&self) -> usize {
        self.total_ranges
    }
}

impl Iterator for RangeScanner {
    type Item = Range<usize>;

    fn next(&mut self) -> Option<Self::Item> {
        let next = self.range(self.current)?;
        self.current += 1;
        Some(next)
    }
}
496
497/// Open a file for zero-copy scanning
498pub fn open_for_scan(path: impl AsRef<Path>) -> io::Result<MmapRegion> {
499    let file = File::open(path)?;
500    MmapRegion::new_with_readahead(&file)
501}
502
503/// Create an iterator over a file
504pub fn scan_file(path: impl AsRef<Path>, chunk_size: usize) -> io::Result<(MmapRegion, usize)> {
505    let region = open_for_scan(path)?;
506    Ok((region, chunk_size))
507}
508
// Unit tests: each creates a real temp file, maps it, and checks the
// region/iterator behavior against known byte patterns.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Write `data` to a fresh temp file and flush so the mapping sees it.
    fn create_test_file(data: &[u8]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(data).unwrap();
        file.flush().unwrap();
        file
    }

    #[test]
    fn test_mmap_region_basic() {
        let data = b"Hello, World! This is test data for mmap.";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    #[test]
    fn test_mmap_empty_file() {
        let file = create_test_file(b"");

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        // Empty file maps to a null pointer with len 0.
        assert!(region.is_empty());
        assert_eq!(region.as_slice(), &[] as &[u8]);
    }

    #[test]
    fn test_mmap_slice() {
        let data = b"0123456789ABCDEF";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        assert_eq!(region.slice(0..4), Some(&b"0123"[..]));
        assert_eq!(region.slice(4..8), Some(&b"4567"[..]));
        // Out-of-bounds end yields None rather than panicking.
        assert_eq!(region.slice(0..100), None);
    }

    #[test]
    fn test_zero_copy_iterator() {
        let data = b"AAAABBBBCCCCDDDD";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = ZeroCopyIterator::new(region.as_slice(), 4);

        let chunks: Vec<_> = iter.collect();

        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks[0], b"AAAA");
        assert_eq!(chunks[1], b"BBBB");
        assert_eq!(chunks[2], b"CCCC");
        assert_eq!(chunks[3], b"DDDD");
    }

    #[test]
    fn test_iterator_uneven_chunks() {
        let data = b"AAABBBCC";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = ZeroCopyIterator::new(region.as_slice(), 3);

        let chunks: Vec<_> = iter.collect();

        // Final chunk is short when data length isn't a multiple of chunk size.
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0], b"AAA");
        assert_eq!(chunks[1], b"BBB");
        assert_eq!(chunks[2], b"CC");
    }

    #[test]
    fn test_iterator_stats() {
        let data = b"AAAABBBBCCCC";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = ZeroCopyIterator::new(region.as_slice(), 4);
        // Clone the Arc so the counters survive the iterator being consumed.
        let stats = Arc::clone(&iter.stats);

        let _chunks: Vec<_> = iter.collect();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.chunks_read, 3);
        assert_eq!(snapshot.bytes_read, 12);
    }

    #[test]
    fn test_iterator_seek() {
        let data = b"AAAABBBBCCCCDDDD";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        assert!(iter.seek(8));
        assert_eq!(iter.next(), Some(&b"CCCC"[..]));

        // Seeking past the end is rejected.
        assert!(!iter.seek(100));
    }

    #[test]
    fn test_block_iterator() {
        let data = b"BLK1BLK2BLK3";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let iter = BlockIterator::new(region.as_slice(), 4);

        let blocks: Vec<_> = iter.collect();

        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0], (0, &b"BLK1"[..]));
        assert_eq!(blocks[1], (1, &b"BLK2"[..]));
        assert_eq!(blocks[2], (2, &b"BLK3"[..]));
    }

    #[test]
    fn test_block_iterator_skip() {
        let data = b"BLK1BLK2BLK3BLK4";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let mut iter = BlockIterator::new(region.as_slice(), 4);

        assert!(iter.skip_to_block(2));
        assert_eq!(iter.next(), Some((2, &b"BLK3"[..])));
    }

    #[test]
    fn test_filtered_scan() {
        let data = b"ABCDXXXXYYYY1234";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        // Filter for chunks that don't contain 'X' or 'Y'
        let scan = FilteredScan::new(region.as_slice(), 4, |chunk| {
            !chunk.contains(&b'X') && !chunk.contains(&b'Y')
        });

        let matching: Vec<_> = scan.collect();

        assert_eq!(matching.len(), 2);
        assert_eq!(matching[0], b"ABCD");
        assert_eq!(matching[1], b"1234");
    }

    #[test]
    fn test_range_scanner() {
        let scanner = RangeScanner::new(100, 4);

        assert_eq!(scanner.total_ranges(), 4);

        let ranges: Vec<_> = scanner.collect();

        assert_eq!(ranges.len(), 4);
        assert_eq!(ranges[0], 0..25);
        assert_eq!(ranges[1], 25..50);
        assert_eq!(ranges[2], 50..75);
        assert_eq!(ranges[3], 75..100);
    }

    #[test]
    fn test_range_scanner_uneven() {
        let scanner = RangeScanner::new(10, 3);

        let ranges: Vec<_> = scanner.collect();

        // 10 / 3 = 4 (ceiling), so ranges are 0..4, 4..8, 8..10
        assert_eq!(ranges.len(), 3);
        assert!(ranges.last().unwrap().end == 10);
    }

    #[test]
    fn test_range_scanner_empty() {
        let scanner = RangeScanner::new(0, 4);

        assert_eq!(scanner.total_ranges(), 0);
        let ranges: Vec<_> = scanner.collect();
        assert!(ranges.is_empty());
    }

    #[test]
    fn test_parallel_scan_config() {
        let config = ParallelScanConfig::default();

        assert!(config.num_readers > 0);
        assert!(config.chunk_size > 0);
    }

    #[test]
    fn test_remaining_bytes() {
        let data = b"AAAABBBBCCCC";
        let file = create_test_file(data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();
        let mut iter = ZeroCopyIterator::new(region.as_slice(), 4);

        assert_eq!(iter.remaining(), 12);
        iter.next();
        assert_eq!(iter.remaining(), 8);
        iter.next();
        assert_eq!(iter.remaining(), 4);
        iter.next();
        assert_eq!(iter.remaining(), 0);
    }

    #[test]
    fn test_mmap_with_readahead() {
        let data = b"Test data for readahead mmap";
        let file = create_test_file(data);

        let region = MmapRegion::new_with_readahead(&File::open(file.path()).unwrap()).unwrap();

        assert_eq!(region.len(), data.len());
        assert_eq!(region.as_slice(), data);
    }

    #[test]
    fn test_prefetch() {
        let data = vec![0u8; 1024 * 1024]; // 1MB
        let file = create_test_file(&data);

        let region = MmapRegion::new(&File::open(file.path()).unwrap()).unwrap();

        // Prefetch should not crash
        region.prefetch(0..65536);
        region.prefetch(65536..131072);
    }
}