//! Parallel source adapters: [`Source`] implementations that report their row
//! count and can be split into per-morsel partitions for parallel execution.

use super::morsel::{Morsel, generate_morsels};
use crate::execution::chunk::DataChunk;
use crate::execution::operators::OperatorError;
use crate::execution::pipeline::Source;
use crate::execution::vector::ValueVector;
use grafeo_common::types::Value;
use std::sync::Arc;

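/// A [`Source`] whose rows can be split into [`Morsel`]s and scanned by
/// independent per-morsel partitions, enabling morsel-driven parallelism.
///
/// A minimal usage sketch of the intended driver loop (single-threaded here
/// for clarity; `process` is a hypothetical stand-in for whatever consumes a
/// chunk, and the morsel/chunk sizes are arbitrary):
///
/// ```ignore
/// let source = RangeSource::new(10_000);
/// for morsel in source.generate_morsels(2048, 0) {
///     // In a real pipeline each partition would run on its own worker thread.
///     let mut partition = source.create_partition(&morsel);
///     while let Ok(Some(chunk)) = partition.next_chunk(1024) {
///         process(chunk);
///     }
/// }
/// ```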
pub trait ParallelSource: Source + Send + Sync {
    /// Total number of rows this source will produce, if known up front.
    fn total_rows(&self) -> Option<usize>;

    /// Whether the source can be split into morsels (i.e. its row count is known).
    fn is_partitionable(&self) -> bool {
        self.total_rows().is_some()
    }

    /// Creates an independent, single-threaded [`Source`] restricted to the
    /// morsel's row range.
    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;

    /// Splits the source into morsels of at most `morsel_size` rows; returns
    /// an empty list when the row count is unknown.
    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
        match self.total_rows() {
            Some(total) => generate_morsels(total, morsel_size, source_id),
            None => Vec::new(),
        }
    }

    /// Number of columns each produced chunk will contain.
    fn num_columns(&self) -> usize;
}

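/// An in-memory, column-major [`ParallelSource`] backed by `Vec<Value>`
/// columns. All columns are assumed to have the same length; the first
/// column's length is used as the row count.
///
/// A small construction sketch (values are arbitrary):
///
/// ```ignore
/// let source = ParallelVectorSource::new(vec![
///     vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)],
///     vec![Value::Int64(10), Value::Int64(20), Value::Int64(30)],
/// ]);
/// assert_eq!(source.total_rows(), Some(3));
/// assert_eq!(source.num_columns(), 2);
/// ```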
pub struct ParallelVectorSource {
    columns: Arc<Vec<Vec<Value>>>,
    position: usize,
}

impl ParallelVectorSource {
    #[must_use]
    pub fn new(columns: Vec<Vec<Value>>) -> Self {
        Self {
            columns: Arc::new(columns),
            position: 0,
        }
    }

    #[must_use]
    pub fn single_column(values: Vec<Value>) -> Self {
        Self::new(vec![values])
    }
}

impl Source for ParallelVectorSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.columns.is_empty() || self.columns[0].is_empty() {
            return Ok(None);
        }

        let total_rows = self.columns[0].len();
        if self.position >= total_rows {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(total_rows);
        let mut vectors = Vec::with_capacity(self.columns.len());

        for col_values in self.columns.iter() {
            let slice = &col_values[self.position..end];
            vectors.push(ValueVector::from_values(slice));
        }

        self.position = end;
        Ok(Some(DataChunk::new(vectors)))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelVectorSource"
    }
}

impl ParallelSource for ParallelVectorSource {
    fn total_rows(&self) -> Option<usize> {
        if self.columns.is_empty() {
            Some(0)
        } else {
            Some(self.columns[0].len())
        }
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        Box::new(PartitionedVectorSource::new(
            Arc::clone(&self.columns),
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        self.columns.len()
    }
}

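/// Single-threaded view over one morsel's `[start_row, end_row)` slice of a
/// [`ParallelVectorSource`]; the column data is shared via `Arc` rather than
/// copied.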
struct PartitionedVectorSource {
    columns: Arc<Vec<Vec<Value>>>,
    start_row: usize,
    end_row: usize,
    position: usize,
}

impl PartitionedVectorSource {
    fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
        Self {
            columns,
            start_row,
            end_row,
            position: start_row,
        }
    }
}

impl Source for PartitionedVectorSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.columns.is_empty() || self.position >= self.end_row {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(self.end_row);
        let mut vectors = Vec::with_capacity(self.columns.len());

        for col_values in self.columns.iter() {
            let slice = &col_values[self.position..end];
            vectors.push(ValueVector::from_values(slice));
        }

        self.position = end;
        Ok(Some(DataChunk::new(vectors)))
    }

    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedVectorSource"
    }
}

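/// A [`ParallelSource`] over pre-materialized [`DataChunk`]s.
///
/// A prefix sum over the chunk lengths (`cumulative_rows`) maps a morsel's
/// global row range back onto chunk indices and in-chunk offsets. For example,
/// chunks of 4, 2, and 5 rows give `cumulative_rows = [0, 4, 6, 11]`, so
/// global row 5 falls in the second chunk at offset 1.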
pub struct ParallelChunkSource {
    chunks: Arc<Vec<DataChunk>>,
    #[allow(dead_code)]
    chunk_row_counts: Vec<usize>,
    cumulative_rows: Vec<usize>,
    total_rows: usize,
    chunk_index: usize,
    num_columns: usize,
}

impl ParallelChunkSource {
    #[must_use]
    pub fn new(chunks: Vec<DataChunk>) -> Self {
        let chunk_row_counts: Vec<usize> = chunks.iter().map(DataChunk::len).collect();

        let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
        let mut sum = 0;
        cumulative_rows.push(0);
        for &count in &chunk_row_counts {
            sum += count;
            cumulative_rows.push(sum);
        }

        let num_columns = chunks.first().map(|c| c.num_columns()).unwrap_or(0);

        Self {
            chunks: Arc::new(chunks),
            chunk_row_counts,
            cumulative_rows,
            total_rows: sum,
            chunk_index: 0,
            num_columns,
        }
    }
}

impl Source for ParallelChunkSource {
    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.chunk_index >= self.chunks.len() {
            return Ok(None);
        }

        let chunk = self.chunks[self.chunk_index].clone();
        self.chunk_index += 1;
        Ok(Some(chunk))
    }

    fn reset(&mut self) {
        self.chunk_index = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelChunkSource"
    }
}

impl ParallelSource for ParallelChunkSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.total_rows)
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        Box::new(PartitionedChunkSource::new(
            Arc::clone(&self.chunks),
            self.cumulative_rows.clone(),
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        self.num_columns
    }
}

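/// Single-threaded view over one morsel's row range of a
/// [`ParallelChunkSource`]. Global rows are mapped to (chunk, offset) pairs by
/// binary-searching the shared prefix sum, and each emitted chunk is sliced so
/// the partition never reads past its `end_row`.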
struct PartitionedChunkSource {
    chunks: Arc<Vec<DataChunk>>,
    cumulative_rows: Vec<usize>,
    start_row: usize,
    end_row: usize,
    current_row: usize,
}

impl PartitionedChunkSource {
    fn new(
        chunks: Arc<Vec<DataChunk>>,
        cumulative_rows: Vec<usize>,
        start_row: usize,
        end_row: usize,
    ) -> Self {
        Self {
            chunks,
            cumulative_rows,
            start_row,
            end_row,
            current_row: start_row,
        }
    }

    fn find_chunk_index(&self, row: usize) -> Option<usize> {
        match self
            .cumulative_rows
            .binary_search_by(|&cumul| cumul.cmp(&row))
        {
            Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
            Err(idx) => {
                if idx == 0 {
                    Some(0)
                } else {
                    Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
                }
            }
        }
    }
}

impl Source for PartitionedChunkSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        let chunk_idx = match self.find_chunk_index(self.current_row) {
            Some(idx) => idx,
            None => return Ok(None),
        };

        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        if rows_to_extract == 0 {
            return Ok(None);
        }

        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    fn reset(&mut self) {
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}

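/// A synthetic [`ParallelSource`] that produces a single `Int64` column with
/// the values `0..total`; mainly useful for tests and benchmarks.
///
/// A quick sketch of what a scan produces:
///
/// ```ignore
/// let mut source = RangeSource::new(5);
/// let chunk = source.next_chunk(1024).unwrap().unwrap();
/// assert_eq!(chunk.len(), 5); // rows 0, 1, 2, 3, 4
/// ```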
pub struct RangeSource {
    total: usize,
    position: usize,
}

impl RangeSource {
    #[must_use]
    pub fn new(total: usize) -> Self {
        Self { total, position: 0 }
    }
}

impl Source for RangeSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.total {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(self.total);
        let values: Vec<Value> = (self.position..end)
            .map(|i| Value::Int64(i as i64))
            .collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
            &values,
        )])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "RangeSource"
    }
}

impl ParallelSource for RangeSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.total)
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
    }

    fn num_columns(&self) -> usize {
        1
    }
}

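/// Per-morsel partition of a [`RangeSource`]: generates `Int64` values for the
/// half-open range `start..end` assigned by its morsel.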
struct RangePartition {
    start: usize,
    end: usize,
    position: usize,
}

impl RangePartition {
    fn new(start: usize, end: usize) -> Self {
        Self {
            start,
            end,
            position: start,
        }
    }
}

impl Source for RangePartition {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.end {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(self.end);
        let values: Vec<Value> = (self.position..end)
            .map(|i| Value::Int64(i as i64))
            .collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
            &values,
        )])))
    }

    fn reset(&mut self) {
        self.position = self.start;
    }

    fn name(&self) -> &'static str {
        "RangePartition"
    }
}

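/// A [`ParallelSource`] over an in-memory list of RDF triples, emitted as
/// three columns in (subject, predicate, object) order. `output_vars` carries
/// the variable names associated with those columns (e.g. `["s", "p", "o"]`
/// in the tests below).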
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    triples: Arc<Vec<(Value, Value, Value)>>,
    position: usize,
    output_vars: Vec<String>,
}

#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        Self {
            triples: Arc::new(triples),
            position: 0,
            output_vars,
        }
    }

    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect(), output_vars)
    }
}

#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.triples.len() {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(self.triples.len());
        let slice = &self.triples[self.position..end];

        let mut subjects = Vec::with_capacity(slice.len());
        let mut predicates = Vec::with_capacity(slice.len());
        let mut objects = Vec::with_capacity(slice.len());

        for (s, p, o) in slice {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        let columns = vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ];

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}

#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        Box::new(PartitionedTripleScanSource::new(
            Arc::clone(&self.triples),
            self.output_vars.clone(),
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        3
    }
}

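/// Single-threaded view over one morsel's row range of a
/// [`ParallelTripleScanSource`]; shares the triple list via `Arc` and clamps
/// reads to both `end_row` and the list length.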
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    triples: Arc<Vec<(Value, Value, Value)>>,
    #[allow(dead_code)]
    output_vars: Vec<String>,
    start_row: usize,
    end_row: usize,
    position: usize,
}

#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    fn new(
        triples: Arc<Vec<(Value, Value, Value)>>,
        output_vars: Vec<String>,
        start_row: usize,
        end_row: usize,
    ) -> Self {
        Self {
            triples,
            output_vars,
            start_row,
            end_row,
            position: start_row,
        }
    }
}

#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.end_row || self.position >= self.triples.len() {
            return Ok(None);
        }

        let end = (self.position + chunk_size)
            .min(self.end_row)
            .min(self.triples.len());
        let slice = &self.triples[self.position..end];

        let mut subjects = Vec::with_capacity(slice.len());
        let mut predicates = Vec::with_capacity(slice.len());
        let mut objects = Vec::with_capacity(slice.len());

        for (s, p, o) in slice {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        let columns = vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ];

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(|i| Value::Int64(i)).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4);
    }

    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(|i| Value::Int64(i)).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(|j| Value::Int64(j)).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(|j| Value::Int64(j)).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none() {
                if let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0)) {
                    first_value = Some(v);
                }
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        while partition.next_chunk(100).unwrap().is_some() {}

        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }
}