1use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
/// A [`Source`] that can be split into independent partitions for
/// morsel-driven parallel execution.
pub trait ParallelSource: Source + Send + Sync {
    /// Total number of rows this source will produce, if known up front.
    fn total_rows(&self) -> Option<usize>;

    /// Whether the source can be split into morsels.
    ///
    /// Default: partitionable exactly when the total row count is known.
    fn is_partitionable(&self) -> bool {
        self.total_rows().is_some()
    }

    /// Creates an independent [`Source`] that scans only the row range
    /// covered by `morsel`.
    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;

    /// Splits the source's row range into morsels of at most `morsel_size`
    /// rows, tagged with `source_id`.
    ///
    /// Returns an empty list when the total row count is unknown.
    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
        match self.total_rows() {
            Some(total) => generate_morsels(total, morsel_size, source_id),
            None => Vec::new(),
        }
    }

    /// Number of columns each emitted chunk will contain.
    fn num_columns(&self) -> usize;
}
52
/// In-memory, column-major source usable as a partitionable input for
/// parallel execution.
pub struct ParallelVectorSource {
    /// Column-major data; `Arc`-shared so partitions reference it without copying.
    columns: Arc<Vec<Vec<Value>>>,
    /// Next row index `next_chunk` will emit.
    position: usize,
}
62
63impl ParallelVectorSource {
64 #[must_use]
66 pub fn new(columns: Vec<Vec<Value>>) -> Self {
67 Self {
68 columns: Arc::new(columns),
69 position: 0,
70 }
71 }
72
73 #[must_use]
75 pub fn single_column(values: Vec<Value>) -> Self {
76 Self::new(vec![values])
77 }
78}
79
80impl Source for ParallelVectorSource {
81 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82 if self.columns.is_empty() || self.columns[0].is_empty() {
83 return Ok(None);
84 }
85
86 let total_rows = self.columns[0].len();
87 if self.position >= total_rows {
88 return Ok(None);
89 }
90
91 let end = (self.position + chunk_size).min(total_rows);
92 let mut vectors = Vec::with_capacity(self.columns.len());
93
94 for col_values in self.columns.iter() {
95 let slice = &col_values[self.position..end];
96 vectors.push(ValueVector::from_values(slice));
97 }
98
99 self.position = end;
100 Ok(Some(DataChunk::new(vectors)))
101 }
102
103 fn reset(&mut self) {
104 self.position = 0;
105 }
106
107 fn name(&self) -> &'static str {
108 "ParallelVectorSource"
109 }
110}
111
112impl ParallelSource for ParallelVectorSource {
113 fn total_rows(&self) -> Option<usize> {
114 if self.columns.is_empty() {
115 Some(0)
116 } else {
117 Some(self.columns[0].len())
118 }
119 }
120
121 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122 Box::new(PartitionedVectorSource::new(
123 Arc::clone(&self.columns),
124 morsel.start_row,
125 morsel.end_row,
126 ))
127 }
128
129 fn num_columns(&self) -> usize {
130 self.columns.len()
131 }
132}
133
/// Row-range view over a [`ParallelVectorSource`]'s columns; produced by
/// `create_partition` for a single morsel.
struct PartitionedVectorSource {
    /// Shared column data (same allocation as the parent source).
    columns: Arc<Vec<Vec<Value>>>,
    /// First row (inclusive) this partition covers.
    start_row: usize,
    /// Last row (exclusive) this partition covers.
    end_row: usize,
    /// Next row to emit; `reset` returns it to `start_row`.
    position: usize,
}
143
144impl PartitionedVectorSource {
145 fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146 Self {
147 columns,
148 start_row,
149 end_row,
150 position: start_row,
151 }
152 }
153}
154
155impl Source for PartitionedVectorSource {
156 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157 if self.columns.is_empty() || self.position >= self.end_row {
158 return Ok(None);
159 }
160
161 let end = (self.position + chunk_size).min(self.end_row);
162 let mut vectors = Vec::with_capacity(self.columns.len());
163
164 for col_values in self.columns.iter() {
165 let slice = &col_values[self.position..end];
166 vectors.push(ValueVector::from_values(slice));
167 }
168
169 self.position = end;
170 Ok(Some(DataChunk::new(vectors)))
171 }
172
173 fn reset(&mut self) {
174 self.position = self.start_row;
175 }
176
177 fn name(&self) -> &'static str {
178 "PartitionedVectorSource"
179 }
180}
181
/// Partitionable source over a pre-materialized list of [`DataChunk`]s.
pub struct ParallelChunkSource {
    /// Shared chunk list so partitions can reference it without copying.
    chunks: Arc<Vec<DataChunk>>,
    /// Row count of each chunk, by chunk index.
    #[allow(dead_code)]
    chunk_row_counts: Vec<usize>,
    /// Prefix sums of chunk row counts; entry `i` is the global row index at
    /// which chunk `i` starts (length = number of chunks + 1).
    cumulative_rows: Vec<usize>,
    /// Total rows across all chunks.
    total_rows: usize,
    /// Index of the next chunk `next_chunk` will emit.
    chunk_index: usize,
    /// Column count taken from the first chunk (0 when there are no chunks).
    num_columns: usize,
}
199
200impl ParallelChunkSource {
201 #[must_use]
203 pub fn new(chunks: Vec<DataChunk>) -> Self {
204 let chunk_row_counts: Vec<usize> = chunks.iter().map(DataChunk::len).collect();
205
206 let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
207 let mut sum = 0;
208 cumulative_rows.push(0);
209 for &count in &chunk_row_counts {
210 sum += count;
211 cumulative_rows.push(sum);
212 }
213
214 let num_columns = chunks.first().map(|c| c.num_columns()).unwrap_or(0);
215
216 Self {
217 chunks: Arc::new(chunks),
218 chunk_row_counts,
219 cumulative_rows,
220 total_rows: sum,
221 chunk_index: 0,
222 num_columns,
223 }
224 }
225}
226
227impl Source for ParallelChunkSource {
228 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
229 if self.chunk_index >= self.chunks.len() {
230 return Ok(None);
231 }
232
233 let chunk = self.chunks[self.chunk_index].clone();
234 self.chunk_index += 1;
235 Ok(Some(chunk))
236 }
237
238 fn reset(&mut self) {
239 self.chunk_index = 0;
240 }
241
242 fn name(&self) -> &'static str {
243 "ParallelChunkSource"
244 }
245}
246
247impl ParallelSource for ParallelChunkSource {
248 fn total_rows(&self) -> Option<usize> {
249 Some(self.total_rows)
250 }
251
252 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
253 Box::new(PartitionedChunkSource::new(
254 Arc::clone(&self.chunks),
255 self.cumulative_rows.clone(),
256 morsel.start_row,
257 morsel.end_row,
258 ))
259 }
260
261 fn num_columns(&self) -> usize {
262 self.num_columns
263 }
264}
265
/// Row-range view over a [`ParallelChunkSource`]'s chunk list; slices stored
/// chunks so an emitted chunk never crosses a stored-chunk boundary.
struct PartitionedChunkSource {
    /// Shared chunk list (same allocation as the parent source).
    chunks: Arc<Vec<DataChunk>>,
    /// Prefix sums of chunk row counts (length = number of chunks + 1).
    cumulative_rows: Vec<usize>,
    /// First global row (inclusive) this partition covers.
    start_row: usize,
    /// Last global row (exclusive) this partition covers.
    end_row: usize,
    /// Next global row to emit; `reset` returns it to `start_row`.
    current_row: usize,
}
274
275impl PartitionedChunkSource {
276 fn new(
277 chunks: Arc<Vec<DataChunk>>,
278 cumulative_rows: Vec<usize>,
279 start_row: usize,
280 end_row: usize,
281 ) -> Self {
282 Self {
283 chunks,
284 cumulative_rows,
285 start_row,
286 end_row,
287 current_row: start_row,
288 }
289 }
290
291 fn find_chunk_index(&self, row: usize) -> Option<usize> {
293 match self
295 .cumulative_rows
296 .binary_search_by(|&cumul| cumul.cmp(&row))
297 {
298 Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
299 Err(idx) => {
300 if idx == 0 {
301 Some(0)
302 } else {
303 Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
304 }
305 }
306 }
307 }
308}
309
impl Source for PartitionedChunkSource {
    /// Emits up to `chunk_size` rows, never crossing a stored-chunk boundary,
    /// so a single call may return fewer rows than requested.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        // Locate the stored chunk containing the current global row.
        let chunk_idx = match self.find_chunk_index(self.current_row) {
            Some(idx) => idx,
            None => return Ok(None),
        };

