sochdb_storage/
zero_copy.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Zero-Copy SSTable Iterator
16//!
17//! Provides zero-copy access to edges in memory-mapped SSTables,
18//! avoiding the overhead of copying 128 bytes per edge during iteration.
19//!
20//! ## jj.md Task 6: Zero-Copy Iterator
21//!
22//! Goals:
23//! - 3-5x faster range scans
24//! - Reduced memory bandwidth (128 bytes saved per edge)
25//! - Better CPU cache utilization
26//!
27//! ## Implementation
28//!
29//! Uses `EdgeRef<'a>` to provide lazy field access directly from mmap'd memory:
30//! - No allocation per edge
31//! - No copy of 128 bytes
32//! - Fields parsed on-demand
33//!
34//! ## Memory Bandwidth Savings
35//!
36//! ```text
37//! Before: read 128B (mmap) + write 128B (copy) = 256B per edge
38//! After:  read 128B (mmap) only = 128B per edge
39//! Improvement: 2x memory bandwidth, 3-5x throughput
40//! ```
41
42use byteorder::{ByteOrder, LittleEndian};
43use std::cmp::Ordering;
44
/// Length in bytes of one serialized edge record.
///
/// Every record in an edge region has exactly this size, so the iterators in
/// this module step through a region in `EDGE_SIZE` increments.
pub const EDGE_SIZE: usize = 128;

