Skip to main content

grafeo_core/execution/parallel/
source.rs

//! Parallel source trait for partitionable data sources.
//!
//! Extends the [`Source`] trait with the capabilities needed for parallel
//! execution: reporting the total row count and creating per-morsel partitions.
5
6use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
14/// Trait for sources that support parallel partitioning.
15///
16/// Parallel sources can:
17/// - Report their total row count (if known)
18/// - Be partitioned into independent morsels
19/// - Create partition sources for specific morsels
20pub trait ParallelSource: Source + Send + Sync {
21    /// Returns the total number of rows in this source, if known.
22    ///
23    /// Returns `None` if the total is unknown (e.g., for streaming sources).
24    fn total_rows(&self) -> Option<usize>;
25
26    /// Returns whether this source can be partitioned.
27    ///
28    /// Some sources (like streaming or network sources) cannot be partitioned.
29    fn is_partitionable(&self) -> bool {
30        self.total_rows().is_some()
31    }
32
33    /// Creates a partition source for the given morsel.
34    ///
35    /// The returned source produces data only for the row range specified
36    /// in the morsel.
37    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;
38
39    /// Generates morsels that cover all rows in this source.
40    ///
41    /// Returns an empty vector if the source has no rows or cannot be partitioned.
42    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
43        match self.total_rows() {
44            Some(total) => generate_morsels(total, morsel_size, source_id),
45            None => Vec::new(),
46        }
47    }
48
49    /// Returns the number of columns in this source.
50    fn num_columns(&self) -> usize;
51}
52
53/// Parallel source wrapper for vector data.
54///
55/// Wraps columnar data in a parallel source that can be partitioned.
56pub struct ParallelVectorSource {
57    /// Column data (shared across partitions).
58    columns: Arc<Vec<Vec<Value>>>,
59    /// Current read position.
60    position: usize,
61}
62
63impl ParallelVectorSource {
64    /// Creates a new parallel vector source.
65    #[must_use]
66    pub fn new(columns: Vec<Vec<Value>>) -> Self {
67        Self {
68            columns: Arc::new(columns),
69            position: 0,
70        }
71    }
72
73    /// Creates a single-column source.
74    #[must_use]
75    pub fn single_column(values: Vec<Value>) -> Self {
76        Self::new(vec![values])
77    }
78}
79
80impl Source for ParallelVectorSource {
81    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82        if self.columns.is_empty() || self.columns[0].is_empty() {
83            return Ok(None);
84        }
85
86        let total_rows = self.columns[0].len();
87        if self.position >= total_rows {
88            return Ok(None);
89        }
90
91        let end = (self.position + chunk_size).min(total_rows);
92        let mut vectors = Vec::with_capacity(self.columns.len());
93
94        for col_values in self.columns.iter() {
95            let slice = &col_values[self.position..end];
96            vectors.push(ValueVector::from_values(slice));
97        }
98
99        self.position = end;
100        Ok(Some(DataChunk::new(vectors)))
101    }
102
103    fn reset(&mut self) {
104        self.position = 0;
105    }
106
107    fn name(&self) -> &'static str {
108        "ParallelVectorSource"
109    }
110}
111
112impl ParallelSource for ParallelVectorSource {
113    fn total_rows(&self) -> Option<usize> {
114        if self.columns.is_empty() {
115            Some(0)
116        } else {
117            Some(self.columns[0].len())
118        }
119    }
120
121    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122        Box::new(PartitionedVectorSource::new(
123            Arc::clone(&self.columns),
124            morsel.start_row,
125            morsel.end_row,
126        ))
127    }
128
129    fn num_columns(&self) -> usize {
130        self.columns.len()
131    }
132}
133
134/// A partitioned view into a vector source.
135///
136/// Only produces data for a specific row range.
137struct PartitionedVectorSource {
138    columns: Arc<Vec<Vec<Value>>>,
139    start_row: usize,
140    end_row: usize,
141    position: usize,
142}
143
144impl PartitionedVectorSource {
145    fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146        Self {
147            columns,
148            start_row,
149            end_row,
150            position: start_row,
151        }
152    }
153}
154
155impl Source for PartitionedVectorSource {
156    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157        if self.columns.is_empty() || self.position >= self.end_row {
158            return Ok(None);
159        }
160
161        let end = (self.position + chunk_size).min(self.end_row);
162        let mut vectors = Vec::with_capacity(self.columns.len());
163
164        for col_values in self.columns.iter() {
165            let slice = &col_values[self.position..end];
166            vectors.push(ValueVector::from_values(slice));
167        }
168
169        self.position = end;
170        Ok(Some(DataChunk::new(vectors)))
171    }
172
173    fn reset(&mut self) {
174        self.position = self.start_row;
175    }
176
177    fn name(&self) -> &'static str {
178        "PartitionedVectorSource"
179    }
180}
181
182/// Parallel source for pre-built chunks.
183///
184/// Wraps a collection of DataChunks in a parallel source.
185pub struct ParallelChunkSource {
186    chunks: Arc<Vec<DataChunk>>,
187    /// Row count in each chunk (cached for fast morsel generation).
188    #[allow(dead_code)]
189    chunk_row_counts: Vec<usize>,
190    /// Cumulative row count at each chunk start.
191    cumulative_rows: Vec<usize>,
192    /// Total row count.
193    total_rows: usize,
194    /// Current chunk index.
195    chunk_index: usize,
196    /// Number of columns.
197    num_columns: usize,
198}
199
200impl ParallelChunkSource {
201    /// Creates a new parallel chunk source.
202    #[must_use]
203    pub fn new(chunks: Vec<DataChunk>) -> Self {
204        let chunk_row_counts: Vec<usize> = chunks.iter().map(DataChunk::len).collect();
205
206        let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
207        let mut sum = 0;
208        cumulative_rows.push(0);
209        for &count in &chunk_row_counts {
210            sum += count;
211            cumulative_rows.push(sum);
212        }
213
214        let num_columns = chunks.first().map(|c| c.num_columns()).unwrap_or(0);
215
216        Self {
217            chunks: Arc::new(chunks),
218            chunk_row_counts,
219            cumulative_rows,
220            total_rows: sum,
221            chunk_index: 0,
222            num_columns,
223        }
224    }
225}
226
227impl Source for ParallelChunkSource {
228    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
229        if self.chunk_index >= self.chunks.len() {
230            return Ok(None);
231        }
232
233        let chunk = self.chunks[self.chunk_index].clone();
234        self.chunk_index += 1;
235        Ok(Some(chunk))
236    }
237
238    fn reset(&mut self) {
239        self.chunk_index = 0;
240    }
241
242    fn name(&self) -> &'static str {
243        "ParallelChunkSource"
244    }
245}
246
247impl ParallelSource for ParallelChunkSource {
248    fn total_rows(&self) -> Option<usize> {
249        Some(self.total_rows)
250    }
251
252    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
253        Box::new(PartitionedChunkSource::new(
254            Arc::clone(&self.chunks),
255            self.cumulative_rows.clone(),
256            morsel.start_row,
257            morsel.end_row,
258        ))
259    }
260
261    fn num_columns(&self) -> usize {
262        self.num_columns
263    }
264}
265
266/// A partitioned view into a chunk source.
267struct PartitionedChunkSource {
268    chunks: Arc<Vec<DataChunk>>,
269    cumulative_rows: Vec<usize>,
270    start_row: usize,
271    end_row: usize,
272    current_row: usize,
273}
274
275impl PartitionedChunkSource {
276    fn new(
277        chunks: Arc<Vec<DataChunk>>,
278        cumulative_rows: Vec<usize>,
279        start_row: usize,
280        end_row: usize,
281    ) -> Self {
282        Self {
283            chunks,
284            cumulative_rows,
285            start_row,
286            end_row,
287            current_row: start_row,
288        }
289    }
290
291    /// Finds the chunk index containing the given row.
292    fn find_chunk_index(&self, row: usize) -> Option<usize> {
293        // Binary search for the chunk containing this row
294        match self
295            .cumulative_rows
296            .binary_search_by(|&cumul| cumul.cmp(&row))
297        {
298            Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
299            Err(idx) => {
300                if idx == 0 {
301                    Some(0)
302                } else {
303                    Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
304                }
305            }
306        }
307    }
308}
309
310impl Source for PartitionedChunkSource {
311    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
312        if self.current_row >= self.end_row || self.chunks.is_empty() {
313            return Ok(None);
314        }
315
316        // Find the chunk containing current_row
317        let chunk_idx = match self.find_chunk_index(self.current_row) {
318            Some(idx) => idx,
319            None => return Ok(None),
320        };
321
322        if chunk_idx >= self.chunks.len() {
323            return Ok(None);
324        }
325
326        let chunk_start = self.cumulative_rows[chunk_idx];
327        let chunk = &self.chunks[chunk_idx];
328        let offset_in_chunk = self.current_row - chunk_start;
329
330        // Calculate how many rows to extract
331        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
332        let rows_to_end = self.end_row.saturating_sub(self.current_row);
333        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);
334
335        if rows_to_extract == 0 {
336            return Ok(None);
337        }
338
339        // Extract slice from chunk
340        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
341        self.current_row += rows_to_extract;
342
343        Ok(Some(sliced))
344    }
345
346    fn reset(&mut self) {
347        self.current_row = self.start_row;
348    }
349
350    fn name(&self) -> &'static str {
351        "PartitionedChunkSource"
352    }
353}
354
/// Range source for parallel-execution testing.
///
/// Produces the integers `0..total` as `Value::Int64` in a single column.
pub struct RangeSource {
    /// Number of values produced in total.
    total: usize,
    /// Next value emitted by sequential reads.
    position: usize,
}