        // Defensive: find_chunk_index already clamps, but guard anyway.
        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        // Take the smallest of: rows remaining in this stored chunk, rows
        // remaining in the partition's range, and the caller's request.
        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        if rows_to_extract == 0 {
            return Ok(None);
        }

        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    fn reset(&mut self) {
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}
354
/// Source generating `Int64` values `0..total` in a single column; handy for
/// exercising parallel execution without materialized data.
pub struct RangeSource {
    /// Number of rows (values) to generate.
    total: usize,
    /// Next value to emit.
    position: usize,
}
362
363impl RangeSource {
364 #[must_use]
366 pub fn new(total: usize) -> Self {
367 Self { total, position: 0 }
368 }
369}
370
371impl Source for RangeSource {
372 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
373 if self.position >= self.total {
374 return Ok(None);
375 }
376
377 let end = (self.position + chunk_size).min(self.total);
378 let values: Vec<Value> = (self.position..end)
379 .map(|i| Value::Int64(i as i64))
380 .collect();
381
382 self.position = end;
383 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
384 &values,
385 )])))
386 }
387
388 fn reset(&mut self) {
389 self.position = 0;
390 }
391
392 fn name(&self) -> &'static str {
393 "RangeSource"
394 }
395}
396
397impl ParallelSource for RangeSource {
398 fn total_rows(&self) -> Option<usize> {
399 Some(self.total)
400 }
401
402 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
403 Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
404 }
405
406 fn num_columns(&self) -> usize {
407 1
408 }
409}
410
/// Partition of a [`RangeSource`]: generates `Int64` values `start..end`.
struct RangePartition {
    /// First value (inclusive).
    start: usize,
    /// End of the range (exclusive).
    end: usize,
    /// Next value to emit; `reset` returns it to `start`.
    position: usize,
}
417
418impl RangePartition {
419 fn new(start: usize, end: usize) -> Self {
420 Self {
421 start,
422 end,
423 position: start,
424 }
425 }
426}
427
428impl Source for RangePartition {
429 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
430 if self.position >= self.end {
431 return Ok(None);
432 }
433
434 let end = (self.position + chunk_size).min(self.end);
435 let values: Vec<Value> = (self.position..end)
436 .map(|i| Value::Int64(i as i64))
437 .collect();
438
439 self.position = end;
440 Ok(Some(DataChunk::new(vec![ValueVector::from_values(
441 &values,
442 )])))
443 }
444
445 fn reset(&mut self) {
446 self.position = self.start;
447 }
448
449 fn name(&self) -> &'static str {
450 "RangePartition"
451 }
452}
453
/// Partitionable source over materialized RDF triples, emitting three
/// columns per chunk: subject, predicate, object.
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    /// Shared (subject, predicate, object) tuples; partitions reference the
    /// same allocation without copying.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Next triple index to emit.
    position: usize,
    /// Output variable names for the three columns — presumably the query's
    /// binding names; not consumed in this file (TODO confirm against callers).
    output_vars: Vec<String>,
}
467
#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Builds a source over materialized triples.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        Self {
            triples: Arc::new(triples),
            position: 0,
            output_vars,
        }
    }

    /// Collects any iterator of triples into a new source.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let collected: Vec<_> = iter.into_iter().collect();
        Self::new(collected, output_vars)
    }
}
488
#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        let start = self.position;
        let end = total.min(start + chunk_size);
        self.position = end;

        // Transpose the (s, p, o) tuples into three column vectors.
        let window = &self.triples[start..end];
        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
