1use byteorder::{ByteOrder, LittleEndian};
43use std::cmp::Ordering;
44
45pub const EDGE_SIZE: usize = 128;
47
48#[derive(Clone, Copy)]
58pub struct EdgeRef<'a> {
59 bytes: &'a [u8; EDGE_SIZE],
61}
62
63impl<'a> EdgeRef<'a> {
64 #[inline]
71 pub fn new(bytes: &'a [u8; EDGE_SIZE]) -> Self {
72 Self { bytes }
73 }
74
75 #[inline]
77 pub fn try_from_slice(bytes: &'a [u8]) -> Option<Self> {
78 if bytes.len() >= EDGE_SIZE {
79 let arr: &[u8; EDGE_SIZE] = bytes[..EDGE_SIZE].try_into().ok()?;
80 Some(Self { bytes: arr })
81 } else {
82 None
83 }
84 }
85
86 #[inline]
88 pub fn edge_id(&self) -> u128 {
89 LittleEndian::read_u128(&self.bytes[0..16])
90 }
91
92 #[inline]
94 pub fn timestamp_us(&self) -> u64 {
95 LittleEndian::read_u64(&self.bytes[16..24])
96 }
97
98 #[inline]
100 pub fn tenant_id(&self) -> u64 {
101 LittleEndian::read_u64(&self.bytes[24..32])
102 }
103
104 #[inline]
106 pub fn project_id(&self) -> u16 {
107 LittleEndian::read_u16(&self.bytes[32..34])
108 }
109
110 #[inline]
112 pub fn source_node_id(&self) -> u128 {
113 LittleEndian::read_u128(&self.bytes[34..50])
114 }
115
116 #[inline]
118 pub fn target_node_id(&self) -> u128 {
119 LittleEndian::read_u128(&self.bytes[50..66])
120 }
121
122 #[inline]
124 pub fn edge_type(&self) -> u8 {
125 self.bytes[66]
126 }
127
128 #[inline]
130 pub fn flags(&self) -> u8 {
131 self.bytes[67]
132 }
133
134 #[inline]
136 pub fn is_deleted(&self) -> bool {
137 (self.flags() & 0x01) != 0
139 }
140
141 #[inline]
143 pub fn payload_ref(&self) -> &[u8] {
144 &self.bytes[68..100]
145 }
146
147 #[inline]
149 pub fn checksum(&self) -> u32 {
150 LittleEndian::read_u32(&self.bytes[124..128])
151 }
152
153 #[inline]
155 pub fn as_bytes(&self) -> &[u8; EDGE_SIZE] {
156 self.bytes
157 }
158
159 pub fn verify_checksum(&self) -> bool {
161 let data = &self.bytes[..124];
163 let computed = crate::block_checksum::crc32c(data);
164 computed == self.checksum()
165 }
166}
167
168impl std::fmt::Debug for EdgeRef<'_> {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("EdgeRef")
171 .field("edge_id", &self.edge_id())
172 .field("timestamp_us", &self.timestamp_us())
173 .field("tenant_id", &self.tenant_id())
174 .field("is_deleted", &self.is_deleted())
175 .finish()
176 }
177}
178
179impl PartialEq for EdgeRef<'_> {
180 fn eq(&self, other: &Self) -> bool {
181 self.edge_id() == other.edge_id()
182 }
183}
184
185impl Eq for EdgeRef<'_> {}
186
187impl PartialOrd for EdgeRef<'_> {
188 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
189 Some(self.cmp(other))
190 }
191}
192
193impl Ord for EdgeRef<'_> {
194 fn cmp(&self, other: &Self) -> Ordering {
195 match self.timestamp_us().cmp(&other.timestamp_us()) {
197 Ordering::Equal => self.edge_id().cmp(&other.edge_id()),
198 ord => ord,
199 }
200 }
201}
202
203pub struct ZeroCopyIterator<'a> {
208 data: &'a [u8],
210 offset: usize,
212 end: usize,
214}
215
216impl<'a> ZeroCopyIterator<'a> {
217 pub fn new(data: &'a [u8], start: usize, end: usize) -> Self {
224 Self {
225 data,
226 offset: start,
227 end: end.min(data.len()),
228 }
229 }
230
231 pub fn all(data: &'a [u8]) -> Self {
233 Self::new(data, 0, data.len())
234 }
235
236 pub fn position(&self) -> usize {
238 self.offset
239 }
240
241 pub fn remaining(&self) -> usize {
243 self.end.saturating_sub(self.offset)
244 }
245
246 pub fn remaining_edges(&self) -> usize {
248 self.remaining() / EDGE_SIZE
249 }
250
251 pub fn seek(&mut self, offset: usize) {
253 self.offset = offset.min(self.end);
254 }
255
256 pub fn skip_edges(&mut self, n: usize) {
258 self.offset = (self.offset + n * EDGE_SIZE).min(self.end);
259 }
260}
261
262impl<'a> Iterator for ZeroCopyIterator<'a> {
263 type Item = EdgeRef<'a>;
264
265 #[inline]
266 fn next(&mut self) -> Option<Self::Item> {
267 if self.offset + EDGE_SIZE > self.end {
268 return None;
269 }
270
271 let bytes: &[u8; EDGE_SIZE] = self.data[self.offset..self.offset + EDGE_SIZE]
272 .try_into()
273 .ok()?;
274
275 self.offset += EDGE_SIZE;
276 Some(EdgeRef::new(bytes))
277 }
278
279 fn size_hint(&self) -> (usize, Option<usize>) {
280 let remaining = self.remaining_edges();
281 (remaining, Some(remaining))
282 }
283}
284
285impl ExactSizeIterator for ZeroCopyIterator<'_> {}
286
287pub struct PrefetchingZeroCopyIterator<'a> {
292 inner: ZeroCopyIterator<'a>,
293 prefetch_distance: usize,
294}
295
296impl<'a> PrefetchingZeroCopyIterator<'a> {
297 pub fn new(data: &'a [u8], start: usize, end: usize, prefetch_distance: usize) -> Self {
305 Self {
306 inner: ZeroCopyIterator::new(data, start, end),
307 prefetch_distance,
308 }
309 }
310
311 pub fn with_default_prefetch(data: &'a [u8], start: usize, end: usize) -> Self {
313 Self::new(data, start, end, 16)
314 }
315}
316
317impl<'a> Iterator for PrefetchingZeroCopyIterator<'a> {
318 type Item = EdgeRef<'a>;
319
320 #[inline]
321 fn next(&mut self) -> Option<Self::Item> {
322 let prefetch_offset = self.inner.offset + self.prefetch_distance * EDGE_SIZE;
324 if prefetch_offset < self.inner.end {
325 crate::prefetch::prefetch_ahead(
326 self.inner.data,
327 self.inner.offset,
328 self.prefetch_distance * EDGE_SIZE,
329 );
330 }
331
332 self.inner.next()
333 }
334
335 fn size_hint(&self) -> (usize, Option<usize>) {
336 self.inner.size_hint()
337 }
338}
339
340impl ExactSizeIterator for PrefetchingZeroCopyIterator<'_> {}
341
342pub struct TimestampRangeFilter<'a, I> {
346 inner: I,
347 start_ts: u64,
348 end_ts: u64,
349 _marker: std::marker::PhantomData<&'a ()>,
350}
351
352impl<'a, I: Iterator<Item = EdgeRef<'a>>> TimestampRangeFilter<'a, I> {
353 pub fn new(inner: I, start_ts: u64, end_ts: u64) -> Self {
355 Self {
356 inner,
357 start_ts,
358 end_ts,
359 _marker: std::marker::PhantomData,
360 }
361 }
362}
363
364impl<'a, I: Iterator<Item = EdgeRef<'a>>> Iterator for TimestampRangeFilter<'a, I> {
365 type Item = EdgeRef<'a>;
366
367 fn next(&mut self) -> Option<Self::Item> {
368 loop {
369 let edge = self.inner.next()?;
370 let ts = edge.timestamp_us();
371
372 if ts > self.end_ts {
373 return None;
375 }
376
377 if ts >= self.start_ts {
378 return Some(edge);
379 }
380
381 }
383 }
384}
385
386pub trait ZeroCopyExt<'a> {
388 fn iter_zero_copy(&'a self) -> ZeroCopyIterator<'a>;
390
391 fn iter_zero_copy_prefetching(&'a self) -> PrefetchingZeroCopyIterator<'a>;
393}
394
395#[cfg(test)]
396mod tests {
397 use super::*;
398
399 fn create_test_edge(edge_id: u128, timestamp_us: u64) -> [u8; EDGE_SIZE] {
400 let mut bytes = [0u8; EDGE_SIZE];
401
402 LittleEndian::write_u128(&mut bytes[0..16], edge_id);
404 LittleEndian::write_u64(&mut bytes[16..24], timestamp_us);
406 LittleEndian::write_u64(&mut bytes[24..32], 1);
408 LittleEndian::write_u16(&mut bytes[32..34], 1);
410
411 let checksum = crate::block_checksum::crc32c(&bytes[..124]);
413 LittleEndian::write_u32(&mut bytes[124..128], checksum);
414
415 bytes
416 }
417
418 #[test]
419 fn test_edge_ref_basic() {
420 let edge_bytes = create_test_edge(12345, 1000000);
421 let edge_ref = EdgeRef::new(&edge_bytes);
422
423 assert_eq!(edge_ref.edge_id(), 12345);
424 assert_eq!(edge_ref.timestamp_us(), 1000000);
425 assert_eq!(edge_ref.tenant_id(), 1);
426 assert_eq!(edge_ref.project_id(), 1);
427 assert!(!edge_ref.is_deleted());
428 }
429
430 #[test]
431 fn test_edge_ref_checksum() {
432 let edge_bytes = create_test_edge(42, 500000);
433 let edge_ref = EdgeRef::new(&edge_bytes);
434
435 assert!(edge_ref.verify_checksum());
436
437 let mut corrupted = edge_bytes;
439 corrupted[10] ^= 0xFF;
440 let corrupted_ref = EdgeRef::new(&corrupted);
441 assert!(!corrupted_ref.verify_checksum());
442 }
443
444 #[test]
445 fn test_zero_copy_iterator() {
446 let mut data = Vec::new();
448 for i in 0..10 {
449 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
450 }
451
452 let iter = ZeroCopyIterator::all(&data);
453 assert_eq!(iter.remaining_edges(), 10);
454
455 let edges: Vec<_> = iter.collect();
456 assert_eq!(edges.len(), 10);
457
458 for (i, edge) in edges.iter().enumerate() {
459 assert_eq!(edge.edge_id(), i as u128);
460 assert_eq!(edge.timestamp_us(), i as u64 * 1000);
461 }
462 }
463
464 #[test]
465 fn test_zero_copy_iterator_range() {
466 let mut data = Vec::new();
467 for i in 0..10 {
468 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
469 }
470
471 let start = 3 * EDGE_SIZE;
473 let end = 8 * EDGE_SIZE;
474 let iter = ZeroCopyIterator::new(&data, start, end);
475
476 let edges: Vec<_> = iter.collect();
477 assert_eq!(edges.len(), 5);
478 assert_eq!(edges[0].edge_id(), 3);
479 assert_eq!(edges[4].edge_id(), 7);
480 }
481
482 #[test]
483 fn test_timestamp_range_filter() {
484 let mut data = Vec::new();
485 for i in 0..10 {
486 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
487 }
488
489 let iter = ZeroCopyIterator::all(&data);
490 let filtered = TimestampRangeFilter::new(iter, 3000, 6000);
491
492 let edges: Vec<_> = filtered.collect();
493 assert_eq!(edges.len(), 4); assert_eq!(edges[0].timestamp_us(), 3000);
495 assert_eq!(edges[3].timestamp_us(), 6000);
496 }
497
498 #[test]
499 fn test_edge_ref_ordering() {
500 let edge1_bytes = create_test_edge(1, 1000);
501 let edge2_bytes = create_test_edge(2, 1000);
502 let edge3_bytes = create_test_edge(1, 2000);
503
504 let edge1 = EdgeRef::new(&edge1_bytes);
505 let edge2 = EdgeRef::new(&edge2_bytes);
506 let edge3 = EdgeRef::new(&edge3_bytes);
507
508 assert!(edge1 < edge2);
510
511 assert!(edge1 < edge3);
513 assert!(edge2 < edge3);
514 }
515
516 #[test]
517 fn test_seek_and_skip() {
518 let mut data = Vec::new();
519 for i in 0..10 {
520 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
521 }
522
523 let mut iter = ZeroCopyIterator::all(&data);
524
525 iter.skip_edges(3);
527 assert_eq!(iter.remaining_edges(), 7);
528
529 let edge = iter.next().unwrap();
530 assert_eq!(edge.edge_id(), 3);
531
532 iter.seek(7 * EDGE_SIZE);
534 let edge = iter.next().unwrap();
535 assert_eq!(edge.edge_id(), 7);
536 }
537
538 #[test]
539 fn test_exact_size_iterator() {
540 let mut data = Vec::new();
541 for i in 0..5 {
542 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
543 }
544
545 let iter = ZeroCopyIterator::all(&data);
546 assert_eq!(iter.len(), 5);
547
548 let mut iter = ZeroCopyIterator::all(&data);
549 iter.next();
550 assert_eq!(iter.len(), 4);
551 }
552
553 #[test]
554 fn test_prefetching_iterator() {
555 let mut data = Vec::new();
556 for i in 0..100 {
557 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
558 }
559
560 let iter = PrefetchingZeroCopyIterator::with_default_prefetch(&data, 0, data.len());
561 let edges: Vec<_> = iter.collect();
562
563 assert_eq!(edges.len(), 100);
564 }
565}