1use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
14pub trait ParallelSource: Source + Send + Sync {
21 fn total_rows(&self) -> Option<usize>;
25
26 fn is_partitionable(&self) -> bool {
30 self.total_rows().is_some()
31 }
32
33 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;
38
39 fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
43 match self.total_rows() {
44 Some(total) => generate_morsels(total, morsel_size, source_id),
45 None => Vec::new(),
46 }
47 }
48
49 fn num_columns(&self) -> usize;
51}
52
/// A partitionable in-memory source backed by column-major `Vec<Value>` data.
pub struct ParallelVectorSource {
    /// Column-major data; inner vectors are presumably equal-length columns
    /// (only column 0's length is consulted for the row count).
    columns: Arc<Vec<Vec<Value>>>,
    /// Next row to read during sequential (non-partitioned) scans.
    position: usize,
}
62
63impl ParallelVectorSource {
64 #[must_use]
66 pub fn new(columns: Vec<Vec<Value>>) -> Self {
67 Self {
68 columns: Arc::new(columns),
69 position: 0,
70 }
71 }
72
73 #[must_use]
75 pub fn single_column(values: Vec<Value>) -> Self {
76 Self::new(vec![values])
77 }
78}
79
80impl Source for ParallelVectorSource {
81 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82 if self.columns.is_empty() || self.columns[0].is_empty() {
83 return Ok(None);
84 }
85
86 let total_rows = self.columns[0].len();
87 if self.position >= total_rows {
88 return Ok(None);
89 }
90
91 let end = (self.position + chunk_size).min(total_rows);
92 let mut vectors = Vec::with_capacity(self.columns.len());
93
94 for col_values in self.columns.iter() {
95 let slice = &col_values[self.position..end];
96 vectors.push(ValueVector::from_values(slice));
97 }
98
99 self.position = end;
100 Ok(Some(DataChunk::new(vectors)))
101 }
102
103 fn reset(&mut self) {
104 self.position = 0;
105 }
106
107 fn name(&self) -> &'static str {
108 "ParallelVectorSource"
109 }
110}
111
112impl ParallelSource for ParallelVectorSource {
113 fn total_rows(&self) -> Option<usize> {
114 if self.columns.is_empty() {
115 Some(0)
116 } else {
117 Some(self.columns[0].len())
118 }
119 }
120
121 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122 Box::new(PartitionedVectorSource::new(
123 Arc::clone(&self.columns),
124 morsel.start_row,
125 morsel.end_row,
126 ))
127 }
128
129 fn num_columns(&self) -> usize {
130 self.columns.len()
131 }
132}
133
/// Read-only view over shared column data, restricted to a row range.
struct PartitionedVectorSource {
    /// Shared, immutable column data (same backing storage as the parent).
    columns: Arc<Vec<Vec<Value>>>,
    /// First row (inclusive) this partition may read.
    start_row: usize,
    /// One past the last row this partition may read.
    end_row: usize,
    /// Current read cursor; starts at `start_row`.
    position: usize,
}
143
144impl PartitionedVectorSource {
145 fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146 Self {
147 columns,
148 start_row,
149 end_row,
150 position: start_row,
151 }
152 }
153}
154
155impl Source for PartitionedVectorSource {
156 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157 if self.columns.is_empty() || self.position >= self.end_row {
158 return Ok(None);
159 }
160
161 let end = (self.position + chunk_size).min(self.end_row);
162 let mut vectors = Vec::with_capacity(self.columns.len());
163
164 for col_values in self.columns.iter() {
165 let slice = &col_values[self.position..end];
166 vectors.push(ValueVector::from_values(slice));
167 }
168
169 self.position = end;
170 Ok(Some(DataChunk::new(vectors)))
171 }
172
173 fn reset(&mut self) {
174 self.position = self.start_row;
175 }
176
177 fn name(&self) -> &'static str {
178 "PartitionedVectorSource"
179 }
180}
181
/// A partitionable source over a pre-materialized list of [`DataChunk`]s.
pub struct ParallelChunkSource {
    /// The chunks to emit, shared with any partitions.
    chunks: Arc<Vec<DataChunk>>,
    /// Row count of each chunk, in order. Currently unused after
    /// construction; kept for future use.
    #[allow(dead_code)]
    chunk_row_counts: Vec<usize>,
    /// Prefix sums of chunk lengths; entry `i` is the global row index at
    /// which chunk `i` starts, with a trailing entry holding the total.
    cumulative_rows: Vec<usize>,
    /// Total rows across all chunks.
    total_rows: usize,
    /// Next chunk to emit during sequential scans.
    chunk_index: usize,
    /// Column count taken from the first chunk (0 when there are none).
    num_columns: usize,
}
199
200impl ParallelChunkSource {
201 #[must_use]
203 pub fn new(chunks: Vec<DataChunk>) -> Self {
204 let chunk_row_counts: Vec<usize> = chunks.iter().map(DataChunk::len).collect();
205
206 let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
207 let mut sum = 0;
208 cumulative_rows.push(0);
209 for &count in &chunk_row_counts {
210 sum += count;
211 cumulative_rows.push(sum);
212 }
213
214 let num_columns = chunks.first().map_or(0, |c| c.num_columns());
215
216 Self {
217 chunks: Arc::new(chunks),
218 chunk_row_counts,
219 cumulative_rows,
220 total_rows: sum,
221 chunk_index: 0,
222 num_columns,
223 }
224 }
225}
226
227impl Source for ParallelChunkSource {
228 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
229 if self.chunk_index >= self.chunks.len() {
230 return Ok(None);
231 }
232
233 let chunk = self.chunks[self.chunk_index].clone();
234 self.chunk_index += 1;
235 Ok(Some(chunk))
236 }
237
238 fn reset(&mut self) {
239 self.chunk_index = 0;
240 }
241
242 fn name(&self) -> &'static str {
243 "ParallelChunkSource"
244 }
245}
246
247impl ParallelSource for ParallelChunkSource {
248 fn total_rows(&self) -> Option<usize> {
249 Some(self.total_rows)
250 }
251
252 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
253 Box::new(PartitionedChunkSource::new(
254 Arc::clone(&self.chunks),
255 self.cumulative_rows.clone(),
256 morsel.start_row,
257 morsel.end_row,
258 ))
259 }
260
261 fn num_columns(&self) -> usize {
262 self.num_columns
263 }
264}
265
/// Partition of a [`ParallelChunkSource`] covering global rows
/// `start_row..end_row`, re-slicing stored chunks as needed.
struct PartitionedChunkSource {
    /// Shared chunk storage.
    chunks: Arc<Vec<DataChunk>>,
    /// Prefix sums of chunk lengths; entry `i` is the global row at which
    /// chunk `i` starts.
    cumulative_rows: Vec<usize>,
    /// First global row (inclusive) this partition reads.
    start_row: usize,
    /// One past the last global row this partition reads.
    end_row: usize,
    /// Current global read cursor.
    current_row: usize,
}
274
275impl PartitionedChunkSource {
276 fn new(
277 chunks: Arc<Vec<DataChunk>>,
278 cumulative_rows: Vec<usize>,
279 start_row: usize,
280 end_row: usize,
281 ) -> Self {
282 Self {
283 chunks,
284 cumulative_rows,
285 start_row,
286 end_row,
287 current_row: start_row,
288 }
289 }
290
291 fn find_chunk_index(&self, row: usize) -> Option<usize> {
293 match self
295 .cumulative_rows
296 .binary_search_by(|&cumul| cumul.cmp(&row))
297 {
298 Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
299 Err(idx) => {
300 if idx == 0 {
301 Some(0)
302 } else {
303 Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
304 }
305 }
306 }
307 }
308}
309
310impl Source for PartitionedChunkSource {
311 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
312 if self.current_row >= self.end_row || self.chunks.is_empty() {
313 return Ok(None);
314 }
315
316 let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
318 return Ok(None);
319 };
320
321 if chunk_idx >= self.chunks.len() {
322 return Ok(None);
323 }
324
325 let chunk_start = self.cumulative_rows[chunk_idx];
326 let chunk = &self.chunks[chunk_idx];
327 let offset_in_chunk = self.current_row - chunk_start;
328
329 let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
331 let rows_to_end = self.end_row.saturating_sub(self.current_row);
332 let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);
333
334 if rows_to_extract == 0 {
335 return Ok(None);
336 }
337
338 let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
340 self.current_row += rows_to_extract;
341
342 Ok(Some(sliced))
343 }
344
345 fn reset(&mut self) {
346 self.current_row = self.start_row;
347 }
348
349 fn name(&self) -> &'static str {
350 "PartitionedChunkSource"
351 }
352}
353
/// A source that generates the integer sequence `0..total` as Int64 values.
pub struct RangeSource {
    /// Number of rows to produce.
    total: usize,
    /// Next value to emit.
    position: usize,
}
361
362impl RangeSource {
363 #[must_use]
365 pub fn new(total: usize) -> Self {
366 Self { total, position: 0 }
367 }
368}
369
370impl Source for RangeSource {
371 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
372 if self.position >= self.total {
373 return Ok(None);
374 }
375
376 let end = (self.position + chunk_size).min(self.total);
377 let values: Vec<Value> = (self.position..end)
378 .map(|i| Value::Int64(i as i64))
379 .collect();
380
381 self.position = end;
382 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
383 &values,
384 )])))
385 }
386
387 fn reset(&mut self) {
388 self.position = 0;
389 }
390
391 fn name(&self) -> &'static str {
392 "RangeSource"
393 }
394}
395
396impl ParallelSource for RangeSource {
397 fn total_rows(&self) -> Option<usize> {
398 Some(self.total)
399 }
400
401 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
402 Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
403 }
404
405 fn num_columns(&self) -> usize {
406 1
407 }
408}
409
/// Partition of a [`RangeSource`] producing the integers `start..end`.
struct RangePartition {
    /// First value (inclusive); `reset()` returns the cursor here.
    start: usize,
    /// One past the last value.
    end: usize,
    /// Next value to emit.
    position: usize,
}
416
417impl RangePartition {
418 fn new(start: usize, end: usize) -> Self {
419 Self {
420 start,
421 end,
422 position: start,
423 }
424 }
425}
426
427impl Source for RangePartition {
428 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
429 if self.position >= self.end {
430 return Ok(None);
431 }
432
433 let end = (self.position + chunk_size).min(self.end);
434 let values: Vec<Value> = (self.position..end)
435 .map(|i| Value::Int64(i as i64))
436 .collect();
437
438 self.position = end;
439 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
440 &values,
441 )])))
442 }
443
444 fn reset(&mut self) {
445 self.position = self.start;
446 }
447
448 fn name(&self) -> &'static str {
449 "RangePartition"
450 }
451}
452
/// A partitionable source over materialized RDF triples
/// (only with the `rdf` feature).
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    /// Shared (subject, predicate, object) rows.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Next triple to emit during sequential scans.
    position: usize,
    /// Output variable names for the three columns — presumably the
    /// projected SPARQL variables; unused here except passed to partitions.
    output_vars: Vec<String>,
}
466
#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Builds a source over materialized triples with the given output
    /// variable names.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        Self {
            triples: Arc::new(triples),
            position: 0,
            output_vars,
        }
    }

    /// Collects an iterator of triples into a new source.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let triples: Vec<_> = iter.into_iter().collect();
        Self::new(triples, output_vars)
    }
}
487
#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    /// Emits up to `chunk_size` triples as three `Value` columns
    /// (subject, predicate, object).
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        let end = total.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    /// Rewinds to the first triple.
    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
