1use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
/// A [`Source`] that can be split into independent partitions for
/// morsel-driven parallel execution.
pub trait ParallelSource: Source + Send + Sync {
    /// Total number of rows this source will produce, if known up front.
    fn total_rows(&self) -> Option<usize>;

    /// Whether this source can be partitioned; by default a source is
    /// partitionable exactly when its row count is known.
    fn is_partitionable(&self) -> bool {
        self.total_rows().is_some()
    }

    /// Creates an independent [`Source`] that scans only the rows covered
    /// by `morsel`.
    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;

    /// Splits the source's row range into morsels of at most `morsel_size`
    /// rows. Returns an empty list when the row count is unknown.
    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
        match self.total_rows() {
            Some(total) => generate_morsels(total, morsel_size, source_id),
            None => Vec::new(),
        }
    }

    /// Number of columns in the chunks this source produces.
    fn num_columns(&self) -> usize;
}
52
/// An in-memory, column-major source backed by `Vec<Value>` columns.
///
/// The data lives behind an `Arc` so partitions can share it without copying.
pub struct ParallelVectorSource {
    // Column-major data; all inner vectors are assumed equal length
    // (row count is read from the first column).
    columns: Arc<Vec<Vec<Value>>>,
    // Next global row index to emit from `next_chunk`.
    position: usize,
}
62
63impl ParallelVectorSource {
64 #[must_use]
66 pub fn new(columns: Vec<Vec<Value>>) -> Self {
67 Self {
68 columns: Arc::new(columns),
69 position: 0,
70 }
71 }
72
73 #[must_use]
75 pub fn single_column(values: Vec<Value>) -> Self {
76 Self::new(vec![values])
77 }
78}
79
80impl Source for ParallelVectorSource {
81 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82 if self.columns.is_empty() || self.columns[0].is_empty() {
83 return Ok(None);
84 }
85
86 let total_rows = self.columns[0].len();
87 if self.position >= total_rows {
88 return Ok(None);
89 }
90
91 let end = (self.position + chunk_size).min(total_rows);
92 let mut vectors = Vec::with_capacity(self.columns.len());
93
94 for col_values in self.columns.iter() {
95 let slice = &col_values[self.position..end];
96 vectors.push(ValueVector::from_values(slice));
97 }
98
99 self.position = end;
100 Ok(Some(DataChunk::new(vectors)))
101 }
102
103 fn reset(&mut self) {
104 self.position = 0;
105 }
106
107 fn name(&self) -> &'static str {
108 "ParallelVectorSource"
109 }
110}
111
112impl ParallelSource for ParallelVectorSource {
113 fn total_rows(&self) -> Option<usize> {
114 if self.columns.is_empty() {
115 Some(0)
116 } else {
117 Some(self.columns[0].len())
118 }
119 }
120
121 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122 Box::new(PartitionedVectorSource::new(
123 Arc::clone(&self.columns),
124 morsel.start_row,
125 morsel.end_row,
126 ))
127 }
128
129 fn num_columns(&self) -> usize {
130 self.columns.len()
131 }
132}
133
/// A partition of [`ParallelVectorSource`] that scans only rows
/// `start_row..end_row` of the shared columns.
struct PartitionedVectorSource {
    // Shared column data (no copy per partition).
    columns: Arc<Vec<Vec<Value>>>,
    // Partition bounds as global row indices; `end_row` is exclusive.
    start_row: usize,
    end_row: usize,
    // Current global row position; starts at (and resets to) `start_row`.
    position: usize,
}
143
144impl PartitionedVectorSource {
145 fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146 Self {
147 columns,
148 start_row,
149 end_row,
150 position: start_row,
151 }
152 }
153}
154
155impl Source for PartitionedVectorSource {
156 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157 if self.columns.is_empty() || self.position >= self.end_row {
158 return Ok(None);
159 }
160
161 let end = (self.position + chunk_size).min(self.end_row);
162 let mut vectors = Vec::with_capacity(self.columns.len());
163
164 for col_values in self.columns.iter() {
165 let slice = &col_values[self.position..end];
166 vectors.push(ValueVector::from_values(slice));
167 }
168
169 self.position = end;
170 Ok(Some(DataChunk::new(vectors)))
171 }
172
173 fn reset(&mut self) {
174 self.position = self.start_row;
175 }
176
177 fn name(&self) -> &'static str {
178 "PartitionedVectorSource"
179 }
180}
181
/// A parallel source over pre-materialized [`DataChunk`]s.
pub struct ParallelChunkSource {
    // Stored chunks, shared with partitions via `Arc`.
    chunks: Arc<Vec<DataChunk>>,
    // cumulative_rows[i] is the global row offset at which chunk i starts;
    // it has chunks.len() + 1 entries, the last one being the total row count.
    cumulative_rows: Vec<usize>,
    // Total rows across all chunks.
    total_rows: usize,
    // Index of the next chunk to emit during a sequential (non-partitioned) scan.
    chunk_index: usize,
    // Column count, taken from the first chunk (0 when there are no chunks).
    num_columns: usize,
}
196
197impl ParallelChunkSource {
198 #[must_use]
200 pub fn new(chunks: Vec<DataChunk>) -> Self {
201 let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
202 let mut sum = 0;
203 cumulative_rows.push(0);
204 for chunk in &chunks {
205 sum += chunk.len();
206 cumulative_rows.push(sum);
207 }
208
209 let num_columns = chunks.first().map_or(0, |c| c.num_columns());
210
211 Self {
212 chunks: Arc::new(chunks),
213 cumulative_rows,
214 total_rows: sum,
215 chunk_index: 0,
216 num_columns,
217 }
218 }
219}
220
221impl Source for ParallelChunkSource {
222 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
223 if self.chunk_index >= self.chunks.len() {
224 return Ok(None);
225 }
226
227 let chunk = self.chunks[self.chunk_index].clone();
228 self.chunk_index += 1;
229 Ok(Some(chunk))
230 }
231
232 fn reset(&mut self) {
233 self.chunk_index = 0;
234 }
235
236 fn name(&self) -> &'static str {
237 "ParallelChunkSource"
238 }
239}
240
241impl ParallelSource for ParallelChunkSource {
242 fn total_rows(&self) -> Option<usize> {
243 Some(self.total_rows)
244 }
245
246 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
247 Box::new(PartitionedChunkSource::new(
248 Arc::clone(&self.chunks),
249 self.cumulative_rows.clone(),
250 morsel.start_row,
251 morsel.end_row,
252 ))
253 }
254
255 fn num_columns(&self) -> usize {
256 self.num_columns
257 }
258}
259
/// A partition of [`ParallelChunkSource`] covering global rows
/// `start_row..end_row`, slicing across stored-chunk boundaries as needed.
struct PartitionedChunkSource {
    // Shared stored chunks.
    chunks: Arc<Vec<DataChunk>>,
    // Starting global row offset of each chunk (len = chunks.len() + 1).
    cumulative_rows: Vec<usize>,
    // Partition bounds as global row indices; `end_row` is exclusive.
    start_row: usize,
    end_row: usize,
    // Current global row position; starts at (and resets to) `start_row`.
    current_row: usize,
}
268
269impl PartitionedChunkSource {
270 fn new(
271 chunks: Arc<Vec<DataChunk>>,
272 cumulative_rows: Vec<usize>,
273 start_row: usize,
274 end_row: usize,
275 ) -> Self {
276 Self {
277 chunks,
278 cumulative_rows,
279 start_row,
280 end_row,
281 current_row: start_row,
282 }
283 }
284
285 fn find_chunk_index(&self, row: usize) -> Option<usize> {
287 match self
289 .cumulative_rows
290 .binary_search_by(|&cumul| cumul.cmp(&row))
291 {
292 Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
293 Err(idx) => {
294 if idx == 0 {
295 Some(0)
296 } else {
297 Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
298 }
299 }
300 }
301 }
302}
303
impl Source for PartitionedChunkSource {
    /// Emits up to `chunk_size` rows starting at `current_row`, never
    /// crossing a stored-chunk boundary or the partition's `end_row` —
    /// callers may therefore receive chunks smaller than `chunk_size`.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
            return Ok(None);
        };

        // Defensive: find_chunk_index clamps into range, but guard anyway.
        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        // Translate the global row position into an offset inside the chunk.
        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        // Take the smallest of: rows left in this chunk, rows left in the
        // partition, and the caller's requested chunk size.
        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        if rows_to_extract == 0 {
            return Ok(None);
        }

        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    fn reset(&mut self) {
        // Rewind to the partition's start, not row 0.
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}
347
/// A source generating the integers `0..total` as a single `Int64` column.
pub struct RangeSource {
    // Exclusive upper bound of the generated range.
    total: usize,
    // Next integer to emit.
    position: usize,
}
355
impl RangeSource {
    /// Creates a source that produces `total` rows: the integers `0..total`.
    #[must_use]
    pub fn new(total: usize) -> Self {
        Self { total, position: 0 }
    }
}
363
364impl Source for RangeSource {
365 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
366 if self.position >= self.total {
367 return Ok(None);
368 }
369
370 let end = (self.position + chunk_size).min(self.total);
371 let values: Vec<Value> = (self.position..end)
372 .map(|i| Value::Int64(i as i64))
373 .collect();
374
375 self.position = end;
376 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
377 &values,
378 )])))
379 }
380
381 fn reset(&mut self) {
382 self.position = 0;
383 }
384
385 fn name(&self) -> &'static str {
386 "RangeSource"
387 }
388}
389
390impl ParallelSource for RangeSource {
391 fn total_rows(&self) -> Option<usize> {
392 Some(self.total)
393 }
394
395 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
396 Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
397 }
398
399 fn num_columns(&self) -> usize {
400 1
401 }
402}
403
/// A partition of [`RangeSource`] producing the integers `start..end`.
struct RangePartition {
    // Partition bounds; `end` is exclusive.
    start: usize,
    end: usize,
    // Next integer to emit; starts at (and resets to) `start`.
    position: usize,
}
410
411impl RangePartition {
412 fn new(start: usize, end: usize) -> Self {
413 Self {
414 start,
415 end,
416 position: start,
417 }
418 }
419}
420
421impl Source for RangePartition {
422 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
423 if self.position >= self.end {
424 return Ok(None);
425 }
426
427 let end = (self.position + chunk_size).min(self.end);
428 let values: Vec<Value> = (self.position..end)
429 .map(|i| Value::Int64(i as i64))
430 .collect();
431
432 self.position = end;
433 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
434 &values,
435 )])))
436 }
437
438 fn reset(&mut self) {
439 self.position = self.start;
440 }
441
442 fn name(&self) -> &'static str {
443 "RangePartition"
444 }
445}
446
/// A parallel source over RDF triples, producing three columns:
/// subject, predicate, object.
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    // Row-major (subject, predicate, object) triples, shared with partitions.
    triples: Arc<Vec<(Value, Value, Value)>>,
    // Next triple index to emit.
    position: usize,
}
458
#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Creates a source over the given (subject, predicate, object) triples.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>) -> Self {
        Self {
            triples: Arc::new(triples),
            position: 0,
        }
    }

    /// Collects triples from an iterator into a new source.
    // `#[must_use]` added for consistency with every other constructor
    // in this module; dropping the built source is always a bug.
    #[must_use]
    pub fn from_triples<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect())
    }
}
478
#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let len = self.triples.len();
        if self.position >= len {
            return Ok(None);
        }

        let end = len.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the row-major triples into three value columns.
        let mut subjects = Vec::with_capacity(window.len());
        let mut predicates = Vec::with_capacity(window.len());
        let mut objects = Vec::with_capacity(window.len());
        for (s, p, o) in window {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }
        self.position = end;

        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