/// Borrowed, zero-copy view of a single edge record.
///
/// An `EdgeRef` is just one reference to `EDGE_SIZE` bytes: fields are decoded
/// on demand from those bytes rather than being copied into an owned struct.
///
/// # Lifetime
///
/// `'a` is the lifetime of the borrowed byte region (for example a
/// memory-mapped SSTable); an `EdgeRef<'a>` cannot outlive that region.
#[derive(Copy, Clone)]
pub struct EdgeRef<'a> {
    /// Exactly `EDGE_SIZE` bytes of raw record data.
    bytes: &'a [u8; EDGE_SIZE],
}
62
63impl<'a> EdgeRef<'a> {
64    /// Create a new EdgeRef from a byte slice.
65    ///
66    /// # Safety
67    ///
68    /// The caller must ensure the slice is exactly 128 bytes and contains
69    /// a valid edge representation.
70    #[inline]
71    pub fn new(bytes: &'a [u8; EDGE_SIZE]) -> Self {
72        Self { bytes }
73    }
74
75    /// Try to create an EdgeRef from a slice (with length check).
76    #[inline]
77    pub fn try_from_slice(bytes: &'a [u8]) -> Option<Self> {
78        if bytes.len() >= EDGE_SIZE {
79            let arr: &[u8; EDGE_SIZE] = bytes[..EDGE_SIZE].try_into().ok()?;
80            Some(Self { bytes: arr })
81        } else {
82            None
83        }
84    }
85
86    /// Get the edge ID (u128 at offset 0).
87    #[inline]
88    pub fn edge_id(&self) -> u128 {
89        LittleEndian::read_u128(&self.bytes[0..16])
90    }
91
92    /// Get the timestamp in microseconds (u64 at offset 16).
93    #[inline]
94    pub fn timestamp_us(&self) -> u64 {
95        LittleEndian::read_u64(&self.bytes[16..24])
96    }
97
98    /// Get the tenant ID (u64 at offset 24).
99    #[inline]
100    pub fn tenant_id(&self) -> u64 {
101        LittleEndian::read_u64(&self.bytes[24..32])
102    }
103
104    /// Get the project ID (u16 at offset 32).
105    #[inline]
106    pub fn project_id(&self) -> u16 {
107        LittleEndian::read_u16(&self.bytes[32..34])
108    }
109
110    /// Get the source node ID (u128 at offset 34).
111    #[inline]
112    pub fn source_node_id(&self) -> u128 {
113        LittleEndian::read_u128(&self.bytes[34..50])
114    }
115
116    /// Get the target node ID (u128 at offset 50).
117    #[inline]
118    pub fn target_node_id(&self) -> u128 {
119        LittleEndian::read_u128(&self.bytes[50..66])
120    }
121
122    /// Get the edge type (u8 at offset 66).
123    #[inline]
124    pub fn edge_type(&self) -> u8 {
125        self.bytes[66]
126    }
127
128    /// Get the flags byte (u8 at offset 67).
129    #[inline]
130    pub fn flags(&self) -> u8 {
131        self.bytes[67]
132    }
133
134    /// Check if this edge is a tombstone (deleted).
135    #[inline]
136    pub fn is_deleted(&self) -> bool {
137        // Assuming bit 0 of flags indicates deletion
138        (self.flags() & 0x01) != 0
139    }
140
141    /// Get the payload reference (hash or inline data at offset 68).
142    #[inline]
143    pub fn payload_ref(&self) -> &[u8] {
144        &self.bytes[68..100]
145    }
146
147    /// Get the checksum (u32 at offset 124).
148    #[inline]
149    pub fn checksum(&self) -> u32 {
150        LittleEndian::read_u32(&self.bytes[124..128])
151    }
152
153    /// Get the raw bytes.
154    #[inline]
155    pub fn as_bytes(&self) -> &[u8; EDGE_SIZE] {
156        self.bytes
157    }
158
159    /// Verify the edge checksum.
160    pub fn verify_checksum(&self) -> bool {
161        // Compute checksum of first 124 bytes
162        let data = &self.bytes[..124];
163        let computed = crate::block_checksum::crc32c(data);
164        computed == self.checksum()
165    }
166}
167
168impl std::fmt::Debug for EdgeRef<'_> {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("EdgeRef")
171            .field("edge_id", &self.edge_id())
172            .field("timestamp_us", &self.timestamp_us())
173            .field("tenant_id", &self.tenant_id())
174            .field("is_deleted", &self.is_deleted())
175            .finish()
176    }
177}
178
179impl PartialEq for EdgeRef<'_> {
180    fn eq(&self, other: &Self) -> bool {
181        self.edge_id() == other.edge_id()
182    }
183}
184
185impl Eq for EdgeRef<'_> {}
186
187impl PartialOrd for EdgeRef<'_> {
188    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
189        Some(self.cmp(other))
190    }
191}
192
193impl Ord for EdgeRef<'_> {
194    fn cmp(&self, other: &Self) -> Ordering {
195        // Primary sort by timestamp, secondary by edge_id
196        match self.timestamp_us().cmp(&other.timestamp_us()) {
197            Ordering::Equal => self.edge_id().cmp(&other.edge_id()),
198            ord => ord,
199        }
200    }
201}
202
/// Zero-copy iterator over fixed-size edge records in a byte region.
///
/// Yields [`EdgeRef`] views that borrow directly from `data`; nothing is
/// copied. The iterator state is three words and cheap to clone, which lets
/// callers checkpoint a scan position and resume from it later.
#[derive(Clone)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct ZeroCopyIterator<'a> {
    /// Underlying byte region (e.g. a memory-mapped SSTable).
    data: &'a [u8],
    /// Byte offset of the next record to yield.
    offset: usize,
    /// Exclusive upper byte bound of the scan (clamped to `data.len()` by `new`).
    end: usize,
}
215
216impl<'a> ZeroCopyIterator<'a> {
217    /// Create a new zero-copy iterator.
218    ///
219    /// # Arguments
220    /// * `data` - The memory-mapped data
221    /// * `start` - Starting offset (must be aligned to EDGE_SIZE)
222    /// * `end` - Ending offset (exclusive)
223    pub fn new(data: &'a [u8], start: usize, end: usize) -> Self {
224        Self {
225            data,
226            offset: start,
227            end: end.min(data.len()),
228        }
229    }
230
231    /// Create an iterator over the entire data region.
232    pub fn all(data: &'a [u8]) -> Self {
233        Self::new(data, 0, data.len())
234    }
235
236    /// Get the current position.
237    pub fn position(&self) -> usize {
238        self.offset
239    }
240
241    /// Get the remaining byte count.
242    pub fn remaining(&self) -> usize {
243        self.end.saturating_sub(self.offset)
244    }
245
246    /// Get the remaining edge count.
247    pub fn remaining_edges(&self) -> usize {
248        self.remaining() / EDGE_SIZE
249    }
250
251    /// Skip to a specific offset.
252    pub fn seek(&mut self, offset: usize) {
253        self.offset = offset.min(self.end);
254    }
255
256    /// Skip n edges.
257    pub fn skip_edges(&mut self, n: usize) {
258        self.offset = (self.offset + n * EDGE_SIZE).min(self.end);
259    }
260}
261
262impl<'a> Iterator for ZeroCopyIterator<'a> {
263    type Item = EdgeRef<'a>;
264
265    #[inline]
266    fn next(&mut self) -> Option<Self::Item> {
267        if self.offset + EDGE_SIZE > self.end {
268            return None;
269        }
270
271        let bytes: &[u8; EDGE_SIZE] = self.data[self.offset..self.offset + EDGE_SIZE]
272            .try_into()
273            .ok()?;
274
275        self.offset += EDGE_SIZE;
276        Some(EdgeRef::new(bytes))
277    }
278
279    fn size_hint(&self) -> (usize, Option<usize>) {
280        let remaining = self.remaining_edges();
281        (remaining, Some(remaining))
282    }
283}
284
285impl ExactSizeIterator for ZeroCopyIterator<'_> {}
286
287/// Zero-copy iterator with prefetching for better cache performance.
288///
289/// This iterator uses CPU prefetch instructions to bring future edges
290/// into cache before they're accessed.
291pub struct PrefetchingZeroCopyIterator<'a> {
292    inner: ZeroCopyIterator<'a>,
293    prefetch_distance: usize,
294}
295
296impl<'a> PrefetchingZeroCopyIterator<'a> {
297    /// Create a new prefetching iterator.
298    ///
299    /// # Arguments
300    /// * `data` - The memory-mapped data
301    /// * `start` - Starting offset
302    /// * `end` - Ending offset
303    /// * `prefetch_distance` - Number of edges to prefetch ahead (default: 16)
304    pub fn new(data: &'a [u8], start: usize, end: usize, prefetch_distance: usize) -> Self {
305        Self {
306            inner: ZeroCopyIterator::new(data, start, end),
307            prefetch_distance,
308        }
309    }
310
311    /// Create with default prefetch distance (16 edges = 2KB).
312    pub fn with_default_prefetch(data: &'a [u8], start: usize, end: usize) -> Self {
313        Self::new(data, start, end, 16)
314    }
315}
316
317impl<'a> Iterator for PrefetchingZeroCopyIterator<'a> {
318    type Item = EdgeRef<'a>;
319
320    #[inline]
321    fn next(&mut self) -> Option<Self::Item> {
322        // Prefetch ahead
323        let prefetch_offset = self.inner.offset + self.prefetch_distance * EDGE_SIZE;
324        if prefetch_offset < self.inner.end {
325            crate::prefetch::prefetch_ahead(
326                self.inner.data,
327                self.inner.offset,
328                self.prefetch_distance * EDGE_SIZE,
329            );
330        }
331
332        self.inner.next()
333    }
334
335    fn size_hint(&self) -> (usize, Option<usize>) {
336        self.inner.size_hint()
337    }
338}
339
340impl ExactSizeIterator for PrefetchingZeroCopyIterator<'_> {}
341
342/// Range filter for zero-copy iteration.
343///
344/// Wraps an iterator and filters edges by timestamp range.
345pub struct TimestampRangeFilter<'a, I> {
346    inner: I,
347    start_ts: u64,
348    end_ts: u64,
349    _marker: std::marker::PhantomData<&'a ()>,
350}
351
352impl<'a, I: Iterator<Item = EdgeRef<'a>>> TimestampRangeFilter<'a, I> {
353    /// Create a new range filter.
354    pub fn new(inner: I, start_ts: u64, end_ts: u64) -> Self {
355        Self {
356            inner,
357            start_ts,
358            end_ts,
359            _marker: std::marker::PhantomData,
360        }
361    }
362}
363
364impl<'a, I: Iterator<Item = EdgeRef<'a>>> Iterator for TimestampRangeFilter<'a, I> {
365    type Item = EdgeRef<'a>;
366
367    fn next(&mut self) -> Option<Self::Item> {
368        loop {
369            let edge = self.inner.next()?;
370            let ts = edge.timestamp_us();
371
372            if ts > self.end_ts {
373                // Past the end - stop iteration
374                return None;
375            }
376
377            if ts >= self.start_ts {
378                return Some(edge);
379            }
380
381            // Before start - continue to next
382        }
383    }
384}
385
386/// Extension trait for adding zero-copy iteration methods.
387pub trait ZeroCopyExt<'a> {
388    /// Get a zero-copy iterator over the data.
389    fn iter_zero_copy(&'a self) -> ZeroCopyIterator<'a>;
390
391    /// Get a zero-copy iterator with prefetching.
392    fn iter_zero_copy_prefetching(&'a self) -> PrefetchingZeroCopyIterator<'a>;
393}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode one record with the given id and timestamp. Tenant and project
    /// are both set to 1, and a matching checksum goes in the last 4 bytes.
    fn create_test_edge(edge_id: u128, timestamp_us: u64) -> [u8; EDGE_SIZE] {
        let mut record = [0u8; EDGE_SIZE];
        LittleEndian::write_u128(&mut record[0..16], edge_id);
        LittleEndian::write_u64(&mut record[16..24], timestamp_us);
        LittleEndian::write_u64(&mut record[24..32], 1); // tenant_id
        LittleEndian::write_u16(&mut record[32..34], 1); // project_id

        let crc = crate::block_checksum::crc32c(&record[..124]);
        LittleEndian::write_u32(&mut record[124..128], crc);
        record
    }

    /// Concatenate `count` records; record `i` has id `i` and timestamp `i * 1000`.
    fn edge_region(count: u64) -> Vec<u8> {
        let mut region = Vec::new();
        for i in 0..count {
            region.extend_from_slice(&create_test_edge(u128::from(i), i * 1000));
        }
        region
    }

    #[test]
    fn test_edge_ref_basic() {
        let record = create_test_edge(12345, 1_000_000);
        let edge = EdgeRef::new(&record);

        assert_eq!(edge.edge_id(), 12345);
        assert_eq!(edge.timestamp_us(), 1_000_000);
        assert_eq!(edge.tenant_id(), 1);
        assert_eq!(edge.project_id(), 1);
        assert!(!edge.is_deleted());
    }

    #[test]
    fn test_edge_ref_checksum() {
        let pristine = create_test_edge(42, 500_000);
        assert!(EdgeRef::new(&pristine).verify_checksum());

        // Flip every bit of one byte inside the checksummed range.
        let mut damaged = pristine;
        damaged[10] ^= 0xFF;
        assert!(!EdgeRef::new(&damaged).verify_checksum());
    }

    #[test]
    fn test_zero_copy_iterator() {
        let region = edge_region(10);

        let iter = ZeroCopyIterator::all(&region);
        assert_eq!(iter.remaining_edges(), 10);

        let edges: Vec<EdgeRef<'_>> = iter.collect();
        assert_eq!(edges.len(), 10);
        for (expected, edge) in (0u64..).zip(&edges) {
            assert_eq!(edge.edge_id(), u128::from(expected));
            assert_eq!(edge.timestamp_us(), expected * 1000);
        }
    }

    #[test]
    fn test_zero_copy_iterator_range() {
        let region = edge_region(10);

        // Records 3..=7 occupy bytes [3 * EDGE_SIZE, 8 * EDGE_SIZE).
        let edges: Vec<_> =
            ZeroCopyIterator::new(&region, 3 * EDGE_SIZE, 8 * EDGE_SIZE).collect();
        assert_eq!(edges.len(), 5);
        assert_eq!(edges.first().map(EdgeRef::edge_id), Some(3));
        assert_eq!(edges.last().map(EdgeRef::edge_id), Some(7));
    }

    #[test]
    fn test_timestamp_range_filter() {
        let region = edge_region(10);

        let in_range: Vec<_> =
            TimestampRangeFilter::new(ZeroCopyIterator::all(&region), 3000, 6000).collect();

        // Inclusive bounds: 3000, 4000, 5000, 6000.
        assert_eq!(in_range.len(), 4);
        assert_eq!(in_range.first().map(EdgeRef::timestamp_us), Some(3000));
        assert_eq!(in_range.last().map(EdgeRef::timestamp_us), Some(6000));
    }

    #[test]
    fn test_edge_ref_ordering() {
        let (a, b, c) = (
            create_test_edge(1, 1000),
            create_test_edge(2, 1000),
            create_test_edge(1, 2000),
        );
        let (early_low, early_high, late) = (EdgeRef::new(&a), EdgeRef::new(&b), EdgeRef::new(&c));

        // Equal timestamps fall back to edge_id.
        assert!(early_low < early_high);

        // A later timestamp wins regardless of edge_id.
        assert!(early_low < late);
        assert!(early_high < late);
    }

    #[test]
    fn test_seek_and_skip() {
        let region = edge_region(10);
        let mut iter = ZeroCopyIterator::all(&region);

        iter.skip_edges(3);
        assert_eq!(iter.remaining_edges(), 7);
        assert_eq!(iter.next().map(|e| e.edge_id()), Some(3));

        iter.seek(7 * EDGE_SIZE);
        assert_eq!(iter.next().map(|e| e.edge_id()), Some(7));
    }

    #[test]
    fn test_exact_size_iterator() {
        let region = edge_region(5);
        assert_eq!(ZeroCopyIterator::all(&region).len(), 5);

        let mut iter = ZeroCopyIterator::all(&region);
        let _ = iter.next();
        assert_eq!(iter.len(), 4);
    }

    #[test]
    fn test_prefetching_iterator() {
        let region = edge_region(100);

        let yielded =
            PrefetchingZeroCopyIterator::with_default_prefetch(&region, 0, region.len()).count();
        assert_eq!(yielded, 100);
    }
}
565}