526
#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    /// Partitions share the triple storage and carry the variable names.
    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        let partition = PartitionedTripleScanSource::new(
            Arc::clone(&self.triples),
            self.output_vars.clone(),
            morsel.start_row,
            morsel.end_row,
        );
        Box::new(partition)
    }

    /// Always subject, predicate, object.
    fn num_columns(&self) -> usize {
        3
    }
}
546
/// Partition of a [`ParallelTripleScanSource`] over
/// `triples[start_row..end_row]`.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    /// Shared triple storage.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Variable names carried along from the parent; not read here.
    #[allow(dead_code)]
    output_vars: Vec<String>,
    /// First triple index (inclusive).
    start_row: usize,
    /// One past the last triple index.
    end_row: usize,
    /// Current read cursor; starts at `start_row`.
    position: usize,
}
557
#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    /// Creates a partition over `triples[start_row..end_row]` with the read
    /// cursor at the partition start.
    fn new(
        triples: Arc<Vec<(Value, Value, Value)>>,
        output_vars: Vec<String>,
        start_row: usize,
        end_row: usize,
    ) -> Self {
        Self {
            position: start_row,
            triples,
            output_vars,
            start_row,
            end_row,
        }
    }
}
575
#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    /// Emits up to `chunk_size` triples from this partition's range as three
    /// `Value` columns.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Effective upper bound: partition end, clamped to the data length.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let end = limit.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    /// Rewinds to the partition's first triple.
    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