527
#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        let partition = PartitionedTripleScanSource::new(
            Arc::clone(&self.triples),
            self.output_vars.clone(),
            morsel.start_row,
            morsel.end_row,
        );
        Box::new(partition)
    }

    fn num_columns(&self) -> usize {
        // Subject, predicate, object.
        3
    }
}
547
/// Row-range view over a [`ParallelTripleScanSource`]'s triples.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    /// Shared triple data (same allocation as the parent source).
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Variable names carried over from the parent; currently unused here.
    #[allow(dead_code)]
    output_vars: Vec<String>,
    /// First triple index (inclusive) this partition covers.
    start_row: usize,
    /// Last triple index (exclusive) this partition covers.
    end_row: usize,
    /// Next triple index to emit; `reset` returns it to `start_row`.
    position: usize,
}
558
#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    /// Creates a partition covering triple indices `start_row..end_row`.
    fn new(
        triples: Arc<Vec<(Value, Value, Value)>>,
        output_vars: Vec<String>,
        start_row: usize,
        end_row: usize,
    ) -> Self {
        let position = start_row;
        Self {
            triples,
            output_vars,
            start_row,
            end_row,
            position,
        }
    }
}
576
#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Clamp the window to both the partition range and the data length.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let start = self.position;
        let end = limit.min(start + chunk_size);
        self.position = end;

        // Transpose the (s, p, o) tuples into three column vectors.
        let window = &self.triples[start..end];
        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
