1use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
/// A [`Source`] that can be split into independent partitions for
/// morsel-driven parallel execution.
pub trait ParallelSource: Source + Send + Sync {
    /// Total number of rows this source will produce, if known up front.
    fn total_rows(&self) -> Option<usize>;

    /// Whether the source can be partitioned; by default this requires a
    /// known row count.
    fn is_partitionable(&self) -> bool {
        self.total_rows().is_some()
    }

    /// Creates an independent [`Source`] that scans only the rows covered
    /// by `morsel` (`start_row..end_row`).
    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;

    /// Splits the source's row range into morsels of at most `morsel_size`
    /// rows. Returns an empty list when the row count is unknown.
    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
        match self.total_rows() {
            Some(total) => generate_morsels(total, morsel_size, source_id),
            None => Vec::new(),
        }
    }

    /// Number of columns in the chunks this source produces.
    fn num_columns(&self) -> usize;
}
52
/// In-memory, column-major source of [`Value`]s usable as a [`ParallelSource`].
pub struct ParallelVectorSource {
    // Column-major data; `Arc`-shared so partitions reference it without copying.
    columns: Arc<Vec<Vec<Value>>>,
    // Next row index to emit from `next_chunk`.
    position: usize,
}
62
63impl ParallelVectorSource {
64 #[must_use]
66 pub fn new(columns: Vec<Vec<Value>>) -> Self {
67 Self {
68 columns: Arc::new(columns),
69 position: 0,
70 }
71 }
72
73 #[must_use]
75 pub fn single_column(values: Vec<Value>) -> Self {
76 Self::new(vec![values])
77 }
78}
79
80impl Source for ParallelVectorSource {
81 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82 if self.columns.is_empty() || self.columns[0].is_empty() {
83 return Ok(None);
84 }
85
86 let total_rows = self.columns[0].len();
87 if self.position >= total_rows {
88 return Ok(None);
89 }
90
91 let end = (self.position + chunk_size).min(total_rows);
92 let mut vectors = Vec::with_capacity(self.columns.len());
93
94 for col_values in self.columns.iter() {
95 let slice = &col_values[self.position..end];
96 vectors.push(ValueVector::from_values(slice));
97 }
98
99 self.position = end;
100 Ok(Some(DataChunk::new(vectors)))
101 }
102
103 fn reset(&mut self) {
104 self.position = 0;
105 }
106
107 fn name(&self) -> &'static str {
108 "ParallelVectorSource"
109 }
110}
111
112impl ParallelSource for ParallelVectorSource {
113 fn total_rows(&self) -> Option<usize> {
114 if self.columns.is_empty() {
115 Some(0)
116 } else {
117 Some(self.columns[0].len())
118 }
119 }
120
121 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122 Box::new(PartitionedVectorSource::new(
123 Arc::clone(&self.columns),
124 morsel.start_row,
125 morsel.end_row,
126 ))
127 }
128
129 fn num_columns(&self) -> usize {
130 self.columns.len()
131 }
132}
133
/// Scans a sub-range of rows from columns shared with a
/// [`ParallelVectorSource`] parent.
struct PartitionedVectorSource {
    // Column data shared with the parent source.
    columns: Arc<Vec<Vec<Value>>>,
    // First row of this partition (inclusive).
    start_row: usize,
    // End row of this partition (exclusive).
    end_row: usize,
    // Next row to emit; starts at `start_row`.
    position: usize,
}
143
144impl PartitionedVectorSource {
145 fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146 Self {
147 columns,
148 start_row,
149 end_row,
150 position: start_row,
151 }
152 }
153}
154
155impl Source for PartitionedVectorSource {
156 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157 if self.columns.is_empty() || self.position >= self.end_row {
158 return Ok(None);
159 }
160
161 let end = (self.position + chunk_size).min(self.end_row);
162 let mut vectors = Vec::with_capacity(self.columns.len());
163
164 for col_values in self.columns.iter() {
165 let slice = &col_values[self.position..end];
166 vectors.push(ValueVector::from_values(slice));
167 }
168
169 self.position = end;
170 Ok(Some(DataChunk::new(vectors)))
171 }
172
173 fn reset(&mut self) {
174 self.position = self.start_row;
175 }
176
177 fn name(&self) -> &'static str {
178 "PartitionedVectorSource"
179 }
180}
181
/// Parallel source over a pre-materialized list of [`DataChunk`]s.
pub struct ParallelChunkSource {
    // Chunks shared with partitions via `Arc`.
    chunks: Arc<Vec<DataChunk>>,
    // Prefix sums: cumulative_rows[i] is the global row where chunk i starts;
    // the final entry equals `total_rows`.
    cumulative_rows: Vec<usize>,
    // Total rows across all chunks.
    total_rows: usize,
    // Next chunk to emit when driven as a plain `Source`.
    chunk_index: usize,
    // Column count taken from the first chunk (0 when there are no chunks).
    num_columns: usize,
}
196
197impl ParallelChunkSource {
198 #[must_use]
200 pub fn new(chunks: Vec<DataChunk>) -> Self {
201 let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
202 let mut sum = 0;
203 cumulative_rows.push(0);
204 for chunk in &chunks {
205 sum += chunk.len();
206 cumulative_rows.push(sum);
207 }
208
209 let num_columns = chunks.first().map_or(0, |c| c.num_columns());
210
211 Self {
212 chunks: Arc::new(chunks),
213 cumulative_rows,
214 total_rows: sum,
215 chunk_index: 0,
216 num_columns,
217 }
218 }
219}
220
221impl Source for ParallelChunkSource {
222 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
223 if self.chunk_index >= self.chunks.len() {
224 return Ok(None);
225 }
226
227 let chunk = self.chunks[self.chunk_index].clone();
228 self.chunk_index += 1;
229 Ok(Some(chunk))
230 }
231
232 fn reset(&mut self) {
233 self.chunk_index = 0;
234 }
235
236 fn name(&self) -> &'static str {
237 "ParallelChunkSource"
238 }
239}
240
241impl ParallelSource for ParallelChunkSource {
242 fn total_rows(&self) -> Option<usize> {
243 Some(self.total_rows)
244 }
245
246 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
247 Box::new(PartitionedChunkSource::new(
248 Arc::clone(&self.chunks),
249 self.cumulative_rows.clone(),
250 morsel.start_row,
251 morsel.end_row,
252 ))
253 }
254
255 fn num_columns(&self) -> usize {
256 self.num_columns
257 }
258}
259
/// Scans a sub-range of global rows from chunks shared with a
/// [`ParallelChunkSource`] parent.
struct PartitionedChunkSource {
    // Chunks shared with the parent source.
    chunks: Arc<Vec<DataChunk>>,
    // Prefix sums copied from the parent; maps global rows to chunk indices.
    cumulative_rows: Vec<usize>,
    // First global row of this partition (inclusive).
    start_row: usize,
    // End global row of this partition (exclusive).
    end_row: usize,
    // Next global row to emit; starts at `start_row`.
    current_row: usize,
}
268
269impl PartitionedChunkSource {
270 fn new(
271 chunks: Arc<Vec<DataChunk>>,
272 cumulative_rows: Vec<usize>,
273 start_row: usize,
274 end_row: usize,
275 ) -> Self {
276 Self {
277 chunks,
278 cumulative_rows,
279 start_row,
280 end_row,
281 current_row: start_row,
282 }
283 }
284
285 fn find_chunk_index(&self, row: usize) -> Option<usize> {
287 match self
289 .cumulative_rows
290 .binary_search_by(|&cumul| cumul.cmp(&row))
291 {
292 Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
293 Err(idx) => {
294 if idx == 0 {
295 Some(0)
296 } else {
297 Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
298 }
299 }
300 }
301 }
302}
303
impl Source for PartitionedChunkSource {
    // Emits at most `chunk_size` rows, never crossing a stored-chunk boundary:
    // a call yields rows from a single underlying chunk, so callers may see
    // chunks smaller than `chunk_size` near boundaries.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
            return Ok(None);
        };

        // Defensive: find_chunk_index clamps, but guard anyway.
        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        // Translate the global row position into an offset within the chunk.
        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        // Take the smallest of: rows left in this chunk, rows left in the
        // partition, and the caller's requested chunk size.
        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        if rows_to_extract == 0 {
            return Ok(None);
        }

        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    fn reset(&mut self) {
        // Rewind to the start of this partition, not global row 0.
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}
347
/// Source producing the integer sequence `0..total` as a single
/// `Int64` column; useful for tests and synthetic workloads.
pub struct RangeSource {
    // Number of rows to produce.
    total: usize,
    // Next value to emit.
    position: usize,
}
355
356impl RangeSource {
357 #[must_use]
359 pub fn new(total: usize) -> Self {
360 Self { total, position: 0 }
361 }
362}
363
364impl Source for RangeSource {
365 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
366 if self.position >= self.total {
367 return Ok(None);
368 }
369
370 let end = (self.position + chunk_size).min(self.total);
371 let values: Vec<Value> = (self.position..end)
372 .map(|i| {
374 #[allow(clippy::cast_possible_wrap)]
376 let val = Value::Int64(i as i64);
377 val
378 })
379 .collect();
380
381 self.position = end;
382 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
383 &values,
384 )])))
385 }
386
387 fn reset(&mut self) {
388 self.position = 0;
389 }
390
391 fn name(&self) -> &'static str {
392 "RangeSource"
393 }
394}
395
396impl ParallelSource for RangeSource {
397 fn total_rows(&self) -> Option<usize> {
398 Some(self.total)
399 }
400
401 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
402 Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
403 }
404
405 fn num_columns(&self) -> usize {
406 1
407 }
408}
409
/// Partition of a [`RangeSource`], producing the values `start..end`.
struct RangePartition {
    // First value of this partition (inclusive).
    start: usize,
    // End value of this partition (exclusive).
    end: usize,
    // Next value to emit; starts at `start`.
    position: usize,
}
416
417impl RangePartition {
418 fn new(start: usize, end: usize) -> Self {
419 Self {
420 start,
421 end,
422 position: start,
423 }
424 }
425}
426
427impl Source for RangePartition {
428 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
429 if self.position >= self.end {
430 return Ok(None);
431 }
432
433 let end = (self.position + chunk_size).min(self.end);
434 let values: Vec<Value> = (self.position..end)
435 .map(|i| {
438 #[allow(clippy::cast_possible_wrap)]
440 let val = Value::Int64(i as i64);
441 val
442 })
443 .collect();
444
445 self.position = end;
446 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
447 &values,
448 )])))
449 }
450
451 fn reset(&mut self) {
452 self.position = self.start;
453 }
454
455 fn name(&self) -> &'static str {
456 "RangePartition"
457 }
458}
459
#[cfg(feature = "triple-store")]
/// Parallel source scanning an in-memory list of triples, emitting three
/// columns: subject, predicate, object.
pub struct ParallelTripleScanSource {
    // Shared triple data so partitions reference it without copying.
    triples: Arc<Vec<(Value, Value, Value)>>,
    // Next triple index to emit.
    position: usize,
}
471
#[cfg(feature = "triple-store")]
impl ParallelTripleScanSource {
    /// Builds a scan over an owned list of `(subject, predicate, object)` triples.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>) -> Self {
        let triples = Arc::new(triples);
        Self { triples, position: 0 }
    }

    /// Collects triples from any iterator into a new scan source.
    pub fn from_triples<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let collected: Vec<_> = iter.into_iter().collect();
        Self::new(collected)
    }
}
491
#[cfg(feature = "triple-store")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let len = self.triples.len();
        if self.position >= len {
            return Ok(None);
        }

        let end = len.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the row-oriented triples into three column vectors.
        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
