1use smallvec::SmallVec;
33
34use super::factorized_chunk::FactorizedChunk;
35use grafeo_common::types::{EdgeId, NodeId, Value};
36
37const STACK_LEVELS: usize = 4;
40
41#[derive(Debug, Clone)]
45pub struct RowIndices {
46 indices: SmallVec<[usize; STACK_LEVELS]>,
49}
50
51impl RowIndices {
52 #[must_use]
54 pub fn new(indices: &[usize]) -> Self {
55 Self {
56 indices: SmallVec::from_slice(indices),
57 }
58 }
59
60 #[must_use]
62 pub fn get(&self, level: usize) -> Option<usize> {
63 self.indices.get(level).copied()
64 }
65
66 #[must_use]
68 pub fn level_count(&self) -> usize {
69 self.indices.len()
70 }
71
72 #[must_use]
74 pub fn as_slice(&self) -> &[usize] {
75 &self.indices
76 }
77}
78
79#[derive(Debug, Clone)]
84pub struct RowView<'a> {
85 chunk: &'a FactorizedChunk,
86 indices: RowIndices,
87}
88
89impl<'a> RowView<'a> {
90 #[must_use]
92 pub fn new(chunk: &'a FactorizedChunk, indices: RowIndices) -> Self {
93 Self { chunk, indices }
94 }
95
96 #[must_use]
98 pub fn from_ref(chunk: &'a FactorizedChunk, indices: &RowIndices) -> Self {
99 Self {
100 chunk,
101 indices: indices.clone(),
102 }
103 }
104
105 #[must_use]
107 pub fn get(&self, level: usize, column: usize) -> Option<Value> {
108 let physical_idx = self.indices.get(level)?;
109 let level_data = self.chunk.level(level)?;
110 let col = level_data.column(column)?;
111 col.get_physical(physical_idx)
112 }
113
114 #[must_use]
116 pub fn get_node_id(&self, level: usize, column: usize) -> Option<NodeId> {
117 let physical_idx = self.indices.get(level)?;
118 let level_data = self.chunk.level(level)?;
119 let col = level_data.column(column)?;
120 col.get_node_id_physical(physical_idx)
121 }
122
123 #[must_use]
125 pub fn get_edge_id(&self, level: usize, column: usize) -> Option<EdgeId> {
126 let physical_idx = self.indices.get(level)?;
127 let level_data = self.chunk.level(level)?;
128 let col = level_data.column(column)?;
129 col.get_edge_id_physical(physical_idx)
130 }
131
132 #[must_use]
134 pub fn level_count(&self) -> usize {
135 self.indices.level_count()
136 }
137
138 pub fn values(&self) -> impl Iterator<Item = Value> + '_ {
142 (0..self.level_count()).flat_map(move |level| {
143 let physical_idx = self.indices.get(level).unwrap_or(0);
144 let level_data = self.chunk.level(level);
145 let col_count = level_data.map_or(0, |l| l.column_count());
146
147 (0..col_count).filter_map(move |col| {
148 level_data
149 .and_then(|l| l.column(col))
150 .and_then(|c| c.get_physical(physical_idx))
151 })
152 })
153 }
154
155 #[must_use]
157 pub fn to_vec(&self) -> Vec<Value> {
158 self.values().collect()
159 }
160}
161
162#[derive(Debug)]
185pub struct PrecomputedIter<'a> {
186 chunk: &'a FactorizedChunk,
187 rows: Vec<RowIndices>,
189 position: usize,
191}
192
193impl<'a> PrecomputedIter<'a> {
194 #[must_use]
199 pub fn new(chunk: &'a FactorizedChunk) -> Self {
200 let rows = Self::compute_all_indices(chunk);
201 Self {
202 chunk,
203 rows,
204 position: 0,
205 }
206 }
207
208 fn compute_all_indices(chunk: &FactorizedChunk) -> Vec<RowIndices> {
210 let level_count = chunk.level_count();
211 if level_count == 0 {
212 return Vec::new();
213 }
214
215 let logical_rows = chunk.logical_row_count();
216 let mut rows = Vec::with_capacity(logical_rows);
217
218 let mut indices: SmallVec<[usize; STACK_LEVELS]> = SmallVec::new();
220 indices.resize(level_count, 0);
221
222 Self::enumerate_rows_iterative(chunk, &mut indices, &mut rows);
223
224 rows
225 }
226
227 fn enumerate_rows_iterative(
229 chunk: &FactorizedChunk,
230 initial_indices: &mut SmallVec<[usize; STACK_LEVELS]>,
231 rows: &mut Vec<RowIndices>,
232 ) {
233 let level_count = chunk.level_count();
234 if level_count == 0 {
235 return;
236 }
237
238 for level in 0..level_count {
240 if level == 0 {
241 initial_indices[0] = 0;
242 } else {
243 let parent_idx = initial_indices[level - 1];
244 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
245 let (start, _) = col.range_for_parent(parent_idx);
246 initial_indices[level] = start;
247 }
248 }
249 }
250
251 if !Self::is_valid_position(chunk, initial_indices)
253 && !Self::advance_to_next_valid(chunk, initial_indices)
254 {
255 return; }
257
258 loop {
260 rows.push(RowIndices::new(initial_indices));
262
263 if !Self::advance_to_next_valid(chunk, initial_indices) {
265 break;
266 }
267 }
268 }
269
270 fn is_valid_position(
272 chunk: &FactorizedChunk,
273 indices: &SmallVec<[usize; STACK_LEVELS]>,
274 ) -> bool {
275 let level_count = chunk.level_count();
276
277 for level in 0..level_count {
278 if level == 0 {
279 let Some(level_data) = chunk.level(0) else {
280 return false;
281 };
282 if indices[0] >= level_data.group_count() {
283 return false;
284 }
285 } else {
286 let parent_idx = indices[level - 1];
287 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
288 let (start, end) = col.range_for_parent(parent_idx);
289 if start >= end || indices[level] < start || indices[level] >= end {
290 return false;
291 }
292 } else {
293 return false;
294 }
295 }
296 }
297
298 true
299 }
300
301 fn advance_to_next_valid(
303 chunk: &FactorizedChunk,
304 indices: &mut SmallVec<[usize; STACK_LEVELS]>,
305 ) -> bool {
306 let level_count = chunk.level_count();
307 if level_count == 0 {
308 return false;
309 }
310
311 loop {
312 let mut advanced = false;
314
315 for level in (0..level_count).rev() {
316 let (_start, end) = if level == 0 {
317 let Some(level_data) = chunk.level(0) else {
318 return false;
319 };
320 (0, level_data.group_count())
321 } else {
322 let parent_idx = indices[level - 1];
323 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
324 col.range_for_parent(parent_idx)
325 } else {
326 (0, 0)
327 }
328 };
329
330 if indices[level] + 1 < end {
331 indices[level] += 1;
333
334 for deeper in (level + 1)..level_count {
336 let deeper_parent = indices[deeper - 1];
337 if let Some(col) = chunk.level(deeper).and_then(|l| l.column(0)) {
338 let (deeper_start, _) = col.range_for_parent(deeper_parent);
339 indices[deeper] = deeper_start;
340 }
341 }
342
343 advanced = true;
344 break;
345 }
346 }
348
349 if !advanced {
350 return false; }
352
353 if Self::is_valid_position(chunk, indices) {
355 return true;
356 }
357 }
359 }
360
361 #[must_use]
363 pub fn len(&self) -> usize {
364 self.rows.len()
365 }
366
367 #[must_use]
369 pub fn is_empty(&self) -> bool {
370 self.rows.is_empty()
371 }
372
373 #[must_use]
375 pub fn get(&self, index: usize) -> Option<&RowIndices> {
376 self.rows.get(index)
377 }
378
379 #[must_use]
381 pub fn row(&self, index: usize) -> Option<RowView<'a>> {
382 self.rows
383 .get(index)
384 .map(|indices| RowView::from_ref(self.chunk, indices))
385 }
386
387 pub fn rows(&self) -> impl Iterator<Item = RowView<'a>> + '_ {
389 self.rows
390 .iter()
391 .map(|indices| RowView::from_ref(self.chunk, indices))
392 }
393
394 pub fn reset(&mut self) {
396 self.position = 0;
397 }
398}
399
400impl<'a> Iterator for PrecomputedIter<'a> {
401 type Item = RowView<'a>;
402
403 fn next(&mut self) -> Option<Self::Item> {
404 if self.position >= self.rows.len() {
405 return None;
406 }
407
408 let indices = &self.rows[self.position];
409 self.position += 1;
410 Some(RowView::from_ref(self.chunk, indices))
411 }
412
413 fn size_hint(&self) -> (usize, Option<usize>) {
414 let remaining = self.rows.len() - self.position;
415 (remaining, Some(remaining))
416 }
417}
418
419impl ExactSizeIterator for PrecomputedIter<'_> {}
420
421#[derive(Debug)]
426pub struct StreamingIter<'a> {
427 chunk: &'a FactorizedChunk,
428 indices: SmallVec<[usize; STACK_LEVELS]>,
429 exhausted: bool,
430 started: bool,
431}
432
433impl<'a> StreamingIter<'a> {
434 #[must_use]
436 pub fn new(chunk: &'a FactorizedChunk) -> Self {
437 let level_count = chunk.level_count();
438 let mut indices = SmallVec::new();
439 indices.resize(level_count, 0);
440
441 let mut iter = Self {
442 chunk,
443 indices,
444 exhausted: level_count == 0,
445 started: false,
446 };
447
448 if !iter.exhausted {
450 iter.initialize_indices();
451 if !PrecomputedIter::is_valid_position(chunk, &iter.indices) {
452 iter.exhausted = !PrecomputedIter::advance_to_next_valid(chunk, &mut iter.indices);
453 }
454 }
455
456 iter
457 }
458
459 fn initialize_indices(&mut self) {
461 let level_count = self.chunk.level_count();
462 for level in 0..level_count {
463 if level == 0 {
464 self.indices[0] = 0;
465 } else {
466 let parent_idx = self.indices[level - 1];
467 if let Some(col) = self.chunk.level(level).and_then(|l| l.column(0)) {
468 let (start, _) = col.range_for_parent(parent_idx);
469 self.indices[level] = start;
470 }
471 }
472 }
473 }
474
475 #[must_use]
477 pub fn current_indices(&self) -> Option<RowIndices> {
478 if self.exhausted {
479 None
480 } else {
481 Some(RowIndices::new(&self.indices))
482 }
483 }
484
485 pub fn reset(&mut self) {
487 self.started = false;
488 self.exhausted = self.chunk.level_count() == 0;
489 if !self.exhausted {
490 self.initialize_indices();
491 if !PrecomputedIter::is_valid_position(self.chunk, &self.indices) {
492 self.exhausted =
493 !PrecomputedIter::advance_to_next_valid(self.chunk, &mut self.indices);
494 }
495 }
496 }
497}
498
499impl Iterator for StreamingIter<'_> {
500 type Item = RowIndices;
501
502 fn next(&mut self) -> Option<Self::Item> {
503 if self.exhausted {
504 return None;
505 }
506
507 if self.started {
508 if !PrecomputedIter::advance_to_next_valid(self.chunk, &mut self.indices) {
510 self.exhausted = true;
511 return None;
512 }
513 } else {
514 self.started = true;
515 }
516
517 Some(RowIndices::new(&self.indices))
518 }
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524 use crate::execution::factorized_chunk::FactorizationLevel;
525 use crate::execution::factorized_vector::FactorizedVector;
526 use crate::execution::vector::ValueVector;
527 use grafeo_common::types::{LogicalType, NodeId};
528
529 fn create_test_chunk() -> FactorizedChunk {
530 let mut source_data = ValueVector::with_type(LogicalType::Int64);
532 source_data.push_int64(10);
533 source_data.push_int64(20);
534 let level0 = FactorizationLevel::flat(
535 vec![FactorizedVector::flat(source_data)],
536 vec!["source".to_string()],
537 );
538
539 let mut child_data = ValueVector::with_type(LogicalType::Int64);
541 child_data.push_int64(1);
542 child_data.push_int64(2);
543 child_data.push_int64(3);
544 child_data.push_int64(4);
545 child_data.push_int64(5);
546
547 let offsets = vec![0u32, 3, 5];
548 let child_vec = FactorizedVector::unflat(child_data, offsets, 2);
549 let level1 =
550 FactorizationLevel::unflat(vec![child_vec], vec!["child".to_string()], vec![3, 2]);
551
552 let mut chunk = FactorizedChunk::empty();
553 chunk.add_factorized_level(level0);
554 chunk.add_factorized_level(level1);
555 chunk
556 }
557
558 fn create_node_chunk() -> FactorizedChunk {
559 let mut source_data = ValueVector::with_type(LogicalType::Node);
560 source_data.push_node_id(NodeId::new(100));
561 source_data.push_node_id(NodeId::new(200));
562 let level0 = FactorizationLevel::flat(
563 vec![FactorizedVector::flat(source_data)],
564 vec!["source".to_string()],
565 );
566
567 let mut chunk = FactorizedChunk::empty();
568 chunk.add_factorized_level(level0);
569 chunk
570 }
571
572 #[test]
573 fn test_row_indices_new() {
574 let indices = RowIndices::new(&[0, 1, 2]);
575 assert_eq!(indices.level_count(), 3);
576 assert_eq!(indices.get(0), Some(0));
577 assert_eq!(indices.get(1), Some(1));
578 assert_eq!(indices.get(2), Some(2));
579 assert_eq!(indices.get(3), None);
580 }
581
582 #[test]
583 fn test_row_indices_as_slice() {
584 let indices = RowIndices::new(&[5, 10, 15]);
585 assert_eq!(indices.as_slice(), &[5, 10, 15]);
586 }
587
588 #[test]
589 fn test_row_view_new() {
590 let chunk = create_test_chunk();
591 let indices = RowIndices::new(&[0, 0]);
592
593 let view = RowView::new(&chunk, indices);
594 assert_eq!(view.level_count(), 2);
595 }
596
597 #[test]
598 fn test_row_view_from_ref() {
599 let chunk = create_test_chunk();
600 let indices = RowIndices::new(&[0, 1]);
601
602 let view = RowView::from_ref(&chunk, &indices);
603 assert_eq!(view.get(0, 0), Some(Value::Int64(10)));
604 assert_eq!(view.get(1, 0), Some(Value::Int64(2)));
605 }
606
607 #[test]
608 fn test_row_view_get_node_id() {
609 let chunk = create_node_chunk();
610 let indices = RowIndices::new(&[0]);
611
612 let view = RowView::new(&chunk, indices);
613 assert_eq!(view.get_node_id(0, 0), Some(NodeId::new(100)));
614 }
615
616 #[test]
617 fn test_row_view_get_invalid() {
618 let chunk = create_test_chunk();
619 let indices = RowIndices::new(&[0, 0]);
620
621 let view = RowView::new(&chunk, indices);
622
623 assert_eq!(view.get(10, 0), None);
625
626 assert_eq!(view.get(0, 10), None);
628 }
629
630 #[test]
631 fn test_row_view_values() {
632 let chunk = create_test_chunk();
633 let indices = RowIndices::new(&[0, 0]);
634
635 let view = RowView::new(&chunk, indices);
636 let values: Vec<Value> = view.values().collect();
637
638 assert_eq!(values.len(), 2);
639 assert_eq!(values[0], Value::Int64(10));
640 assert_eq!(values[1], Value::Int64(1));
641 }
642
643 #[test]
644 fn test_row_view_to_vec() {
645 let chunk = create_test_chunk();
646 let indices = RowIndices::new(&[1, 4]);
647
648 let view = RowView::new(&chunk, indices);
649 let vec = view.to_vec();
650
651 assert_eq!(vec.len(), 2);
652 assert_eq!(vec[0], Value::Int64(20));
653 assert_eq!(vec[1], Value::Int64(5));
654 }
655
656 #[test]
657 fn test_precomputed_iter_count() {
658 let chunk = create_test_chunk();
659 let iter = PrecomputedIter::new(&chunk);
660
661 assert_eq!(iter.len(), 5);
663 }
664
665 #[test]
666 fn test_precomputed_iter_values() {
667 let chunk = create_test_chunk();
668 let iter = PrecomputedIter::new(&chunk);
669
670 let rows: Vec<Vec<Value>> = iter.map(|row| row.to_vec()).collect();
671
672 assert_eq!(rows.len(), 5);
673 assert_eq!(rows[0], vec![Value::Int64(10), Value::Int64(1)]);
674 assert_eq!(rows[1], vec![Value::Int64(10), Value::Int64(2)]);
675 assert_eq!(rows[2], vec![Value::Int64(10), Value::Int64(3)]);
676 assert_eq!(rows[3], vec![Value::Int64(20), Value::Int64(4)]);
677 assert_eq!(rows[4], vec![Value::Int64(20), Value::Int64(5)]);
678 }
679
680 #[test]
681 fn test_precomputed_iter_get() {
682 let chunk = create_test_chunk();
683 let iter = PrecomputedIter::new(&chunk);
684
685 let indices = iter.get(2).unwrap();
686 assert_eq!(indices.as_slice(), &[0, 2]);
687
688 assert!(iter.get(10).is_none());
689 }
690
691 #[test]
692 fn test_precomputed_iter_rows() {
693 let chunk = create_test_chunk();
694 let iter = PrecomputedIter::new(&chunk);
695
696 let rows: Vec<RowView> = iter.rows().collect();
697 assert_eq!(rows.len(), 5);
698 }
699
700 #[test]
701 fn test_precomputed_iter_reset() {
702 let chunk = create_test_chunk();
703 let mut iter = PrecomputedIter::new(&chunk);
704
705 assert!(iter.next().is_some());
707 assert!(iter.next().is_some());
708 assert_eq!(iter.size_hint().0, 3);
709
710 iter.reset();
712 assert_eq!(iter.size_hint().0, 5);
713 }
714
715 #[test]
716 fn test_row_view_get() {
717 let chunk = create_test_chunk();
718 let iter = PrecomputedIter::new(&chunk);
719
720 let first_row = iter.row(0).unwrap();
721 assert_eq!(first_row.get(0, 0), Some(Value::Int64(10)));
722 assert_eq!(first_row.get(1, 0), Some(Value::Int64(1)));
723
724 let last_row = iter.row(4).unwrap();
725 assert_eq!(last_row.get(0, 0), Some(Value::Int64(20)));
726 assert_eq!(last_row.get(1, 0), Some(Value::Int64(5)));
727
728 assert!(iter.row(10).is_none());
729 }
730
731 #[test]
732 fn test_streaming_iter() {
733 let chunk = create_test_chunk();
734 let iter = StreamingIter::new(&chunk);
735
736 let indices: Vec<RowIndices> = iter.collect();
737
738 assert_eq!(indices.len(), 5);
739 assert_eq!(indices[0].as_slice(), &[0, 0]);
740 assert_eq!(indices[1].as_slice(), &[0, 1]);
741 assert_eq!(indices[2].as_slice(), &[0, 2]);
742 assert_eq!(indices[3].as_slice(), &[1, 3]);
743 assert_eq!(indices[4].as_slice(), &[1, 4]);
744 }
745
746 #[test]
747 fn test_streaming_iter_current_indices() {
748 let chunk = create_test_chunk();
749 let iter = StreamingIter::new(&chunk);
750
751 let current = iter.current_indices();
752 assert!(current.is_some());
753 assert_eq!(current.unwrap().as_slice(), &[0, 0]);
754 }
755
756 #[test]
757 fn test_streaming_iter_reset() {
758 let chunk = create_test_chunk();
759 let mut iter = StreamingIter::new(&chunk);
760
761 iter.next();
763 iter.next();
764
765 iter.reset();
767
768 let first = iter.next().unwrap();
770 assert_eq!(first.as_slice(), &[0, 0]);
771 }
772
773 #[test]
774 fn test_streaming_iter_exhausted() {
775 let chunk = create_test_chunk();
776 let mut iter = StreamingIter::new(&chunk);
777
778 while iter.next().is_some() {}
780
781 assert!(iter.current_indices().is_none());
783 }
784
785 #[test]
786 fn test_empty_chunk() {
787 let chunk = FactorizedChunk::empty();
788 let iter = PrecomputedIter::new(&chunk);
789
790 assert!(iter.is_empty());
791 assert_eq!(iter.len(), 0);
792 }
793
794 #[test]
795 fn test_empty_chunk_streaming() {
796 let chunk = FactorizedChunk::empty();
797 let mut iter = StreamingIter::new(&chunk);
798
799 assert!(iter.next().is_none());
800 assert!(iter.current_indices().is_none());
801 }
802
803 #[test]
804 fn test_random_access() {
805 let chunk = create_test_chunk();
806 let iter = PrecomputedIter::new(&chunk);
807
808 let row2 = iter.row(2).unwrap();
810 let row4 = iter.row(4).unwrap();
811 let row0 = iter.row(0).unwrap();
812
813 assert_eq!(row2.get(1, 0), Some(Value::Int64(3)));
814 assert_eq!(row4.get(1, 0), Some(Value::Int64(5)));
815 assert_eq!(row0.get(1, 0), Some(Value::Int64(1)));
816 }
817
818 #[test]
819 fn test_exact_size_iterator() {
820 let chunk = create_test_chunk();
821 let iter = PrecomputedIter::new(&chunk);
822
823 assert_eq!(iter.len(), 5);
824 assert_eq!(iter.size_hint(), (5, Some(5)));
825 }
826
827 #[test]
828 fn test_single_level_chunk() {
829 let mut source_data = ValueVector::with_type(LogicalType::Int64);
830 source_data.push_int64(1);
831 source_data.push_int64(2);
832 source_data.push_int64(3);
833 let level0 = FactorizationLevel::flat(
834 vec![FactorizedVector::flat(source_data)],
835 vec!["value".to_string()],
836 );
837
838 let mut chunk = FactorizedChunk::empty();
839 chunk.add_factorized_level(level0);
840
841 let iter = PrecomputedIter::new(&chunk);
842 assert_eq!(iter.len(), 3);
843
844 let streaming = StreamingIter::new(&chunk);
845 let indices: Vec<RowIndices> = streaming.collect();
846 assert_eq!(indices.len(), 3);
847 }
848
849 #[test]
850 fn test_row_indices_clone() {
851 let indices = RowIndices::new(&[1, 2, 3]);
852 let cloned = indices.clone();
853
854 assert_eq!(indices.as_slice(), cloned.as_slice());
855 }
856
857 #[test]
858 fn test_row_view_level_count() {
859 let chunk = create_test_chunk();
860 let indices = RowIndices::new(&[0, 0]);
861 let view = RowView::new(&chunk, indices);
862
863 assert_eq!(view.level_count(), 2);
864 }
865}