impl RangeSource {
    /// Creates a range source producing `0..total`.
    #[must_use]
    pub fn new(total: usize) -> Self {
        let position = 0;
        Self { total, position }
    }
}
370
371impl Source for RangeSource {
372    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
373        if self.position >= self.total {
374            return Ok(None);
375        }
376
377        let end = (self.position + chunk_size).min(self.total);
378        let values: Vec<Value> = (self.position..end)
379            .map(|i| Value::Int64(i as i64))
380            .collect();
381
382        self.position = end;
383        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
384            &values,
385        )])))
386    }
387
388    fn reset(&mut self) {
389        self.position = 0;
390    }
391
392    fn name(&self) -> &'static str {
393        "RangeSource"
394    }
395}
396
397impl ParallelSource for RangeSource {
398    fn total_rows(&self) -> Option<usize> {
399        Some(self.total)
400    }
401
402    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
403        Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
404    }
405
406    fn num_columns(&self) -> usize {
407        1
408    }
409}
410
/// A partition of a range source, producing only the integers in `start..end`.
struct RangePartition {
    /// First value (inclusive); `reset` rewinds here.
    start: usize,
    /// Stop value (exclusive).
    end: usize,
    /// Next value to emit.
    position: usize,
}

impl RangePartition {
    /// Creates a partition over `start..end`, positioned at `start`.
    fn new(start: usize, end: usize) -> Self {
        Self {
            position: start,
            start,
            end,
        }
    }
}
427
428impl Source for RangePartition {
429    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
430        if self.position >= self.end {
431            return Ok(None);
432        }
433
434        let end = (self.position + chunk_size).min(self.end);
435        let values: Vec<Value> = (self.position..end)
436            .map(|i| Value::Int64(i as i64))
437            .collect();
438
439        self.position = end;
440        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
441            &values,
442        )])))
443    }
444
445    fn reset(&mut self) {
446        self.position = self.start;
447    }
448
449    fn name(&self) -> &'static str {
450        "RangePartition"
451    }
452}
453
/// Parallel source for RDF triple scanning.
///
/// Holds triples behind an [`Arc`] so that partitions share them, enabling
/// morsel-driven execution of SPARQL queries.
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    /// Triple data as `(subject, predicate, object)` tuples.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Index of the next triple for sequential reads.
    position: usize,
    /// Variable names for the output columns (e.g. `["s", "p", "o"]`).
    output_vars: Vec<String>,
}