517
#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        let shared = Arc::clone(&self.triples);
        Box::new(PartitionedTripleScanSource::new(
            shared,
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        // Subject, predicate, object.
        3
    }
}
536
/// A partition of [`ParallelTripleScanSource`] scanning triples
/// `start_row..end_row`.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    // Shared triple data (no copy per partition).
    triples: Arc<Vec<(Value, Value, Value)>>,
    // Partition bounds; `end_row` is exclusive.
    start_row: usize,
    end_row: usize,
    // Current triple index; starts at (and resets to) `start_row`.
    position: usize,
}
545
#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    /// Creates a partition over triples `start_row..end_row`.
    fn new(triples: Arc<Vec<(Value, Value, Value)>>, start_row: usize, end_row: usize) -> Self {
        Self {
            triples,
            start_row,
            end_row,
            // Scanning begins at the partition's first row.
            position: start_row,
        }
    }
}
557
#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Never read past the partition's end nor past the data itself.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let end = limit.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the row-major triples into three value columns.
        let mut subjects = Vec::with_capacity(window.len());
        let mut predicates = Vec::with_capacity(window.len());
        let mut objects = Vec::with_capacity(window.len());
        for (s, p, o) in window {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }
        self.position = end;

        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        // Rewind to the partition's start, not row 0.
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
598
599use crate::graph::GraphStore;
604use grafeo_common::types::NodeId;
605
/// A parallel source scanning node ids from a [`GraphStore`], producing a
/// single Node-typed column.
pub struct ParallelNodeScanSource {
    // Store handle, exposed via `store()` so downstream operators can
    // resolve node data.
    store: Arc<dyn GraphStore>,
    // Snapshot of the node ids to scan, taken at construction time and
    // shared with partitions.
    node_ids: Arc<Vec<NodeId>>,
    // Next index into `node_ids`.
    position: usize,
}
634
635impl ParallelNodeScanSource {
636 #[must_use]
638 pub fn new(store: Arc<dyn GraphStore>) -> Self {
639 let node_ids = Arc::new(store.node_ids());
640 Self {
641 store,
642 node_ids,
643 position: 0,
644 }
645 }
646
647 #[must_use]
649 pub fn with_label(store: Arc<dyn GraphStore>, label: &str) -> Self {
650 let node_ids = Arc::new(store.nodes_by_label(label));
651 Self {
652 store,
653 node_ids,
654 position: 0,
655 }
656 }
657
658 #[must_use]
662 pub fn from_node_ids(store: Arc<dyn GraphStore>, node_ids: Vec<NodeId>) -> Self {
663 Self {
664 store,
665 node_ids: Arc::new(node_ids),
666 position: 0,
667 }
668 }
669
670 #[must_use]
672 pub fn store(&self) -> &Arc<dyn GraphStore> {
673 &self.store
674 }
675}
676
677impl Source for ParallelNodeScanSource {
678 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
679 if self.position >= self.node_ids.len() {
680 return Ok(None);
681 }
682
683 let end = (self.position + chunk_size).min(self.node_ids.len());
684 let slice = &self.node_ids[self.position..end];
685
686 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
688 for &id in slice {
689 vector.push_node_id(id);
690 }
691
692 self.position = end;
693 Ok(Some(DataChunk::new(vec![vector])))
694 }
695
696 fn reset(&mut self) {
697 self.position = 0;
698 }
699
700 fn name(&self) -> &'static str {
701 "ParallelNodeScanSource"
702 }
703}
704
705impl ParallelSource for ParallelNodeScanSource {
706 fn total_rows(&self) -> Option<usize> {
707 Some(self.node_ids.len())
708 }
709
710 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
711 Box::new(PartitionedNodeScanSource::new(
712 Arc::clone(&self.node_ids),
713 morsel.start_row,
714 morsel.end_row,
715 ))
716 }
717
718 fn num_columns(&self) -> usize {
719 1 }
721}
722
/// A partition of [`ParallelNodeScanSource`] scanning ids
/// `start_row..end_row` of the shared snapshot.
struct PartitionedNodeScanSource {
    // Shared node-id snapshot (no copy per partition).
    node_ids: Arc<Vec<NodeId>>,
    // Partition bounds; `end_row` is exclusive.
    start_row: usize,
    end_row: usize,
    // Current index; starts at (and resets to) `start_row`.
    position: usize,
}
730
731impl PartitionedNodeScanSource {
732 fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
733 Self {
734 node_ids,
735 start_row,
736 end_row,
737 position: start_row,
738 }
739 }
740}
741
742impl Source for PartitionedNodeScanSource {
743 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
744 if self.position >= self.end_row || self.position >= self.node_ids.len() {
745 return Ok(None);
746 }
747
748 let end = (self.position + chunk_size)
749 .min(self.end_row)
750 .min(self.node_ids.len());
751 let slice = &self.node_ids[self.position..end];
752
753 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
755 for &id in slice {
756 vector.push_node_id(id);
757 }
758
759 self.position = end;
760 Ok(Some(DataChunk::new(vec![vector])))
761 }
762
763 fn reset(&mut self) {
764 self.position = self.start_row;
765 }
766
767 fn name(&self) -> &'static str {
768 "PartitionedNodeScanSource"
769 }
770}
771
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    // Metadata of a single-column vector source: row count, partitionability,
    // column count, and morsel generation.
    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        // 100 rows at morsel size 30 -> 30 + 30 + 30 + 10.
        assert_eq!(morsels.len(), 4);
    }

    // A vector-source partition over rows 20..50 yields exactly 30 rows.
    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Range source reports its count and splits into even morsels.
    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    // A range partition over 10..30 produces the integers 10..=29.
    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    // Chunk source totals rows across 5 chunks of 10 rows each.
    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    // A chunk-source partition spanning a chunk boundary (15..35) yields the
    // right count and the right first/last values.
    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    // After exhausting a partition, reset() rewinds it to its start row.
    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        while partition.next_chunk(100).unwrap().is_some() {}

        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    // Triple source exposes three columns and one row per triple.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source = ParallelTripleScanSource::new(triples);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    // A triple-scan partition over rows 20..50 yields exactly 30 rows.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source = ParallelTripleScanSource::new(triples);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Node scan counts all nodes, and label-filtered scans count only
    // matching nodes (every node is Person, every other one Employee).
    #[test]
    fn test_parallel_node_scan_source() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new());

        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStore>);
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let source_person =
            ParallelNodeScanSource::with_label(Arc::clone(&store) as Arc<dyn GraphStore>, "Person");
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(
            Arc::clone(&store) as Arc<dyn GraphStore>,
            "Employee",
        );
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    // A node-scan partition over rows 20..50 yields exactly 30 nodes.
    #[test]
    fn test_parallel_node_scan_partition() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new());

        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStore>);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // 1000 nodes at morsel size 256 -> 4 morsels that together cover
    // every row exactly once.
    #[test]
    fn test_parallel_node_scan_morsels() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new());

        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStore>);

        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4);

        let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}