1use byteorder::{ByteOrder, LittleEndian};
46use std::cmp::Ordering;
47
48pub const EDGE_SIZE: usize = 128;
50
51#[derive(Clone, Copy)]
61pub struct EdgeRef<'a> {
62 bytes: &'a [u8; EDGE_SIZE],
64}
65
66impl<'a> EdgeRef<'a> {
67 #[inline]
74 pub fn new(bytes: &'a [u8; EDGE_SIZE]) -> Self {
75 Self { bytes }
76 }
77
78 #[inline]
80 pub fn try_from_slice(bytes: &'a [u8]) -> Option<Self> {
81 if bytes.len() >= EDGE_SIZE {
82 let arr: &[u8; EDGE_SIZE] = bytes[..EDGE_SIZE].try_into().ok()?;
83 Some(Self { bytes: arr })
84 } else {
85 None
86 }
87 }
88
89 #[inline]
91 pub fn edge_id(&self) -> u128 {
92 LittleEndian::read_u128(&self.bytes[0..16])
93 }
94
95 #[inline]
97 pub fn timestamp_us(&self) -> u64 {
98 LittleEndian::read_u64(&self.bytes[16..24])
99 }
100
101 #[inline]
103 pub fn tenant_id(&self) -> u64 {
104 LittleEndian::read_u64(&self.bytes[24..32])
105 }
106
107 #[inline]
109 pub fn project_id(&self) -> u16 {
110 LittleEndian::read_u16(&self.bytes[32..34])
111 }
112
113 #[inline]
115 pub fn source_node_id(&self) -> u128 {
116 LittleEndian::read_u128(&self.bytes[34..50])
117 }
118
119 #[inline]
121 pub fn target_node_id(&self) -> u128 {
122 LittleEndian::read_u128(&self.bytes[50..66])
123 }
124
125 #[inline]
127 pub fn edge_type(&self) -> u8 {
128 self.bytes[66]
129 }
130
131 #[inline]
133 pub fn flags(&self) -> u8 {
134 self.bytes[67]
135 }
136
137 #[inline]
139 pub fn is_deleted(&self) -> bool {
140 (self.flags() & 0x01) != 0
142 }
143
144 #[inline]
146 pub fn payload_ref(&self) -> &[u8] {
147 &self.bytes[68..100]
148 }
149
150 #[inline]
152 pub fn checksum(&self) -> u32 {
153 LittleEndian::read_u32(&self.bytes[124..128])
154 }
155
156 #[inline]
158 pub fn as_bytes(&self) -> &[u8; EDGE_SIZE] {
159 self.bytes
160 }
161
162 pub fn verify_checksum(&self) -> bool {
164 let data = &self.bytes[..124];
166 let computed = crate::block_checksum::crc32c(data);
167 computed == self.checksum()
168 }
169}
170
171impl std::fmt::Debug for EdgeRef<'_> {
172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 f.debug_struct("EdgeRef")
174 .field("edge_id", &self.edge_id())
175 .field("timestamp_us", &self.timestamp_us())
176 .field("tenant_id", &self.tenant_id())
177 .field("is_deleted", &self.is_deleted())
178 .finish()
179 }
180}
181
182impl PartialEq for EdgeRef<'_> {
183 fn eq(&self, other: &Self) -> bool {
184 self.edge_id() == other.edge_id()
185 }
186}
187
188impl Eq for EdgeRef<'_> {}
189
190impl PartialOrd for EdgeRef<'_> {
191 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
192 Some(self.cmp(other))
193 }
194}
195
196impl Ord for EdgeRef<'_> {
197 fn cmp(&self, other: &Self) -> Ordering {
198 match self.timestamp_us().cmp(&other.timestamp_us()) {
200 Ordering::Equal => self.edge_id().cmp(&other.edge_id()),
201 ord => ord,
202 }
203 }
204}
205
206pub struct ZeroCopyIterator<'a> {
211 data: &'a [u8],
213 offset: usize,
215 end: usize,
217}
218
219impl<'a> ZeroCopyIterator<'a> {
220 pub fn new(data: &'a [u8], start: usize, end: usize) -> Self {
227 Self {
228 data,
229 offset: start,
230 end: end.min(data.len()),
231 }
232 }
233
234 pub fn all(data: &'a [u8]) -> Self {
236 Self::new(data, 0, data.len())
237 }
238
239 pub fn position(&self) -> usize {
241 self.offset
242 }
243
244 pub fn remaining(&self) -> usize {
246 self.end.saturating_sub(self.offset)
247 }
248
249 pub fn remaining_edges(&self) -> usize {
251 self.remaining() / EDGE_SIZE
252 }
253
254 pub fn seek(&mut self, offset: usize) {
256 self.offset = offset.min(self.end);
257 }
258
259 pub fn skip_edges(&mut self, n: usize) {
261 self.offset = (self.offset + n * EDGE_SIZE).min(self.end);
262 }
263}
264
265impl<'a> Iterator for ZeroCopyIterator<'a> {
266 type Item = EdgeRef<'a>;
267
268 #[inline]
269 fn next(&mut self) -> Option<Self::Item> {
270 if self.offset + EDGE_SIZE > self.end {
271 return None;
272 }
273
274 let bytes: &[u8; EDGE_SIZE] = self.data[self.offset..self.offset + EDGE_SIZE]
275 .try_into()
276 .ok()?;
277
278 self.offset += EDGE_SIZE;
279 Some(EdgeRef::new(bytes))
280 }
281
282 fn size_hint(&self) -> (usize, Option<usize>) {
283 let remaining = self.remaining_edges();
284 (remaining, Some(remaining))
285 }
286}
287
288impl ExactSizeIterator for ZeroCopyIterator<'_> {}
289
290pub struct PrefetchingZeroCopyIterator<'a> {
295 inner: ZeroCopyIterator<'a>,
296 prefetch_distance: usize,
297}
298
299impl<'a> PrefetchingZeroCopyIterator<'a> {
300 pub fn new(data: &'a [u8], start: usize, end: usize, prefetch_distance: usize) -> Self {
308 Self {
309 inner: ZeroCopyIterator::new(data, start, end),
310 prefetch_distance,
311 }
312 }
313
314 pub fn with_default_prefetch(data: &'a [u8], start: usize, end: usize) -> Self {
316 Self::new(data, start, end, 16)
317 }
318}
319
320impl<'a> Iterator for PrefetchingZeroCopyIterator<'a> {
321 type Item = EdgeRef<'a>;
322
323 #[inline]
324 fn next(&mut self) -> Option<Self::Item> {
325 let prefetch_offset = self.inner.offset + self.prefetch_distance * EDGE_SIZE;
327 if prefetch_offset < self.inner.end {
328 crate::prefetch::prefetch_ahead(
329 self.inner.data,
330 self.inner.offset,
331 self.prefetch_distance * EDGE_SIZE,
332 );
333 }
334
335 self.inner.next()
336 }
337
338 fn size_hint(&self) -> (usize, Option<usize>) {
339 self.inner.size_hint()
340 }
341}
342
343impl ExactSizeIterator for PrefetchingZeroCopyIterator<'_> {}
344
345pub struct TimestampRangeFilter<'a, I> {
349 inner: I,
350 start_ts: u64,
351 end_ts: u64,
352 _marker: std::marker::PhantomData<&'a ()>,
353}
354
355impl<'a, I: Iterator<Item = EdgeRef<'a>>> TimestampRangeFilter<'a, I> {
356 pub fn new(inner: I, start_ts: u64, end_ts: u64) -> Self {
358 Self {
359 inner,
360 start_ts,
361 end_ts,
362 _marker: std::marker::PhantomData,
363 }
364 }
365}
366
367impl<'a, I: Iterator<Item = EdgeRef<'a>>> Iterator for TimestampRangeFilter<'a, I> {
368 type Item = EdgeRef<'a>;
369
370 fn next(&mut self) -> Option<Self::Item> {
371 loop {
372 let edge = self.inner.next()?;
373 let ts = edge.timestamp_us();
374
375 if ts > self.end_ts {
376 return None;
378 }
379
380 if ts >= self.start_ts {
381 return Some(edge);
382 }
383
384 }
386 }
387}
388
389pub trait ZeroCopyExt<'a> {
391 fn iter_zero_copy(&'a self) -> ZeroCopyIterator<'a>;
393
394 fn iter_zero_copy_prefetching(&'a self) -> PrefetchingZeroCopyIterator<'a>;
396}
397
398#[cfg(test)]
399mod tests {
400 use super::*;
401
402 fn create_test_edge(edge_id: u128, timestamp_us: u64) -> [u8; EDGE_SIZE] {
403 let mut bytes = [0u8; EDGE_SIZE];
404
405 LittleEndian::write_u128(&mut bytes[0..16], edge_id);
407 LittleEndian::write_u64(&mut bytes[16..24], timestamp_us);
409 LittleEndian::write_u64(&mut bytes[24..32], 1);
411 LittleEndian::write_u16(&mut bytes[32..34], 1);
413
414 let checksum = crate::block_checksum::crc32c(&bytes[..124]);
416 LittleEndian::write_u32(&mut bytes[124..128], checksum);
417
418 bytes
419 }
420
421 #[test]
422 fn test_edge_ref_basic() {
423 let edge_bytes = create_test_edge(12345, 1000000);
424 let edge_ref = EdgeRef::new(&edge_bytes);
425
426 assert_eq!(edge_ref.edge_id(), 12345);
427 assert_eq!(edge_ref.timestamp_us(), 1000000);
428 assert_eq!(edge_ref.tenant_id(), 1);
429 assert_eq!(edge_ref.project_id(), 1);
430 assert!(!edge_ref.is_deleted());
431 }
432
433 #[test]
434 fn test_edge_ref_checksum() {
435 let edge_bytes = create_test_edge(42, 500000);
436 let edge_ref = EdgeRef::new(&edge_bytes);
437
438 assert!(edge_ref.verify_checksum());
439
440 let mut corrupted = edge_bytes;
442 corrupted[10] ^= 0xFF;
443 let corrupted_ref = EdgeRef::new(&corrupted);
444 assert!(!corrupted_ref.verify_checksum());
445 }
446
447 #[test]
448 fn test_zero_copy_iterator() {
449 let mut data = Vec::new();
451 for i in 0..10 {
452 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
453 }
454
455 let iter = ZeroCopyIterator::all(&data);
456 assert_eq!(iter.remaining_edges(), 10);
457
458 let edges: Vec<_> = iter.collect();
459 assert_eq!(edges.len(), 10);
460
461 for (i, edge) in edges.iter().enumerate() {
462 assert_eq!(edge.edge_id(), i as u128);
463 assert_eq!(edge.timestamp_us(), i as u64 * 1000);
464 }
465 }
466
467 #[test]
468 fn test_zero_copy_iterator_range() {
469 let mut data = Vec::new();
470 for i in 0..10 {
471 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
472 }
473
474 let start = 3 * EDGE_SIZE;
476 let end = 8 * EDGE_SIZE;
477 let iter = ZeroCopyIterator::new(&data, start, end);
478
479 let edges: Vec<_> = iter.collect();
480 assert_eq!(edges.len(), 5);
481 assert_eq!(edges[0].edge_id(), 3);
482 assert_eq!(edges[4].edge_id(), 7);
483 }
484
485 #[test]
486 fn test_timestamp_range_filter() {
487 let mut data = Vec::new();
488 for i in 0..10 {
489 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
490 }
491
492 let iter = ZeroCopyIterator::all(&data);
493 let filtered = TimestampRangeFilter::new(iter, 3000, 6000);
494
495 let edges: Vec<_> = filtered.collect();
496 assert_eq!(edges.len(), 4); assert_eq!(edges[0].timestamp_us(), 3000);
498 assert_eq!(edges[3].timestamp_us(), 6000);
499 }
500
501 #[test]
502 fn test_edge_ref_ordering() {
503 let edge1_bytes = create_test_edge(1, 1000);
504 let edge2_bytes = create_test_edge(2, 1000);
505 let edge3_bytes = create_test_edge(1, 2000);
506
507 let edge1 = EdgeRef::new(&edge1_bytes);
508 let edge2 = EdgeRef::new(&edge2_bytes);
509 let edge3 = EdgeRef::new(&edge3_bytes);
510
511 assert!(edge1 < edge2);
513
514 assert!(edge1 < edge3);
516 assert!(edge2 < edge3);
517 }
518
519 #[test]
520 fn test_seek_and_skip() {
521 let mut data = Vec::new();
522 for i in 0..10 {
523 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
524 }
525
526 let mut iter = ZeroCopyIterator::all(&data);
527
528 iter.skip_edges(3);
530 assert_eq!(iter.remaining_edges(), 7);
531
532 let edge = iter.next().unwrap();
533 assert_eq!(edge.edge_id(), 3);
534
535 iter.seek(7 * EDGE_SIZE);
537 let edge = iter.next().unwrap();
538 assert_eq!(edge.edge_id(), 7);
539 }
540
541 #[test]
542 fn test_exact_size_iterator() {
543 let mut data = Vec::new();
544 for i in 0..5 {
545 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
546 }
547
548 let iter = ZeroCopyIterator::all(&data);
549 assert_eq!(iter.len(), 5);
550
551 let mut iter = ZeroCopyIterator::all(&data);
552 iter.next();
553 assert_eq!(iter.len(), 4);
554 }
555
556 #[test]
557 fn test_prefetching_iterator() {
558 let mut data = Vec::new();
559 for i in 0..100 {
560 data.extend_from_slice(&create_test_edge(i as u128, i as u64 * 1000));
561 }
562
563 let iter = PrefetchingZeroCopyIterator::with_default_prefetch(&data, 0, data.len());
564 let edges: Vec<_> = iter.collect();
565
566 assert_eq!(edges.len(), 100);
567 }
568}