#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Creates a new parallel triple scan source.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        let shared = Arc::new(triples);
        Self {
            triples: shared,
            position: 0,
            output_vars,
        }
    }

    /// Creates a source by collecting triples from any iterable.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let collected: Vec<(Value, Value, Value)> = iter.into_iter().collect();
        Self::new(collected, output_vars)
    }
}
488
#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    /// Returns the next chunk of up to `chunk_size` triples as three columns
    /// (subject, predicate, object), or `None` once every triple is produced.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        // `saturating_add`: a huge `chunk_size` such as `usize::MAX` would
        // otherwise overflow once `position` is non-zero.
        let end = self.position.saturating_add(chunk_size).min(total);
        let slice = &self.triples[self.position..end];

        let mut subjects = Vec::with_capacity(slice.len());
        let mut predicates = Vec::with_capacity(slice.len());
        let mut objects = Vec::with_capacity(slice.len());

        for (s, p, o) in slice {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        let columns = vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ];

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    /// Rewinds to the first triple.
    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
527
#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    /// One row per stored triple.
    fn total_rows(&self) -> Option<usize> {
        let count = self.triples.len();
        Some(count)
    }

    /// Creates a partition sharing the triples, limited to the morsel's rows.
    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        let triples = Arc::clone(&self.triples);
        let vars = self.output_vars.clone();
        let partition =
            PartitionedTripleScanSource::new(triples, vars, morsel.start_row, morsel.end_row);
        Box::new(partition)
    }

    /// Subject, predicate and object columns.
    fn num_columns(&self) -> usize {
        3
    }
}
547
/// A partitioned view into a triple scan source.
///
/// Produces only the triples whose index lies in `start_row..end_row`.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    /// Triples shared with the parent source.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Output variable names copied from the parent source.
    // Stored but not read by any code in this file, hence the allowance.
    #[allow(dead_code)]
    output_vars: Vec<String>,
    /// First triple index (inclusive); `reset` rewinds here.
    start_row: usize,
    /// Triple index (exclusive) at which this partition stops.
    end_row: usize,
    /// Index of the next triple to read.
    position: usize,
}

