Skip to main content

sochdb_storage/
zero_copy.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! Zero-Copy SSTable Iterator
//!
//! Provides zero-copy access to edges in memory-mapped SSTables,
//! avoiding the overhead of copying 128 bytes per edge during iteration.
//!
//! ## jj.md Task 6: Zero-Copy Iterator
//!
//! Goals:
//! - 3-5x faster range scans
//! - Reduced memory bandwidth (128 bytes saved per edge)
//! - Better CPU cache utilization
//!
//! ## Implementation
//!
//! Uses `EdgeRef<'a>` to provide lazy field access directly from mmap'd memory:
//! - No allocation per edge
//! - No copy of the 128-byte record
//! - Fields decoded on demand, only when an accessor is called
//!
//! ## Memory Bandwidth Savings
//!
//! ```text
//! Before: read 128B (mmap) + write 128B (copy) = 256B per edge
//! After:  read 128B (mmap) only                = 128B per edge
//! Expected: half the memory traffic per edge; 3-5x throughput is the target
//! ```
44
45use byteorder::{ByteOrder, LittleEndian};
46use std::cmp::Ordering;
47
/// Width, in bytes, of one serialized edge record.
pub const EDGE_SIZE: usize = 128;

/// Borrowed, zero-copy view of a single edge record.
///
/// Wraps a reference to exactly [`EDGE_SIZE`] bytes and decodes individual
/// fields lazily through accessor methods. No owned edge struct is ever
/// materialized.
///
/// # Lifetime
///
/// `'a` is the lifetime of the borrowed bytes (for example a memory-mapped
/// region). An `EdgeRef` can never outlive the buffer it points into.
#[derive(Copy, Clone)]
pub struct EdgeRef<'a> {
    /// Borrowed record bytes; the array type fixes the length at `EDGE_SIZE`.
    bytes: &'a [u8; EDGE_SIZE],
}
65
66impl<'a> EdgeRef<'a> {
67    /// Create a new EdgeRef from a byte slice.
68    ///
69    /// # Safety
70    ///
71    /// The caller must ensure the slice is exactly 128 bytes and contains
72    /// a valid edge representation.
73    #[inline]
74    pub fn new(bytes: &'a [u8; EDGE_SIZE]) -> Self {
75        Self { bytes }
76    }
77
78    /// Try to create an EdgeRef from a slice (with length check).
79    #[inline]
80    pub fn try_from_slice(bytes: &'a [u8]) -> Option<Self> {
81        if bytes.len() >= EDGE_SIZE {
82            let arr: &[u8; EDGE_SIZE] = bytes[..EDGE_SIZE].try_into().ok()?;
83            Some(Self { bytes: arr })
84        } else {
85            None
86        }
87    }
88
89    /// Get the edge ID (u128 at offset 0).
90    #[inline]
91    pub fn edge_id(&self) -> u128 {
92        LittleEndian::read_u128(&self.bytes[0..16])
93    }
94
95    /// Get the timestamp in microseconds (u64 at offset 16).
96    #[inline]
97    pub fn timestamp_us(&self) -> u64 {
98        LittleEndian::read_u64(&self.bytes[16..24])
99    }
100
101    /// Get the tenant ID (u64 at offset 24).
102    #[inline]
103    pub fn tenant_id(&self) -> u64 {
104        LittleEndian::read_u64(&self.bytes[24..32])
105    }
106
107    /// Get the project ID (u16 at offset 32).
108    #[inline]
109    pub fn project_id(&self) -> u16 {
110        LittleEndian::read_u16(&self.bytes[32..34])
111    }
112
113    /// Get the source node ID (u128 at offset 34).
114    #[inline]
115    pub fn source_node_id(&self) -> u128 {
116        LittleEndian::read_u128(&self.bytes[34..50])
117    }
118
119    /// Get the target node ID (u128 at offset 50).
120    #[inline]
121    pub fn target_node_id(&self) -> u128 {
122        LittleEndian::read_u128(&self.bytes[50..66])
123    }
124
125    /// Get the edge type (u8 at offset 66).
126    #[inline]
127    pub fn edge_type(&self) -> u8 {
128        self.bytes[66]
129    }
130
131    /// Get the flags byte (u8 at offset 67).
132    #[inline]
133    pub fn flags(&self) -> u8 {
134        self.bytes[67]
135    }
136
137    /// Check if this edge is a tombstone (deleted).
138    #[inline]
139    pub fn is_deleted(&self) -> bool {
140        // Assuming bit 0 of flags indicates deletion
141        (self.flags() & 0x01) != 0
142    }
143
144    /// Get the payload reference (hash or inline data at offset 68).
145    #[inline]
146    pub fn payload_ref(&self) -> &[u8] {
147        &self.bytes[68..100]
148    }
149
150    /// Get the checksum (u32 at offset 124).
151    #[inline]
152    pub fn checksum(&self) -> u32 {
153        LittleEndian::read_u32(&self.bytes[124..128])
154    }
155
156    /// Get the raw bytes.
157    #[inline]
158    pub fn as_bytes(&self) -> &[u8; EDGE_SIZE] {
159        self.bytes
160    }
161
162    /// Verify the edge checksum.
163    pub fn verify_checksum(&self) -> bool {
164        // Compute checksum of first 124 bytes
165        let data = &self.bytes[..124];
166        let computed = crate::block_checksum::crc32c(data);
167        computed == self.checksum()
168    }
169}
170
171impl std::fmt::Debug for EdgeRef<'_> {
172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173        f.debug_struct("EdgeRef")
174            .field("edge_id", &self.edge_id())
175            .field("timestamp_us", &self.timestamp_us())
176            .field("tenant_id", &self.tenant_id())
177            .field("is_deleted", &self.is_deleted())
178            .finish()
179    }
180}
181
182impl PartialEq for EdgeRef<'_> {
183    fn eq(&self, other: &Self) -> bool {
184        self.edge_id() == other.edge_id()
185    }
186}
187
188impl Eq for EdgeRef<'_> {}
189
190impl PartialOrd for EdgeRef<'_> {
191    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
192        Some(self.cmp(other))
193    }
194}
195
196impl Ord for EdgeRef<'_> {
197    fn cmp(&self, other: &Self) -> Ordering {
198        // Primary sort by timestamp, secondary by edge_id
199        match self.timestamp_us().cmp(&other.timestamp_us()) {
200            Ordering::Equal => self.edge_id().cmp(&other.edge_id()),
201            ord => ord,
202        }
203    }
204}
205
/// Zero-copy iterator over fixed-size edge records in a byte buffer,
/// typically a memory-mapped region.
///
/// Each item is an [`EdgeRef`] that borrows `EDGE_SIZE` bytes straight out of
/// `data`. Nothing is copied or allocated per edge.
pub struct ZeroCopyIterator<'a> {
    /// Backing bytes that every yielded `EdgeRef` borrows from.
    data: &'a [u8],
    /// Byte offset of the next record to yield.
    offset: usize,
    /// Exclusive byte bound of the scan.
    end: usize,
}
218
219impl<'a> ZeroCopyIterator<'a> {
220    /// Create a new zero-copy iterator.
221    ///
222    /// # Arguments
223    /// * `data` - The memory-mapped data
224    /// * `start` - Starting offset (must be aligned to EDGE_SIZE)
225    /// * `end` - Ending offset (exclusive)
226    pub fn new(data: &'a [u8], start: usize, end: usize) -> Self {
227        Self {
228            data,
229            offset: start,
230            end: end.min(data.len()),
231        }
232    }
233
234    /// Create an iterator over the entire data region.
235    pub fn all(data: &'a [u8]) -> Self {
236        Self::new(data, 0, data.len())
237    }
238
239    /// Get the current position.
240    pub fn position(&self) -> usize {
241        self.offset
242    }
243
244    /// Get the remaining byte count.
245    pub fn remaining(&self) -> usize {
246        self.end.saturating_sub(self.offset)
247    }
248
249    /// Get the remaining edge count.
250    pub fn remaining_edges(&self) -> usize {
251        self.remaining() / EDGE_SIZE
252    }
253
254    /// Skip to a specific offset.
255    pub fn seek(&mut self, offset: usize) {
256        self.offset = offset.min(self.end);
257    }
258
259    /// Skip n edges.
260    pub fn skip_edges(&mut self, n: usize) {
261        self.offset = (self.offset + n * EDGE_SIZE).min(self.end);
262    }
263}
264
265impl<'a> Iterator for ZeroCopyIterator<'a> {
266    type Item = EdgeRef<'a>;
267
268    #[inline]
269    fn next(&mut self) -> Option<Self::Item> {
270        if self.offset + EDGE_SIZE > self.end {
271            return None;
272        }
273
274        let bytes: &[u8; EDGE_SIZE] = self.data[self.offset..self.offset + EDGE_SIZE]
275            .try_into()
276            .ok()?;
277
278        self.offset += EDGE_SIZE;
279        Some(EdgeRef::new(bytes))
280    }
281
282    fn size_hint(&self) -> (usize, Option<usize>) {
283        let remaining = self.remaining_edges();
284        (remaining, Some(remaining))
285    }
286}
287
288impl ExactSizeIterator for ZeroCopyIterator<'_> {}
289
290/// Zero-copy iterator with prefetching for better cache performance.
291///
292/// This iterator uses CPU prefetch instructions to bring future edges
293/// into cache before they're accessed.
294pub struct PrefetchingZeroCopyIterator<'a> {
295    inner: ZeroCopyIterator<'a>,
296    prefetch_distance: usize,
297}
298
299impl<'a> PrefetchingZeroCopyIterator<'a> {
300    /// Create a new prefetching iterator.
301    ///
302    /// # Arguments
303    /// * `data` - The memory-mapped data
304    /// * `start` - Starting offset
305    /// * `end` - Ending offset
306    /// * `prefetch_distance` - Number of edges to prefetch ahead (default: 16)
307    pub fn new(data: &'a [u8], start: usize, end: usize, prefetch_distance: usize) -> Self {
308        Self {
309            inner: ZeroCopyIterator::new(data, start, end),
310            prefetch_distance,
311        }
312    }
313
314    /// Create with default prefetch distance (16 edges = 2KB).
315    pub fn with_default_prefetch(data: &'a [u8], start: usize, end: usize) -> Self {
316        Self::new(data, start, end, 16)
317    }
318}
319
320impl<'a> Iterator for PrefetchingZeroCopyIterator<'a> {
321    type Item = EdgeRef<'a>;
322
323    #[inline]
324    fn next(&mut self) -> Option<Self::Item> {
325        // Prefetch ahead
326        let prefetch_offset = self.inner.offset + self.prefetch_distance * EDGE_SIZE;
327        if prefetch_offset < self.inner.end {
328            crate::prefetch::prefetch_ahead(
329                self.inner.data,
330                self.inner.offset,
331                self.prefetch_distance * EDGE_SIZE,
332            );
333        }
334
335        self.inner.next()
336    }
337
338    fn size_hint(&self) -> (usize, Option<usize>) {
339        self.inner.size_hint()
340    }
341}
342
343impl ExactSizeIterator for PrefetchingZeroCopyIterator<'_> {}
344
345/// Range filter for zero-copy iteration.
346///
347/// Wraps an iterator and filters edges by timestamp range.
348pub struct TimestampRangeFilter<'a, I> {
349    inner: I,
350    start_ts: u64,
351    end_ts: u64,
352    _marker: std::marker::PhantomData<&'a ()>,
353}
354
355impl<'a, I: Iterator<Item = EdgeRef<'a>>> TimestampRangeFilter<'a, I> {
356    /// Create a new range filter.
357    pub fn new(inner: I, start_ts: u64, end_ts: u64) -> Self {
358        Self {
359            inner,
360            start_ts,
361            end_ts,
362            _marker: std::marker::PhantomData,
363        }
364    }
365}
366
367impl<'a, I: Iterator<Item = EdgeRef<'a>>> Iterator for TimestampRangeFilter<'a, I> {
368    type Item = EdgeRef<'a>;
369
370    fn next(&mut self) -> Option<Self::Item> {
371        loop {
372            let edge = self.inner.next()?;
373            let ts = edge.timestamp_us();
374
375            if ts > self.end_ts {
376                // Past the end - stop iteration
377                return None;
378            }
379
380            if ts >= self.start_ts {
381                return Some(edge);
382            }
383
384            // Before start - continue to next
385        }
386    }
387}
388
389/// Extension trait for adding zero-copy iteration methods.
390pub trait ZeroCopyExt<'a> {
391    /// Get a zero-copy iterator over the data.
392    fn iter_zero_copy(&'a self) -> ZeroCopyIterator<'a>;
393
394    /// Get a zero-copy iterator with prefetching.
395    fn iter_zero_copy_prefetching(&'a self) -> PrefetchingZeroCopyIterator<'a>;
396}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode a single edge record with the given id and timestamp.
    ///
    /// `tenant_id` and `project_id` are fixed at 1 and every other field is
    /// zero. A CRC32C of the first 124 bytes is stored in the trailer.
    fn create_test_edge(edge_id: u128, timestamp_us: u64) -> [u8; EDGE_SIZE] {
        let mut record = [0u8; EDGE_SIZE];

        LittleEndian::write_u128(&mut record[0..16], edge_id);
        LittleEndian::write_u64(&mut record[16..24], timestamp_us);
        LittleEndian::write_u64(&mut record[24..32], 1); // tenant_id
        LittleEndian::write_u16(&mut record[32..34], 1); // project_id

        let crc = crate::block_checksum::crc32c(&record[..124]);
        LittleEndian::write_u32(&mut record[124..128], crc);

        record
    }

    /// Concatenate `count` records; record `i` has id `i` and timestamp `i * 1000`.
    fn sequential_edges(count: usize) -> Vec<u8> {
        let mut buf = Vec::with_capacity(count * EDGE_SIZE);
        for i in 0..count {
            buf.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
        }
        buf
    }

    #[test]
    fn test_edge_ref_basic() {
        let raw = create_test_edge(12345, 1000000);
        let view = EdgeRef::new(&raw);

        assert_eq!(view.edge_id(), 12345);
        assert_eq!(view.timestamp_us(), 1000000);
        assert_eq!(view.tenant_id(), 1);
        assert_eq!(view.project_id(), 1);
        assert!(!view.is_deleted());
    }

    #[test]
    fn test_edge_ref_checksum() {
        let pristine = create_test_edge(42, 500000);
        assert!(EdgeRef::new(&pristine).verify_checksum());

        // Flip every bit of one byte inside the checksummed region.
        let mut damaged = pristine;
        damaged[10] ^= 0xFF;
        assert!(!EdgeRef::new(&damaged).verify_checksum());
    }

    #[test]
    fn test_zero_copy_iterator() {
        let buf = sequential_edges(10);

        let iter = ZeroCopyIterator::all(&buf);
        assert_eq!(iter.remaining_edges(), 10);

        let edges: Vec<_> = iter.collect();
        assert_eq!(edges.len(), 10);

        for (expected, edge) in (0u64..).zip(edges.iter()) {
            assert_eq!(edge.edge_id(), expected as u128);
            assert_eq!(edge.timestamp_us(), expected * 1000);
        }
    }

    #[test]
    fn test_zero_copy_iterator_range() {
        let buf = sequential_edges(10);

        // Byte range [3 * EDGE_SIZE, 8 * EDGE_SIZE) covers edges 3..=7.
        let edges: Vec<_> = ZeroCopyIterator::new(&buf, 3 * EDGE_SIZE, 8 * EDGE_SIZE).collect();

        assert_eq!(edges.len(), 5);
        assert_eq!(edges[0].edge_id(), 3);
        assert_eq!(edges[4].edge_id(), 7);
    }

    #[test]
    fn test_timestamp_range_filter() {
        let buf = sequential_edges(10);

        let in_window: Vec<_> =
            TimestampRangeFilter::new(ZeroCopyIterator::all(&buf), 3000, 6000).collect();

        // Both bounds are inclusive: 3000, 4000, 5000, 6000.
        assert_eq!(in_window.len(), 4);
        assert_eq!(in_window[0].timestamp_us(), 3000);
        assert_eq!(in_window[3].timestamp_us(), 6000);
    }

    #[test]
    fn test_edge_ref_ordering() {
        let (a, b, c) = (
            create_test_edge(1, 1000),
            create_test_edge(2, 1000),
            create_test_edge(1, 2000),
        );
        let (first, second, later) = (EdgeRef::new(&a), EdgeRef::new(&b), EdgeRef::new(&c));

        // Equal timestamps fall back to edge_id.
        assert!(first < second);

        // Timestamp dominates edge_id.
        assert!(first < later);
        assert!(second < later);
    }

    #[test]
    fn test_seek_and_skip() {
        let buf = sequential_edges(10);
        let mut iter = ZeroCopyIterator::all(&buf);

        iter.skip_edges(3);
        assert_eq!(iter.remaining_edges(), 7);
        assert_eq!(iter.next().unwrap().edge_id(), 3);

        iter.seek(7 * EDGE_SIZE);
        assert_eq!(iter.next().unwrap().edge_id(), 7);
    }

    #[test]
    fn test_exact_size_iterator() {
        let buf = sequential_edges(5);

        assert_eq!(ZeroCopyIterator::all(&buf).len(), 5);

        let mut iter = ZeroCopyIterator::all(&buf);
        iter.next();
        assert_eq!(iter.len(), 4);
    }

    #[test]
    fn test_prefetching_iterator() {
        let buf = sequential_edges(100);
        let yielded =
            PrefetchingZeroCopyIterator::with_default_prefetch(&buf, 0, buf.len()).count();
        assert_eq!(yielded, 100);
    }
}
568}