1use smallvec::SmallVec;
30
31use super::factorized_chunk::FactorizedChunk;
32use grafeo_common::types::{EdgeId, NodeId, Value};
33
34const STACK_LEVELS: usize = 4;
37
38#[derive(Debug, Clone)]
42pub struct RowIndices {
43 indices: SmallVec<[usize; STACK_LEVELS]>,
46}
47
48impl RowIndices {
49 #[must_use]
51 pub fn new(indices: &[usize]) -> Self {
52 Self {
53 indices: SmallVec::from_slice(indices),
54 }
55 }
56
57 #[must_use]
59 pub fn get(&self, level: usize) -> Option<usize> {
60 self.indices.get(level).copied()
61 }
62
63 #[must_use]
65 pub fn level_count(&self) -> usize {
66 self.indices.len()
67 }
68
69 #[must_use]
71 pub fn as_slice(&self) -> &[usize] {
72 &self.indices
73 }
74}
75
76#[derive(Debug, Clone)]
81pub struct RowView<'a> {
82 chunk: &'a FactorizedChunk,
83 indices: RowIndices,
84}
85
86impl<'a> RowView<'a> {
87 #[must_use]
89 pub fn new(chunk: &'a FactorizedChunk, indices: RowIndices) -> Self {
90 Self { chunk, indices }
91 }
92
93 #[must_use]
95 pub fn from_ref(chunk: &'a FactorizedChunk, indices: &RowIndices) -> Self {
96 Self {
97 chunk,
98 indices: indices.clone(),
99 }
100 }
101
102 #[must_use]
104 pub fn get(&self, level: usize, column: usize) -> Option<Value> {
105 let physical_idx = self.indices.get(level)?;
106 let level_data = self.chunk.level(level)?;
107 let col = level_data.column(column)?;
108 col.get_physical(physical_idx)
109 }
110
111 #[must_use]
113 pub fn get_node_id(&self, level: usize, column: usize) -> Option<NodeId> {
114 let physical_idx = self.indices.get(level)?;
115 let level_data = self.chunk.level(level)?;
116 let col = level_data.column(column)?;
117 col.get_node_id_physical(physical_idx)
118 }
119
120 #[must_use]
122 pub fn get_edge_id(&self, level: usize, column: usize) -> Option<EdgeId> {
123 let physical_idx = self.indices.get(level)?;
124 let level_data = self.chunk.level(level)?;
125 let col = level_data.column(column)?;
126 col.get_edge_id_physical(physical_idx)
127 }
128
129 #[must_use]
131 pub fn level_count(&self) -> usize {
132 self.indices.level_count()
133 }
134
135 pub fn values(&self) -> impl Iterator<Item = Value> + '_ {
139 (0..self.level_count()).flat_map(move |level| {
140 let physical_idx = self.indices.get(level).unwrap_or(0);
141 let level_data = self.chunk.level(level);
142 let col_count = level_data.map_or(0, |l| l.column_count());
143
144 (0..col_count).filter_map(move |col| {
145 level_data
146 .and_then(|l| l.column(col))
147 .and_then(|c| c.get_physical(physical_idx))
148 })
149 })
150 }
151
152 #[must_use]
154 pub fn to_vec(&self) -> Vec<Value> {
155 self.values().collect()
156 }
157}
158
159#[derive(Debug)]
182pub struct PrecomputedIter<'a> {
183 chunk: &'a FactorizedChunk,
184 rows: Vec<RowIndices>,
186 position: usize,
188}
189
190impl<'a> PrecomputedIter<'a> {
191 #[must_use]
196 pub fn new(chunk: &'a FactorizedChunk) -> Self {
197 let rows = Self::compute_all_indices(chunk);
198 Self {
199 chunk,
200 rows,
201 position: 0,
202 }
203 }
204
205 fn compute_all_indices(chunk: &FactorizedChunk) -> Vec<RowIndices> {
207 let level_count = chunk.level_count();
208 if level_count == 0 {
209 return Vec::new();
210 }
211
212 let logical_rows = chunk.logical_row_count();
213 let mut rows = Vec::with_capacity(logical_rows);
214
215 let mut indices: SmallVec<[usize; STACK_LEVELS]> = SmallVec::new();
217 indices.resize(level_count, 0);
218
219 Self::enumerate_rows_iterative(chunk, &mut indices, &mut rows);
220
221 rows
222 }
223
224 fn enumerate_rows_iterative(
226 chunk: &FactorizedChunk,
227 initial_indices: &mut SmallVec<[usize; STACK_LEVELS]>,
228 rows: &mut Vec<RowIndices>,
229 ) {
230 let level_count = chunk.level_count();
231 if level_count == 0 {
232 return;
233 }
234
235 for level in 0..level_count {
237 if level == 0 {
238 initial_indices[0] = 0;
239 } else {
240 let parent_idx = initial_indices[level - 1];
241 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
242 let (start, _) = col.range_for_parent(parent_idx);
243 initial_indices[level] = start;
244 }
245 }
246 }
247
248 if !Self::is_valid_position(chunk, initial_indices)
250 && !Self::advance_to_next_valid(chunk, initial_indices)
251 {
252 return; }
254
255 loop {
257 rows.push(RowIndices::new(initial_indices));
259
260 if !Self::advance_to_next_valid(chunk, initial_indices) {
262 break;
263 }
264 }
265 }
266
267 fn is_valid_position(
269 chunk: &FactorizedChunk,
270 indices: &SmallVec<[usize; STACK_LEVELS]>,
271 ) -> bool {
272 let level_count = chunk.level_count();
273
274 for level in 0..level_count {
275 if level == 0 {
276 let Some(level_data) = chunk.level(0) else {
277 return false;
278 };
279 if indices[0] >= level_data.group_count() {
280 return false;
281 }
282 } else {
283 let parent_idx = indices[level - 1];
284 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
285 let (start, end) = col.range_for_parent(parent_idx);
286 if start >= end || indices[level] < start || indices[level] >= end {
287 return false;
288 }
289 } else {
290 return false;
291 }
292 }
293 }
294
295 true
296 }
297
298 fn advance_to_next_valid(
300 chunk: &FactorizedChunk,
301 indices: &mut SmallVec<[usize; STACK_LEVELS]>,
302 ) -> bool {
303 let level_count = chunk.level_count();
304 if level_count == 0 {
305 return false;
306 }
307
308 loop {
309 let mut advanced = false;
311
312 for level in (0..level_count).rev() {
313 let (_start, end) = if level == 0 {
314 let Some(level_data) = chunk.level(0) else {
315 return false;
316 };
317 (0, level_data.group_count())
318 } else {
319 let parent_idx = indices[level - 1];
320 if let Some(col) = chunk.level(level).and_then(|l| l.column(0)) {
321 col.range_for_parent(parent_idx)
322 } else {
323 (0, 0)
324 }
325 };
326
327 if indices[level] + 1 < end {
328 indices[level] += 1;
330
331 for deeper in (level + 1)..level_count {
333 let deeper_parent = indices[deeper - 1];
334 if let Some(col) = chunk.level(deeper).and_then(|l| l.column(0)) {
335 let (deeper_start, _) = col.range_for_parent(deeper_parent);
336 indices[deeper] = deeper_start;
337 }
338 }
339
340 advanced = true;
341 break;
342 }
343 }
345
346 if !advanced {
347 return false; }
349
350 if Self::is_valid_position(chunk, indices) {
352 return true;
353 }
354 }
356 }
357
358 #[must_use]
360 pub fn len(&self) -> usize {
361 self.rows.len()
362 }
363
364 #[must_use]
366 pub fn is_empty(&self) -> bool {
367 self.rows.is_empty()
368 }
369
370 #[must_use]
372 pub fn get(&self, index: usize) -> Option<&RowIndices> {
373 self.rows.get(index)
374 }
375
376 #[must_use]
378 pub fn row(&self, index: usize) -> Option<RowView<'a>> {
379 self.rows
380 .get(index)
381 .map(|indices| RowView::from_ref(self.chunk, indices))
382 }
383
384 pub fn rows(&self) -> impl Iterator<Item = RowView<'a>> + '_ {
386 self.rows
387 .iter()
388 .map(|indices| RowView::from_ref(self.chunk, indices))
389 }
390
391 pub fn reset(&mut self) {
393 self.position = 0;
394 }
395}
396
397impl<'a> Iterator for PrecomputedIter<'a> {
398 type Item = RowView<'a>;
399
400 fn next(&mut self) -> Option<Self::Item> {
401 if self.position >= self.rows.len() {
402 return None;
403 }
404
405 let indices = &self.rows[self.position];
406 self.position += 1;
407 Some(RowView::from_ref(self.chunk, indices))
408 }
409
410 fn size_hint(&self) -> (usize, Option<usize>) {
411 let remaining = self.rows.len() - self.position;
412 (remaining, Some(remaining))
413 }
414}
415
416impl ExactSizeIterator for PrecomputedIter<'_> {}
417
418#[derive(Debug)]
423pub struct StreamingIter<'a> {
424 chunk: &'a FactorizedChunk,
425 indices: SmallVec<[usize; STACK_LEVELS]>,
426 exhausted: bool,
427 started: bool,
428}
429
430impl<'a> StreamingIter<'a> {
431 #[must_use]
433 pub fn new(chunk: &'a FactorizedChunk) -> Self {
434 let level_count = chunk.level_count();
435 let mut indices = SmallVec::new();
436 indices.resize(level_count, 0);
437
438 let mut iter = Self {
439 chunk,
440 indices,
441 exhausted: level_count == 0,
442 started: false,
443 };
444
445 if !iter.exhausted {
447 iter.initialize_indices();
448 if !PrecomputedIter::is_valid_position(chunk, &iter.indices) {
449 iter.exhausted = !PrecomputedIter::advance_to_next_valid(chunk, &mut iter.indices);
450 }
451 }
452
453 iter
454 }
455
456 fn initialize_indices(&mut self) {
458 let level_count = self.chunk.level_count();
459 for level in 0..level_count {
460 if level == 0 {
461 self.indices[0] = 0;
462 } else {
463 let parent_idx = self.indices[level - 1];
464 if let Some(col) = self.chunk.level(level).and_then(|l| l.column(0)) {
465 let (start, _) = col.range_for_parent(parent_idx);
466 self.indices[level] = start;
467 }
468 }
469 }
470 }
471
472 #[must_use]
474 pub fn current_indices(&self) -> Option<RowIndices> {
475 if self.exhausted {
476 None
477 } else {
478 Some(RowIndices::new(&self.indices))
479 }
480 }
481
482 pub fn reset(&mut self) {
484 self.started = false;
485 self.exhausted = self.chunk.level_count() == 0;
486 if !self.exhausted {
487 self.initialize_indices();
488 if !PrecomputedIter::is_valid_position(self.chunk, &self.indices) {
489 self.exhausted =
490 !PrecomputedIter::advance_to_next_valid(self.chunk, &mut self.indices);
491 }
492 }
493 }
494}
495
496impl<'a> Iterator for StreamingIter<'a> {
497 type Item = RowIndices;
498
499 fn next(&mut self) -> Option<Self::Item> {
500 if self.exhausted {
501 return None;
502 }
503
504 if self.started {
505 if !PrecomputedIter::advance_to_next_valid(self.chunk, &mut self.indices) {
507 self.exhausted = true;
508 return None;
509 }
510 } else {
511 self.started = true;
512 }
513
514 Some(RowIndices::new(&self.indices))
515 }
516}
517
518#[cfg(test)]
519mod tests {
520 use super::*;
521 use crate::execution::factorized_chunk::FactorizationLevel;
522 use crate::execution::factorized_vector::FactorizedVector;
523 use crate::execution::vector::ValueVector;
524 use grafeo_common::types::{LogicalType, NodeId};
525
526 fn create_test_chunk() -> FactorizedChunk {
527 let mut source_data = ValueVector::with_type(LogicalType::Int64);
529 source_data.push_int64(10);
530 source_data.push_int64(20);
531 let level0 = FactorizationLevel::flat(
532 vec![FactorizedVector::flat(source_data)],
533 vec!["source".to_string()],
534 );
535
536 let mut child_data = ValueVector::with_type(LogicalType::Int64);
538 child_data.push_int64(1);
539 child_data.push_int64(2);
540 child_data.push_int64(3);
541 child_data.push_int64(4);
542 child_data.push_int64(5);
543
544 let offsets = vec![0u32, 3, 5];
545 let child_vec = FactorizedVector::unflat(child_data, offsets, 2);
546 let level1 =
547 FactorizationLevel::unflat(vec![child_vec], vec!["child".to_string()], vec![3, 2]);
548
549 let mut chunk = FactorizedChunk::empty();
550 chunk.add_factorized_level(level0);
551 chunk.add_factorized_level(level1);
552 chunk
553 }
554
555 fn create_node_chunk() -> FactorizedChunk {
556 let mut source_data = ValueVector::with_type(LogicalType::Node);
557 source_data.push_node_id(NodeId::new(100));
558 source_data.push_node_id(NodeId::new(200));
559 let level0 = FactorizationLevel::flat(
560 vec![FactorizedVector::flat(source_data)],
561 vec!["source".to_string()],
562 );
563
564 let mut chunk = FactorizedChunk::empty();
565 chunk.add_factorized_level(level0);
566 chunk
567 }
568
569 #[test]
570 fn test_row_indices_new() {
571 let indices = RowIndices::new(&[0, 1, 2]);
572 assert_eq!(indices.level_count(), 3);
573 assert_eq!(indices.get(0), Some(0));
574 assert_eq!(indices.get(1), Some(1));
575 assert_eq!(indices.get(2), Some(2));
576 assert_eq!(indices.get(3), None);
577 }
578
579 #[test]
580 fn test_row_indices_as_slice() {
581 let indices = RowIndices::new(&[5, 10, 15]);
582 assert_eq!(indices.as_slice(), &[5, 10, 15]);
583 }
584
585 #[test]
586 fn test_row_view_new() {
587 let chunk = create_test_chunk();
588 let indices = RowIndices::new(&[0, 0]);
589
590 let view = RowView::new(&chunk, indices);
591 assert_eq!(view.level_count(), 2);
592 }
593
594 #[test]
595 fn test_row_view_from_ref() {
596 let chunk = create_test_chunk();
597 let indices = RowIndices::new(&[0, 1]);
598
599 let view = RowView::from_ref(&chunk, &indices);
600 assert_eq!(view.get(0, 0), Some(Value::Int64(10)));
601 assert_eq!(view.get(1, 0), Some(Value::Int64(2)));
602 }
603
604 #[test]
605 fn test_row_view_get_node_id() {
606 let chunk = create_node_chunk();
607 let indices = RowIndices::new(&[0]);
608
609 let view = RowView::new(&chunk, indices);
610 assert_eq!(view.get_node_id(0, 0), Some(NodeId::new(100)));
611 }
612
613 #[test]
614 fn test_row_view_get_invalid() {
615 let chunk = create_test_chunk();
616 let indices = RowIndices::new(&[0, 0]);
617
618 let view = RowView::new(&chunk, indices);
619
620 assert_eq!(view.get(10, 0), None);
622
623 assert_eq!(view.get(0, 10), None);
625 }
626
627 #[test]
628 fn test_row_view_values() {
629 let chunk = create_test_chunk();
630 let indices = RowIndices::new(&[0, 0]);
631
632 let view = RowView::new(&chunk, indices);
633 let values: Vec<Value> = view.values().collect();
634
635 assert_eq!(values.len(), 2);
636 assert_eq!(values[0], Value::Int64(10));
637 assert_eq!(values[1], Value::Int64(1));
638 }
639
640 #[test]
641 fn test_row_view_to_vec() {
642 let chunk = create_test_chunk();
643 let indices = RowIndices::new(&[1, 4]);
644
645 let view = RowView::new(&chunk, indices);
646 let vec = view.to_vec();
647
648 assert_eq!(vec.len(), 2);
649 assert_eq!(vec[0], Value::Int64(20));
650 assert_eq!(vec[1], Value::Int64(5));
651 }
652
653 #[test]
654 fn test_precomputed_iter_count() {
655 let chunk = create_test_chunk();
656 let iter = PrecomputedIter::new(&chunk);
657
658 assert_eq!(iter.len(), 5);
660 }
661
662 #[test]
663 fn test_precomputed_iter_values() {
664 let chunk = create_test_chunk();
665 let iter = PrecomputedIter::new(&chunk);
666
667 let rows: Vec<Vec<Value>> = iter.map(|row| row.to_vec()).collect();
668
669 assert_eq!(rows.len(), 5);
670 assert_eq!(rows[0], vec![Value::Int64(10), Value::Int64(1)]);
671 assert_eq!(rows[1], vec![Value::Int64(10), Value::Int64(2)]);
672 assert_eq!(rows[2], vec![Value::Int64(10), Value::Int64(3)]);
673 assert_eq!(rows[3], vec![Value::Int64(20), Value::Int64(4)]);
674 assert_eq!(rows[4], vec![Value::Int64(20), Value::Int64(5)]);
675 }
676
677 #[test]
678 fn test_precomputed_iter_get() {
679 let chunk = create_test_chunk();
680 let iter = PrecomputedIter::new(&chunk);
681
682 let indices = iter.get(2).unwrap();
683 assert_eq!(indices.as_slice(), &[0, 2]);
684
685 assert!(iter.get(10).is_none());
686 }
687
688 #[test]
689 fn test_precomputed_iter_rows() {
690 let chunk = create_test_chunk();
691 let iter = PrecomputedIter::new(&chunk);
692
693 let rows: Vec<RowView> = iter.rows().collect();
694 assert_eq!(rows.len(), 5);
695 }
696
697 #[test]
698 fn test_precomputed_iter_reset() {
699 let chunk = create_test_chunk();
700 let mut iter = PrecomputedIter::new(&chunk);
701
702 assert!(iter.next().is_some());
704 assert!(iter.next().is_some());
705 assert_eq!(iter.size_hint().0, 3);
706
707 iter.reset();
709 assert_eq!(iter.size_hint().0, 5);
710 }
711
712 #[test]
713 fn test_row_view_get() {
714 let chunk = create_test_chunk();
715 let iter = PrecomputedIter::new(&chunk);
716
717 let first_row = iter.row(0).unwrap();
718 assert_eq!(first_row.get(0, 0), Some(Value::Int64(10)));
719 assert_eq!(first_row.get(1, 0), Some(Value::Int64(1)));
720
721 let last_row = iter.row(4).unwrap();
722 assert_eq!(last_row.get(0, 0), Some(Value::Int64(20)));
723 assert_eq!(last_row.get(1, 0), Some(Value::Int64(5)));
724
725 assert!(iter.row(10).is_none());
726 }
727
728 #[test]
729 fn test_streaming_iter() {
730 let chunk = create_test_chunk();
731 let iter = StreamingIter::new(&chunk);
732
733 let indices: Vec<RowIndices> = iter.collect();
734
735 assert_eq!(indices.len(), 5);
736 assert_eq!(indices[0].as_slice(), &[0, 0]);
737 assert_eq!(indices[1].as_slice(), &[0, 1]);
738 assert_eq!(indices[2].as_slice(), &[0, 2]);
739 assert_eq!(indices[3].as_slice(), &[1, 3]);
740 assert_eq!(indices[4].as_slice(), &[1, 4]);
741 }
742
743 #[test]
744 fn test_streaming_iter_current_indices() {
745 let chunk = create_test_chunk();
746 let iter = StreamingIter::new(&chunk);
747
748 let current = iter.current_indices();
749 assert!(current.is_some());
750 assert_eq!(current.unwrap().as_slice(), &[0, 0]);
751 }
752
753 #[test]
754 fn test_streaming_iter_reset() {
755 let chunk = create_test_chunk();
756 let mut iter = StreamingIter::new(&chunk);
757
758 iter.next();
760 iter.next();
761
762 iter.reset();
764
765 let first = iter.next().unwrap();
767 assert_eq!(first.as_slice(), &[0, 0]);
768 }
769
770 #[test]
771 fn test_streaming_iter_exhausted() {
772 let chunk = create_test_chunk();
773 let mut iter = StreamingIter::new(&chunk);
774
775 while iter.next().is_some() {}
777
778 assert!(iter.current_indices().is_none());
780 }
781
782 #[test]
783 fn test_empty_chunk() {
784 let chunk = FactorizedChunk::empty();
785 let iter = PrecomputedIter::new(&chunk);
786
787 assert!(iter.is_empty());
788 assert_eq!(iter.len(), 0);
789 }
790
791 #[test]
792 fn test_empty_chunk_streaming() {
793 let chunk = FactorizedChunk::empty();
794 let mut iter = StreamingIter::new(&chunk);
795
796 assert!(iter.next().is_none());
797 assert!(iter.current_indices().is_none());
798 }
799
800 #[test]
801 fn test_random_access() {
802 let chunk = create_test_chunk();
803 let iter = PrecomputedIter::new(&chunk);
804
805 let row2 = iter.row(2).unwrap();
807 let row4 = iter.row(4).unwrap();
808 let row0 = iter.row(0).unwrap();
809
810 assert_eq!(row2.get(1, 0), Some(Value::Int64(3)));
811 assert_eq!(row4.get(1, 0), Some(Value::Int64(5)));
812 assert_eq!(row0.get(1, 0), Some(Value::Int64(1)));
813 }
814
815 #[test]
816 fn test_exact_size_iterator() {
817 let chunk = create_test_chunk();
818 let iter = PrecomputedIter::new(&chunk);
819
820 assert_eq!(iter.len(), 5);
821 assert_eq!(iter.size_hint(), (5, Some(5)));
822 }
823
824 #[test]
825 fn test_single_level_chunk() {
826 let mut source_data = ValueVector::with_type(LogicalType::Int64);
827 source_data.push_int64(1);
828 source_data.push_int64(2);
829 source_data.push_int64(3);
830 let level0 = FactorizationLevel::flat(
831 vec![FactorizedVector::flat(source_data)],
832 vec!["value".to_string()],
833 );
834
835 let mut chunk = FactorizedChunk::empty();
836 chunk.add_factorized_level(level0);
837
838 let iter = PrecomputedIter::new(&chunk);
839 assert_eq!(iter.len(), 3);
840
841 let streaming = StreamingIter::new(&chunk);
842 let indices: Vec<RowIndices> = streaming.collect();
843 assert_eq!(indices.len(), 3);
844 }
845
846 #[test]
847 fn test_row_indices_clone() {
848 let indices = RowIndices::new(&[1, 2, 3]);
849 let cloned = indices.clone();
850
851 assert_eq!(indices.as_slice(), cloned.as_slice());
852 }
853
854 #[test]
855 fn test_row_view_level_count() {
856 let chunk = create_test_chunk();
857 let indices = RowIndices::new(&[0, 0]);
858 let view = RowView::new(&chunk, indices);
859
860 assert_eq!(view.level_count(), 2);
861 }
862}