617
618use crate::graph::lpg::LpgStore;
623use grafeo_common::types::NodeId;
624
/// Partitionable source that scans node ids from an [`LpgStore`], emitting
/// one Node-typed column per chunk.
pub struct ParallelNodeScanSource {
    /// Backing store; exposed to callers via [`Self::store`].
    store: Arc<LpgStore>,
    /// Snapshot of node ids taken at construction; shared with partitions.
    node_ids: Arc<Vec<NodeId>>,
    /// Next index into `node_ids` to emit.
    position: usize,
}
652
653impl ParallelNodeScanSource {
654 #[must_use]
656 pub fn new(store: Arc<LpgStore>) -> Self {
657 let node_ids = Arc::new(store.node_ids());
658 Self {
659 store,
660 node_ids,
661 position: 0,
662 }
663 }
664
665 #[must_use]
667 pub fn with_label(store: Arc<LpgStore>, label: &str) -> Self {
668 let node_ids = Arc::new(store.nodes_by_label(label));
669 Self {
670 store,
671 node_ids,
672 position: 0,
673 }
674 }
675
676 #[must_use]
680 pub fn from_node_ids(store: Arc<LpgStore>, node_ids: Vec<NodeId>) -> Self {
681 Self {
682 store,
683 node_ids: Arc::new(node_ids),
684 position: 0,
685 }
686 }
687
688 #[must_use]
690 pub fn store(&self) -> &Arc<LpgStore> {
691 &self.store
692 }
693}
694
695impl Source for ParallelNodeScanSource {
696 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
697 if self.position >= self.node_ids.len() {
698 return Ok(None);
699 }
700
701 let end = (self.position + chunk_size).min(self.node_ids.len());
702 let slice = &self.node_ids[self.position..end];
703
704 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
706 for &id in slice {
707 vector.push_node_id(id);
708 }
709
710 self.position = end;
711 Ok(Some(DataChunk::new(vec![vector])))
712 }
713
714 fn reset(&mut self) {
715 self.position = 0;
716 }
717
718 fn name(&self) -> &'static str {
719 "ParallelNodeScanSource"
720 }
721}
722
723impl ParallelSource for ParallelNodeScanSource {
724 fn total_rows(&self) -> Option<usize> {
725 Some(self.node_ids.len())
726 }
727
728 fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
729 Box::new(PartitionedNodeScanSource::new(
730 Arc::clone(&self.node_ids),
731 morsel.start_row,
732 morsel.end_row,
733 ))
734 }
735
736 fn num_columns(&self) -> usize {
737 1 }
739}
740
/// Row-range view over a [`ParallelNodeScanSource`]'s node-id snapshot.
struct PartitionedNodeScanSource {
    /// Shared node-id list (same allocation as the parent source).
    node_ids: Arc<Vec<NodeId>>,
    /// First index (inclusive) this partition covers.
    start_row: usize,
    /// Last index (exclusive) this partition covers.
    end_row: usize,
    /// Next index to emit; `reset` returns it to `start_row`.
    position: usize,
}
748
749impl PartitionedNodeScanSource {
750 fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
751 Self {
752 node_ids,
753 start_row,
754 end_row,
755 position: start_row,
756 }
757 }
758}
759
760impl Source for PartitionedNodeScanSource {
761 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
762 if self.position >= self.end_row || self.position >= self.node_ids.len() {
763 return Ok(None);
764 }
765
766 let end = (self.position + chunk_size)
767 .min(self.end_row)
768 .min(self.node_ids.len());
769 let slice = &self.node_ids[self.position..end];
770
771 let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
773 for &id in slice {
774 vector.push_node_id(id);
775 }
776
777 self.position = end;
778 Ok(Some(DataChunk::new(vec![vector])))
779 }
780
781 fn reset(&mut self) {
782 self.position = self.start_row;
783 }
784
785 fn name(&self) -> &'static str {
786 "PartitionedNodeScanSource"
787 }
788}
789
#[cfg(test)]
mod tests {
    use super::*;