616
617use crate::graph::lpg::LpgStore;
622use grafeo_common::types::NodeId;
623
/// A partitionable scan over node ids of an [`LpgStore`].
pub struct ParallelNodeScanSource {
    /// Backing property-graph store (exposed to callers via `store()`).
    store: Arc<LpgStore>,
    /// Snapshot of the node ids to scan, shared with partitions.
    node_ids: Arc<Vec<NodeId>>,
    /// Next index into `node_ids` for sequential scans.
    position: usize,
}
651
652impl ParallelNodeScanSource {
653 #[must_use]
655 pub fn new(store: Arc<LpgStore>) -> Self {
656 let node_ids = Arc::new(store.node_ids());
657 Self {
658 store,
659 node_ids,
660 position: 0,
661 }
662 }
663
664 #[must_use]
666 pub fn with_label(store: Arc<LpgStore>, label: &str) -> Self {
667 let node_ids = Arc::new(store.nodes_by_label(label));
668 Self {
669 store,
670 node_ids,
671 position: 0,
672 }
673 }
674
675 #[must_use]
679 pub fn from_node_ids(store: Arc<LpgStore>, node_ids: Vec<NodeId>) -> Self {
680 Self {
681 store,
682 node_ids: Arc::new(node_ids),
683 position: 0,
684 }
685 }
686
687 #[must_use]
689 pub fn store(&self) -> &Arc<LpgStore> {
690 &self.store
691 }
692}
693
694impl Source for ParallelNodeScanSource {
695 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
696 if self.position >= self.node_ids.len() {
697 return Ok(None);
698 }
699
700 let end = (self.position + chunk_size).min(self.node_ids.len());
701 let slice = &self.node_ids[self.position..end];
702
703 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
705 for &id in slice {
706 vector.push_node_id(id);
707 }
708
709 self.position = end;
710 Ok(Some(DataChunk::new(vec![vector])))
711 }
712
713 fn reset(&mut self) {
714 self.position = 0;
715 }
716
717 fn name(&self) -> &'static str {
718 "ParallelNodeScanSource"
719 }
720}
721
722impl ParallelSource for ParallelNodeScanSource {
723 fn total_rows(&self) -> Option<usize> {
724 Some(self.node_ids.len())
725 }
726
727 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
728 Box::new(PartitionedNodeScanSource::new(
729 Arc::clone(&self.node_ids),
730 morsel.start_row,
731 morsel.end_row,
732 ))
733 }
734
735 fn num_columns(&self) -> usize {
736 1 }
738}
739
/// Partition of a [`ParallelNodeScanSource`] over
/// `node_ids[start_row..end_row]`.
struct PartitionedNodeScanSource {
    /// Shared node-id snapshot.
    node_ids: Arc<Vec<NodeId>>,
    /// First index (inclusive) this partition reads.
    start_row: usize,
    /// One past the last index this partition reads.
    end_row: usize,
    /// Current read cursor; starts at `start_row`.
    position: usize,
}
747
748impl PartitionedNodeScanSource {
749 fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
750 Self {
751 node_ids,
752 start_row,
753 end_row,
754 position: start_row,
755 }
756 }
757}
758
759impl Source for PartitionedNodeScanSource {
760 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
761 if self.position >= self.end_row || self.position >= self.node_ids.len() {
762 return Ok(None);
763 }
764
765 let end = (self.position + chunk_size)
766 .min(self.end_row)
767 .min(self.node_ids.len());
768 let slice = &self.node_ids[self.position..end];
769
770 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
772 for &id in slice {
773 vector.push_node_id(id);
774 }
775
776 self.position = end;
777 Ok(Some(DataChunk::new(vec![vector])))
778 }
779
780 fn reset(&mut self) {
781 self.position = self.start_row;
782 }
783
784 fn name(&self) -> &'static str {
785 "PartitionedNodeScanSource"
786 }
787}
788
#[cfg(test)]
mod tests {
    use super::*;

