1use smallvec::SmallVec;
30
31use super::factorized_chunk::FactorizedChunk;
32use grafeo_common::types::{EdgeId, NodeId, Value};
33
/// Inline capacity of the [`SmallVec`] buffers that hold one index per level.
///
/// Index lists with at most this many levels fit in the inline buffer; longer
/// lists spill to the heap.
const STACK_LEVELS: usize = 4;
37
38#[derive(Debug, Clone)]
42pub struct RowIndices {
43 indices: SmallVec<[usize; STACK_LEVELS]>,
46}
47
48impl RowIndices {
49 #[must_use]
51 pub fn new(indices: &[usize]) -> Self {
52 Self {
53 indices: SmallVec::from_slice(indices),
54 }
55 }
56
57 #[must_use]
59 pub fn get(&self, level: usize) -> Option<usize> {
60 self.indices.get(level).copied()
61 }
62
63 #[must_use]
65 pub fn level_count(&self) -> usize {
66 self.indices.len()
67 }
68
69 #[must_use]
71 pub fn as_slice(&self) -> &[usize] {
72 &self.indices
73 }
74}
75
76#[derive(Debug, Clone)]
81pub struct RowView<'a> {
82 chunk: &'a FactorizedChunk,
83 indices: RowIndices,
84}
85
86impl<'a> RowView<'a> {
87 #[must_use]
89 pub fn new(chunk: &'a FactorizedChunk, indices: RowIndices) -> Self {
90 Self { chunk, indices }
91 }
92
93 #[must_use]
95 pub fn from_ref(chunk: &'a FactorizedChunk, indices: &RowIndices) -> Self {
96 Self {
97 chunk,
98 indices: indices.clone(),
99 }
100 }
101
102 #[must_use]
104 pub fn get(&self, level: usize, column: usize) -> Option<Value> {
105 let physical_idx = self.indices.get(level)?;
106 let level_data = self.chunk.level(level)?;
107 let col = level_data.column(column)?;
108 col.get_physical(physical_idx)
109 }
110
111 #[must_use]
113 pub fn get_node_id(&self, level: usize, column: usize) -> Option<NodeId> {
114 let physical_idx = self.indices.get(level)?;
115 let level_data = self.chunk.level(level)?;
116 let col = level_data.column(column)?;
117 col.get_node_id_physical(physical_idx)
118 }
119
120 #[must_use]
122 pub fn get_edge_id(&self, level: usize, column: usize) -> Option<EdgeId> {
123 let physical_idx = self.indices.get(level)?;
124 let level_data = self.chunk.level(level)?;
125 let col = level_data.column(column)?;
126 col.get_edge_id_physical(physical_idx)
127 }
128
129 #[must_use]
131 pub fn level_count(&self) -> usize {
132 self.indices.level_count()
133 }
134
135 pub fn values(&self) -> impl Iterator<Item = Value> + '_ {
139 (0..self.level_count()).flat_map(move |level| {
140 let physical_idx = self.indices.get(level).unwrap_or(0);
141 let level_data = self.chunk.level(level);
142 let col_count = level_data.map_or(0, |l| l.column_count());
143
144 (0..col_count).filter_map(move |col| {
145 level_data
146 .and_then(|l| l.column(col))
147 .and_then(|c| c.get_physical(physical_idx))
148 })
149 })
150 }
151
152 #[must_use]
154 pub fn to_vec(&self) -> Vec<Value> {
155 self.values().collect()
156 }
157}
158
159#[derive(Debug)]
182pub struct PrecomputedIter<'a> {
183 chunk: &'a FactorizedChunk,
184 rows: Vec<RowIndices>,
186 position: usize,
188}
189
190impl<'a> PrecomputedIter<'a> {
191 #[must_use]
196 pub fn new(chunk: &'a FactorizedChunk) -> Self {
197 let rows = Self::compute_all_indices(chunk);
198 Self {
199 chunk,
200 rows,
201 position: 0,
202 }
203 }
204
205 fn compute_all_indices(chunk: &FactorizedChunk) -> Vec<RowIndices> {
207 let level_count = chunk.level_count();
208 if level_count == 0 {
209 return Vec::new();
210 }
211
212 let logical_rows = chunk.logical_row_count();
213 let mut rows = Vec::with_capacity(logical_rows);
214
215 let mut indices: SmallVec<[usize; STACK_LEVELS]> = SmallVec::new();
217 indices.resize(level_count, 0);
218
219 Self::enumerate_rows_iterative(chunk, &mut indices, &mut rows);
220
221 rows
222 }
223
224 fn enumerate_rows_iterative(
226 chunk: &FactorizedChunk,
227 initial_indices: &mut SmallVec<[usize; STACK_LEVELS]>,
228 rows: &mut Vec<RowIndices>,
229 ) {
230 let level_count = chunk.level_count();
231 if level_count == 0 {
232 return;
233 }
234
235 for level in 0..level_count {
237 if level == 0 {
238 initial_indices[0] = 0;
239 } else {
240 let parent_idx = initial_indices[level - 1];
241 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
242 let (start, _) = col.range_for_parent(parent_idx);
243 initial_indices[level] = start;
244 }
245 }
246 }
247
248 if !Self::is_valid_position(chunk, initial_indices) {
250 if !Self::advance_to_next_valid(chunk, initial_indices) {
251 return; }
253 }
254
255 loop {
257 rows.push(RowIndices::new(initial_indices));
259
260 if !Self::advance_to_next_valid(chunk, initial_indices) {
262 break;
263 }
264 }
265 }
266
267 fn is_valid_position(
269 chunk: &FactorizedChunk,
270 indices: &SmallVec<[usize; STACK_LEVELS]>,
271 ) -> bool {
272 let level_count = chunk.level_count();
273
274 for level in 0..level_count {
275 if level == 0 {
276 let level_data = match chunk.level(0) {
277 Some(l) => l,
278 None => return false,
279 };
280 if indices[0] >= level_data.group_count() {
281 return false;
282 }
283 } else {
284 let parent_idx = indices[level - 1];
285 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
286 let (start, end) = col.range_for_parent(parent_idx);
287 if start >= end || indices[level] < start || indices[level] >= end {
288 return false;
289 }
290 } else {
291 return false;
292 }
293 }
294 }
295
296 true
297 }
298
299 fn advance_to_next_valid(
301 chunk: &FactorizedChunk,
302 indices: &mut SmallVec<[usize; STACK_LEVELS]>,
303 ) -> bool {
304 let level_count = chunk.level_count();
305 if level_count == 0 {
306 return false;
307 }
308
309 loop {
310 let mut advanced = false;
312
313 for level in (0..level_count).rev() {
314 let (_start, end) = if level == 0 {
315 let level_data = match chunk.level(0) {
316 Some(l) => l,
317 None => return false,
318 };
319 (0, level_data.group_count())
320 } else {
321 let parent_idx = indices[level - 1];
322 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
323 col.range_for_parent(parent_idx)
324 } else {
325 (0, 0)
326 }
327 };
328
329 if indices[level] + 1 < end {
330 indices[level] += 1;
332
333 for deeper in (level + 1)..level_count {
335 let deeper_parent = indices[deeper - 1];
336 if let Some(col) = chunk.level(deeper).and_then(|l| l.column(0)) {
337 let (deeper_start, _) = col.range_for_parent(deeper_parent);
338 indices[deeper] = deeper_start;
339 }
340 }
341
342 advanced = true;
343 break;
344 }
345 }
347
348 if !advanced {
349 return false; }
351
352 if Self::is_valid_position(chunk, indices) {
354 return true;
355 }
356 }
358 }
359
360 #[must_use]
362 pub fn len(&self) -> usize {
363 self.rows.len()
364 }
365
366 #[must_use]
368 pub fn is_empty(&self) -> bool {
369 self.rows.is_empty()
370 }
371
372 #[must_use]
374 pub fn get(&self, index: usize) -> Option<&RowIndices> {
375 self.rows.get(index)
376 }
377
378 #[must_use]
380 pub fn row(&self, index: usize) -> Option<RowView<'a>> {
381 self.rows
382 .get(index)
383 .map(|indices| RowView::from_ref(self.chunk, indices))
384 }
385
386 pub fn rows(&self) -> impl Iterator<Item = RowView<'a>> + '_ {
388 self.rows
389 .iter()
390 .map(|indices| RowView::from_ref(self.chunk, indices))
391 }
392
393 pub fn reset(&mut self) {
395 self.position = 0;
396 }
397}
398
399impl<'a> Iterator for PrecomputedIter<'a> {
400 type Item = RowView<'a>;
401
402 fn next(&mut self) -> Option<Self::Item> {
403 if self.position >= self.rows.len() {
404 return None;
405 }
406
407 let indices = &self.rows[self.position];
408 self.position += 1;
409 Some(RowView::from_ref(self.chunk, indices))
410 }
411
412 fn size_hint(&self) -> (usize, Option<usize>) {
413 let remaining = self.rows.len() - self.position;
414 (remaining, Some(remaining))
415 }
416}
417
418impl ExactSizeIterator for PrecomputedIter<'_> {}
419
420#[derive(Debug)]
425pub struct StreamingIter<'a> {
426 chunk: &'a FactorizedChunk,
427 indices: SmallVec<[usize; STACK_LEVELS]>,
428 exhausted: bool,
429 started: bool,
430}
431
432impl<'a> StreamingIter<'a> {
433 #[must_use]
435 pub fn new(chunk: &'a FactorizedChunk) -> Self {
436 let level_count = chunk.level_count();
437 let mut indices = SmallVec::new();
438 indices.resize(level_count, 0);
439
440 let mut iter = Self {
441 chunk,
442 indices,
443 exhausted: level_count == 0,
444 started: false,
445 };
446
447 if !iter.exhausted {
449 iter.initialize_indices();
450 if !PrecomputedIter::is_valid_position(chunk, &iter.indices) {
451 iter.exhausted = !PrecomputedIter::advance_to_next_valid(chunk, &mut iter.indices);
452 }
453 }
454
455 iter
456 }
457
458 fn initialize_indices(&mut self) {
460 let level_count = self.chunk.level_count();
461 for level in 0..level_count {
462 if level == 0 {
463 self.indices[0] = 0;
464 } else {
465 let parent_idx = self.indices[level - 1];
466 if let Some(col) = self.chunk.level(level).and_then(|l| l.column(0)) {
467 let (start, _) = col.range_for_parent(parent_idx);
468 self.indices[level] = start;
469 }
470 }
471 }
472 }
473
474 #[must_use]
476 pub fn current_indices(&self) -> Option<RowIndices> {
477 if self.exhausted {
478 None
479 } else {
480 Some(RowIndices::new(&self.indices))
481 }
482 }
483
484 pub fn reset(&mut self) {
486 self.started = false;
487 self.exhausted = self.chunk.level_count() == 0;
488 if !self.exhausted {
489 self.initialize_indices();
490 if !PrecomputedIter::is_valid_position(self.chunk, &self.indices) {
491 self.exhausted =
492 !PrecomputedIter::advance_to_next_valid(self.chunk, &mut self.indices);
493 }
494 }
495 }
496}
497
498impl<'a> Iterator for StreamingIter<'a> {
499 type Item = RowIndices;
500
501 fn next(&mut self) -> Option<Self::Item> {
502 if self.exhausted {
503 return None;
504 }
505
506 if self.started {
507 if !PrecomputedIter::advance_to_next_valid(self.chunk, &mut self.indices) {
509 self.exhausted = true;
510 return None;
511 }
512 } else {
513 self.started = true;
514 }
515
516 Some(RowIndices::new(&self.indices))
517 }
518}
519
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::factorized_chunk::FactorizationLevel;
    use crate::execution::factorized_vector::FactorizedVector;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::{LogicalType, NodeId};

    /// Builds an `Int64` value vector holding `values` in order.
    fn int64_vector(values: &[i64]) -> ValueVector {
        let mut vector = ValueVector::with_type(LogicalType::Int64);
        for &value in values {
            vector.push_int64(value);
        }
        vector
    }

    /// Two-level chunk: sources 10 and 20, with children 1, 2, 3 under the
    /// first source and 4, 5 under the second (five rows in total).
    fn create_test_chunk() -> FactorizedChunk {
        let source_level = FactorizationLevel::flat(
            vec![FactorizedVector::flat(int64_vector(&[10, 20]))],
            vec!["source".to_string()],
        );

        let child_vector =
            FactorizedVector::unflat(int64_vector(&[1, 2, 3, 4, 5]), vec![0u32, 3, 5], 2);
        let child_level =
            FactorizationLevel::unflat(vec![child_vector], vec!["child".to_string()], vec![3, 2]);

        let mut chunk = FactorizedChunk::empty();
        chunk.add_factorized_level(source_level);
        chunk.add_factorized_level(child_level);
        chunk
    }

    /// Single-level chunk holding node ids 100 and 200.
    fn create_node_chunk() -> FactorizedChunk {
        let mut nodes = ValueVector::with_type(LogicalType::Node);
        for raw_id in [100, 200] {
            nodes.push_node_id(NodeId::new(raw_id));
        }
        let node_level = FactorizationLevel::flat(
            vec![FactorizedVector::flat(nodes)],
            vec!["source".to_string()],
        );

        let mut chunk = FactorizedChunk::empty();
        chunk.add_factorized_level(node_level);
        chunk
    }

    #[test]
    fn test_row_indices_new() {
        let row = RowIndices::new(&[0, 1, 2]);

        assert_eq!(row.level_count(), 3);
        for level in 0..3 {
            assert_eq!(row.get(level), Some(level));
        }
        // One past the last level.
        assert_eq!(row.get(3), None);
    }

    #[test]
    fn test_row_indices_as_slice() {
        let row = RowIndices::new(&[5, 10, 15]);
        assert_eq!(row.as_slice(), &[5, 10, 15]);
    }

    #[test]
    fn test_row_view_new() {
        let chunk = create_test_chunk();
        let view = RowView::new(&chunk, RowIndices::new(&[0, 0]));

        assert_eq!(view.level_count(), 2);
    }

    #[test]
    fn test_row_view_from_ref() {
        let chunk = create_test_chunk();
        let row = RowIndices::new(&[0, 1]);
        let view = RowView::from_ref(&chunk, &row);

        assert_eq!(view.get(0, 0), Some(Value::Int64(10)));
        assert_eq!(view.get(1, 0), Some(Value::Int64(2)));
    }

    #[test]
    fn test_row_view_get_node_id() {
        let chunk = create_node_chunk();
        let view = RowView::new(&chunk, RowIndices::new(&[0]));

        assert_eq!(view.get_node_id(0, 0), Some(NodeId::new(100)));
    }

    #[test]
    fn test_row_view_get_invalid() {
        let chunk = create_test_chunk();
        let view = RowView::new(&chunk, RowIndices::new(&[0, 0]));

        // Level that does not exist.
        assert_eq!(view.get(10, 0), None);

        // Column that does not exist.
        assert_eq!(view.get(0, 10), None);
    }

    #[test]
    fn test_row_view_values() {
        let chunk = create_test_chunk();
        let view = RowView::new(&chunk, RowIndices::new(&[0, 0]));

        let collected: Vec<Value> = view.values().collect();

        assert_eq!(collected, vec![Value::Int64(10), Value::Int64(1)]);
    }

    #[test]
    fn test_row_view_to_vec() {
        let chunk = create_test_chunk();
        let view = RowView::new(&chunk, RowIndices::new(&[1, 4]));

        assert_eq!(view.to_vec(), vec![Value::Int64(20), Value::Int64(5)]);
    }

    #[test]
    fn test_precomputed_iter_count() {
        let chunk = create_test_chunk();
        let precomputed = PrecomputedIter::new(&chunk);

        // 3 children under the first source + 2 under the second.
        assert_eq!(precomputed.len(), 5);
    }

    #[test]
    fn test_precomputed_iter_values() {
        let chunk = create_test_chunk();
        let precomputed = PrecomputedIter::new(&chunk);

        let actual: Vec<Vec<Value>> = precomputed.map(|row| row.to_vec()).collect();

        let expected = vec![
            vec![Value::Int64(10), Value::Int64(1)],
            vec![Value::Int64(10), Value::Int64(2)],
            vec![Value::Int64(10), Value::Int64(3)],
            vec![Value::Int64(20), Value::Int64(4)],
            vec![Value::Int64(20), Value::Int64(5)],
        ];
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_precomputed_iter_get() {
        let chunk = create_test_chunk();
        let precomputed = PrecomputedIter::new(&chunk);

        let third = precomputed.get(2).unwrap();
        assert_eq!(third.as_slice(), &[0, 2]);

        assert!(precomputed.get(10).is_none());
    }

    #[test]
    fn test_precomputed_iter_rows() {
        let chunk = create_test_chunk();
        let precomputed = PrecomputedIter::new(&chunk);

        let views: Vec<RowView> = precomputed.rows().collect();
        assert_eq!(views.len(), 5);
    }

    #[test]
    fn test_precomputed_iter_reset() {
        let chunk = create_test_chunk();
        let mut precomputed = PrecomputedIter::new(&chunk);

        // Consume two of the five rows.
        assert!(precomputed.next().is_some());
        assert!(precomputed.next().is_some());
        assert_eq!(precomputed.size_hint().0, 3);

        // After a reset all five rows are available again.
        precomputed.reset();
        assert_eq!(precomputed.size_hint().0, 5);
    }

    #[test]
    fn test_row_view_get() {
        let chunk = create_test_chunk();
        let precomputed = PrecomputedIter::new(&chunk);

        let head = precomputed.row(0).unwrap();
        assert_eq!(head.get(0, 0), Some(Value::Int64(10)));
        assert_eq!(head.get(1, 0), Some(Value::Int64(1)));

        let tail = precomputed.row(4).unwrap();
        assert_eq!(tail.get(0, 0), Some(Value::Int64(20)));
        assert_eq!(tail.get(1, 0), Some(Value::Int64(5)));

        assert!(precomputed.row(10).is_none());
    }

    #[test]
    fn test_streaming_iter() {
        let chunk = create_test_chunk();

        let actual: Vec<Vec<usize>> = StreamingIter::new(&chunk)
            .map(|row| row.as_slice().to_vec())
            .collect();

        let expected: Vec<Vec<usize>> = vec![
            vec![0, 0],
            vec![0, 1],
            vec![0, 2],
            vec![1, 3],
            vec![1, 4],
        ];
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_streaming_iter_current_indices() {
        let chunk = create_test_chunk();
        let streaming = StreamingIter::new(&chunk);

        // Before any call to `next`, the current position is the first row.
        let current = streaming.current_indices();
        assert!(current.is_some());
        assert_eq!(current.unwrap().as_slice(), &[0, 0]);
    }

    #[test]
    fn test_streaming_iter_reset() {
        let chunk = create_test_chunk();
        let mut streaming = StreamingIter::new(&chunk);

        // Move past the first two rows, then rewind.
        streaming.next();
        streaming.next();
        streaming.reset();

        let first = streaming.next().unwrap();
        assert_eq!(first.as_slice(), &[0, 0]);
    }

    #[test]
    fn test_streaming_iter_exhausted() {
        let chunk = create_test_chunk();
        let mut streaming = StreamingIter::new(&chunk);

        // Drain every row.
        while streaming.next().is_some() {}

        assert!(streaming.current_indices().is_none());
    }

    #[test]
    fn test_empty_chunk() {
        let chunk = FactorizedChunk::empty();
        let precomputed = PrecomputedIter::new(&chunk);

        assert!(precomputed.is_empty());
        assert_eq!(precomputed.len(), 0);
    }

    #[test]
    fn test_empty_chunk_streaming() {
        let chunk = FactorizedChunk::empty();
        let mut streaming = StreamingIter::new(&chunk);

        assert!(streaming.next().is_none());
        assert!(streaming.current_indices().is_none());
    }

    #[test]
    fn test_random_access() {
        let chunk = create_test_chunk();
        let precomputed = PrecomputedIter::new(&chunk);

        // Fetch rows out of order on purpose.
        let third = precomputed.row(2).unwrap();
        let fifth = precomputed.row(4).unwrap();
        let first = precomputed.row(0).unwrap();

        assert_eq!(third.get(1, 0), Some(Value::Int64(3)));
        assert_eq!(fifth.get(1, 0), Some(Value::Int64(5)));
        assert_eq!(first.get(1, 0), Some(Value::Int64(1)));
    }

    #[test]
    fn test_exact_size_iterator() {
        let chunk = create_test_chunk();
        let precomputed = PrecomputedIter::new(&chunk);

        assert_eq!(precomputed.len(), 5);
        assert_eq!(precomputed.size_hint(), (5, Some(5)));
    }

    #[test]
    fn test_single_level_chunk() {
        let only_level = FactorizationLevel::flat(
            vec![FactorizedVector::flat(int64_vector(&[1, 2, 3]))],
            vec!["value".to_string()],
        );

        let mut chunk = FactorizedChunk::empty();
        chunk.add_factorized_level(only_level);

        let precomputed = PrecomputedIter::new(&chunk);
        assert_eq!(precomputed.len(), 3);

        let streamed: Vec<RowIndices> = StreamingIter::new(&chunk).collect();
        assert_eq!(streamed.len(), 3);
    }

    #[test]
    fn test_row_indices_clone() {
        let original = RowIndices::new(&[1, 2, 3]);
        let copy = original.clone();

        assert_eq!(original.as_slice(), copy.as_slice());
    }

    #[test]
    fn test_row_view_level_count() {
        let chunk = create_test_chunk();
        let view = RowView::new(&chunk, RowIndices::new(&[0, 0]));

        assert_eq!(view.level_count(), 2);
    }
}