#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    /// Creates a partition over `start_row..end_row`, positioned at its start.
    fn new(
        triples: Arc<Vec<(Value, Value, Value)>>,
        output_vars: Vec<String>,
        start_row: usize,
        end_row: usize,
    ) -> Self {
        let position = start_row;
        Self {
            triples,
            output_vars,
            start_row,
            end_row,
            position,
        }
    }
}
576
#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    /// Returns the next chunk of up to `chunk_size` triples from this
    /// partition's range, or `None` when the range or the data is exhausted.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Clamp to the data actually present so an overshooting morsel
        // cannot slice out of bounds.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        // `saturating_add`: partitions usually start past 0, so a huge
        // `chunk_size` (e.g. `usize::MAX`) would otherwise overflow.
        let end = self.position.saturating_add(chunk_size).min(limit);
        let slice = &self.triples[self.position..end];

        let mut subjects = Vec::with_capacity(slice.len());
        let mut predicates = Vec::with_capacity(slice.len());
        let mut objects = Vec::with_capacity(slice.len());

        for (s, p, o) in slice {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        let columns = vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ];

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    /// Rewinds to the first triple of this partition.
    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
617
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` single-column chunks of ten consecutive `Int64` values;
    /// chunk `i` holds `i * 10 .. (i + 1) * 10`.
    fn ten_row_chunks(count: i64) -> Vec<DataChunk> {
        (0..count)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect()
    }

    /// Reads `source` to exhaustion and returns the number of rows produced.
    ///
    /// Operator errors fail the test instead of silently ending the loop.
    fn drain_rows<S: Source + ?Sized>(source: &mut S, chunk_size: usize) -> usize {
        let mut total = 0;
        while let Some(chunk) = source.next_chunk(chunk_size).unwrap() {
            total += chunk.len();
        }
        total
    }

    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); // 100 / 30 = 3 full + 1 partial
    }

    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Rows 20..50 -> 30 rows in total.
        assert_eq!(drain_rows(&mut *partition, 10), 30);
    }

    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        // Values must lie in [10, 30).
        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    #[test]
    fn test_parallel_chunk_source() {
        let source = ParallelChunkSource::new(ten_row_chunks(5));
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    #[test]
    fn test_parallel_chunk_source_partition() {
        let source = ParallelChunkSource::new(ten_row_chunks(5));

        // Partition spanning parts of chunks 1, 2 and 3 (rows 15..35).
        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Some(chunk) = partition.next_chunk(10).unwrap() {
            if first_value.is_none() {
                if let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0)) {
                    first_value = Some(v);
                }
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        // Exhaust the partition.
        while partition.next_chunk(100).unwrap().is_some() {}

        // Reset and read again from the start.
        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Triples 20..50 -> 30 rows in total.
        assert_eq!(drain_rows(&mut *partition, 10), 30);
    }
}