    // Metadata of a single-column vector source: row count, partitionability,
    // column count, and morsel generation (ceil(100 / 30) == 4 morsels).
    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); }

    // A partition over rows [20, 50) yields exactly 30 rows regardless of
    // the chunk size used to drain it.
    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        // NOTE(review): Morsel::new(_, _, start_row, end_row) — the first two
        // arguments are presumably source/morsel ids; confirm in morsel.rs.
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // RangeSource metadata: 100 rows at morsel size 25 -> 4 morsels.
    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    // A range partition over [10, 30) yields 20 values, 10 through 29.
    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    // Five 10-row chunks -> 50 total rows, one column.
    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    // A partition over global rows [15, 35) crosses chunk boundaries:
    // 20 rows total, first value 15, last value 34.
    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    // reset() must rewind a partition to its own start row, so a second
    // drain yields the full partition again.
    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        while partition.next_chunk(100).unwrap().is_some() {}

        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    // Triple source metadata: 3 triples, always 3 columns (s, p, o).
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    // A triple partition over rows [20, 50) yields exactly 30 triples.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Node scan over a store: full scan sees all 100 nodes; label-filtered
    // scans see the matching subsets (all are Person, half are Employee).
    #[test]
    fn test_parallel_node_scan_source() {
        let store = Arc::new(LpgStore::new());

        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let source_person = ParallelNodeScanSource::with_label(Arc::clone(&store), "Person");
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(Arc::clone(&store), "Employee");
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    // A node-scan partition over rows [20, 50) yields exactly 30 ids.
    #[test]
    fn test_parallel_node_scan_partition() {
        let store = Arc::new(LpgStore::new());

        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Morsels must cover all 1000 rows with no gaps or overlaps:
    // ceil(1000 / 256) == 4 morsels.
    #[test]
    fn test_parallel_node_scan_morsels() {
        let store = Arc::new(LpgStore::new());

        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4); let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}