530
#[cfg(feature = "triple-store")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        let shared = Arc::clone(&self.triples);
        Box::new(PartitionedTripleScanSource::new(
            shared,
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        // Subject, predicate, object.
        3
    }
}
549
#[cfg(feature = "triple-store")]
/// Scans a sub-range of triples shared with a [`ParallelTripleScanSource`] parent.
struct PartitionedTripleScanSource {
    // Triple data shared with the parent source.
    triples: Arc<Vec<(Value, Value, Value)>>,
    // First triple of this partition (inclusive).
    start_row: usize,
    // End triple of this partition (exclusive).
    end_row: usize,
    // Next triple to emit; starts at `start_row`.
    position: usize,
}
558
#[cfg(feature = "triple-store")]
impl PartitionedTripleScanSource {
    /// Creates a partition over triples `start_row..end_row`.
    fn new(triples: Arc<Vec<(Value, Value, Value)>>, start_row: usize, end_row: usize) -> Self {
        let position = start_row;
        Self { triples, start_row, end_row, position }
    }
}
570
#[cfg(feature = "triple-store")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Effective end of this partition: morsel bound clamped to the data.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let end = limit.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the row-oriented triples into three column vectors.
        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        // Rewind to the start of this partition.
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
611
612use crate::graph::GraphStoreSearch;
617use grafeo_common::types::NodeId;
618
/// Parallel source scanning node ids from a graph store, producing a single
/// Node-typed column.
pub struct ParallelNodeScanSource {
    // Store handle; retained so callers can retrieve it via `store()`.
    store: Arc<dyn GraphStoreSearch>,
    // Snapshot of the node ids to scan, shared with partitions.
    node_ids: Arc<Vec<NodeId>>,
    // Next index into `node_ids`.
    position: usize,
}
647
648impl ParallelNodeScanSource {
649 #[must_use]
651 pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
652 let node_ids = Arc::new(store.node_ids());
653 Self {
654 store,
655 node_ids,
656 position: 0,
657 }
658 }
659
660 #[must_use]
662 pub fn with_label(store: Arc<dyn GraphStoreSearch>, label: &str) -> Self {
663 let node_ids = Arc::new(store.nodes_by_label(label));
664 Self {
665 store,
666 node_ids,
667 position: 0,
668 }
669 }
670
671 #[must_use]
675 pub fn from_node_ids(store: Arc<dyn GraphStoreSearch>, node_ids: Vec<NodeId>) -> Self {
676 Self {
677 store,
678 node_ids: Arc::new(node_ids),
679 position: 0,
680 }
681 }
682
683 #[must_use]
685 pub fn store(&self) -> &Arc<dyn GraphStoreSearch> {
686 &self.store
687 }
688}
689
690impl Source for ParallelNodeScanSource {
691 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
692 if self.position >= self.node_ids.len() {
693 return Ok(None);
694 }
695
696 let end = (self.position + chunk_size).min(self.node_ids.len());
697 let slice = &self.node_ids[self.position..end];
698
699 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
701 for &id in slice {
702 vector.push_node_id(id);
703 }
704
705 self.position = end;
706 Ok(Some(DataChunk::new(vec![vector])))
707 }
708
709 fn reset(&mut self) {
710 self.position = 0;
711 }
712
713 fn name(&self) -> &'static str {
714 "ParallelNodeScanSource"
715 }
716}
717
718impl ParallelSource for ParallelNodeScanSource {
719 fn total_rows(&self) -> Option<usize> {
720 Some(self.node_ids.len())
721 }
722
723 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
724 Box::new(PartitionedNodeScanSource::new(
725 Arc::clone(&self.node_ids),
726 morsel.start_row,
727 morsel.end_row,
728 ))
729 }
730
731 fn num_columns(&self) -> usize {
732 1 }
734}
735
/// Scans a sub-range of node ids shared with a [`ParallelNodeScanSource`] parent.
struct PartitionedNodeScanSource {
    // Node-id snapshot shared with the parent source.
    node_ids: Arc<Vec<NodeId>>,
    // First index of this partition (inclusive).
    start_row: usize,
    // End index of this partition (exclusive).
    end_row: usize,
    // Next index to emit; starts at `start_row`.
    position: usize,
}
743
744impl PartitionedNodeScanSource {
745 fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
746 Self {
747 node_ids,
748 start_row,
749 end_row,
750 position: start_row,
751 }
752 }
753}
754
755impl Source for PartitionedNodeScanSource {
756 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
757 if self.position >= self.end_row || self.position >= self.node_ids.len() {
758 return Ok(None);
759 }
760
761 let end = (self.position + chunk_size)
762 .min(self.end_row)
763 .min(self.node_ids.len());
764 let slice = &self.node_ids[self.position..end];
765
766 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
768 for &id in slice {
769 vector.push_node_id(id);
770 }
771
772 self.position = end;
773 Ok(Some(DataChunk::new(vec![vector])))
774 }
775
776 fn reset(&mut self) {
777 self.position = self.start_row;
778 }
779
780 fn name(&self) -> &'static str {
781 "PartitionedNodeScanSource"
782 }
783}
784
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    // 100 rows at morsel size 30 -> 4 morsels (30/30/30/10).
    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); }

    // A partition over rows 20..50 should drain to exactly 30 rows,
    // regardless of the chunk size used to pull them.
    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // 100 rows at morsel size 25 -> exactly 4 equal morsels.
    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    // A range partition over 10..30 yields the values 10..=29 in order.
    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    // Five 10-row chunks -> 50 total rows, 1 column.
    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    // A partition over rows 15..35 spans three stored chunks; it must emit
    // 20 rows starting at value 15 and ending at value 34.
    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    // After draining a partition, reset() must rewind it to its own start
    // (not to global row 0) so it can be replayed in full.
    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        while partition.next_chunk(100).unwrap().is_some() {}

        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    // Three triples -> 3 rows, 3 columns (subject, predicate, object).
    #[cfg(feature = "triple-store")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source = ParallelTripleScanSource::new(triples);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    // A triple-scan partition over rows 20..50 drains to exactly 30 rows.
    #[cfg(feature = "triple-store")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source = ParallelTripleScanSource::new(triples);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Full scan sees all 100 nodes; label-filtered scans see only the
    // nodes carrying that label (every node is "Person", half are "Employee").
    #[test]
    fn test_parallel_node_scan_source() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let source_person = ParallelNodeScanSource::with_label(
            Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
            "Person",
        );
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(
            Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
            "Employee",
        );
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    // A node-scan partition over rows 20..50 drains to exactly 30 rows.
    #[test]
    fn test_parallel_node_scan_partition() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // 1000 nodes at morsel size 256 -> 4 morsels that together cover
    // every row exactly once.
    #[test]
    fn test_parallel_node_scan_morsels() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);

        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4); let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}