    // Metadata and morsel generation for the vector-backed source.
    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        // 100 rows at morsel size 30 -> 4 morsels (30+30+30+10).
        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4);
    }

    // A partition over rows 20..50 should yield exactly 30 rows total.
    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        // Morsel covering rows 20..50 (last two args are start/end row).
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Metadata and morsel generation for the generated-range source.
    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        // 100 rows at morsel size 25 -> exactly 4 morsels.
        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    // A range partition must emit the values of its sub-range, not 0-based.
    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        // Morsel covering rows 10..30.
        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        // First and last values match the morsel's row range.
        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    // Five 10-row chunks -> 50 total rows, 1 column.
    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    // A chunk-source partition over rows 15..35 spans three stored chunks;
    // verify row count and that the emitted values start/end at 15/34.
    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        // Morsel covering global rows 15..35.
        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            // Record the very first value seen across all chunks.
            if first_value.is_none() {
                if let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0)) {
                    first_value = Some(v);
                }
            }
            // Keep overwriting with each chunk's last value.
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    // reset() must rewind a partition to its start row, not to row 0.
    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        // Drain the partition completely.
        while partition.next_chunk(100).unwrap().is_some() {}

        // After reset the full 50 rows are available again.
        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    // Metadata for the triple scan source: 3 rows, 3 columns (s, p, o).
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    // A triple-scan partition over rows 20..50 should yield 30 rows total.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Node scan metadata, including label-filtered construction.
    #[test]
    fn test_parallel_node_scan_source() {
        let store = Arc::new(LpgStore::new());

        // 100 nodes: every node is a Person, every other one also Employee.
        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        // Label filters restrict the id snapshot at construction time.
        let source_person = ParallelNodeScanSource::with_label(Arc::clone(&store), "Person");
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(Arc::clone(&store), "Employee");
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    // A node-scan partition over rows 20..50 should yield 30 rows total.
    #[test]
    fn test_parallel_node_scan_partition() {
        let store = Arc::new(LpgStore::new());

        // 100 unlabeled nodes.
        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // Morsel covering rows 20..50.
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Morsel generation covers all rows exactly once.
    #[test]
    fn test_parallel_node_scan_morsels() {
        let store = Arc::new(LpgStore::new());

        // 1000 unlabeled nodes.
        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // 1000 rows at morsel size 256 -> 4 morsels (256*3 + 232).
        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4);

        // The morsels partition the row space without gaps or overlap.
        let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}
1043}