Skip to main content

grafeo_core/execution/parallel/
source.rs

1//! Parallel source trait for partitionable data sources.
2//!
3//! Extends the Source trait with capabilities needed for parallel execution:
4//! knowing total row count, creating partitions for morsels, etc.
5
6use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
14/// Trait for sources that support parallel partitioning.
15///
16/// Parallel sources can:
17/// - Report their total row count (if known)
18/// - Be partitioned into independent morsels
19/// - Create partition sources for specific morsels
20pub trait ParallelSource: Source + Send + Sync {
21    /// Returns the total number of rows in this source, if known.
22    ///
23    /// Returns `None` if the total is unknown (e.g., for streaming sources).
24    fn total_rows(&self) -> Option<usize>;
25
26    /// Returns whether this source can be partitioned.
27    ///
28    /// Some sources (like streaming or network sources) cannot be partitioned.
29    fn is_partitionable(&self) -> bool {
30        self.total_rows().is_some()
31    }
32
33    /// Creates a partition source for the given morsel.
34    ///
35    /// The returned source produces data only for the row range specified
36    /// in the morsel.
37    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;
38
39    /// Generates morsels that cover all rows in this source.
40    ///
41    /// Returns an empty vector if the source has no rows or cannot be partitioned.
42    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
43        match self.total_rows() {
44            Some(total) => generate_morsels(total, morsel_size, source_id),
45            None => Vec::new(),
46        }
47    }
48
49    /// Returns the number of columns in this source.
50    fn num_columns(&self) -> usize;
51}
52
53/// Parallel source wrapper for vector data.
54///
55/// Wraps columnar data in a parallel source that can be partitioned.
56pub struct ParallelVectorSource {
57    /// Column data (shared across partitions).
58    columns: Arc<Vec<Vec<Value>>>,
59    /// Current read position.
60    position: usize,
61}
62
63impl ParallelVectorSource {
64    /// Creates a new parallel vector source.
65    #[must_use]
66    pub fn new(columns: Vec<Vec<Value>>) -> Self {
67        Self {
68            columns: Arc::new(columns),
69            position: 0,
70        }
71    }
72
73    /// Creates a single-column source.
74    #[must_use]
75    pub fn single_column(values: Vec<Value>) -> Self {
76        Self::new(vec![values])
77    }
78}
79
80impl Source for ParallelVectorSource {
81    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82        if self.columns.is_empty() || self.columns[0].is_empty() {
83            return Ok(None);
84        }
85
86        let total_rows = self.columns[0].len();
87        if self.position >= total_rows {
88            return Ok(None);
89        }
90
91        let end = (self.position + chunk_size).min(total_rows);
92        let mut vectors = Vec::with_capacity(self.columns.len());
93
94        for col_values in self.columns.iter() {
95            let slice = &col_values[self.position..end];
96            vectors.push(ValueVector::from_values(slice));
97        }
98
99        self.position = end;
100        Ok(Some(DataChunk::new(vectors)))
101    }
102
103    fn reset(&mut self) {
104        self.position = 0;
105    }
106
107    fn name(&self) -> &'static str {
108        "ParallelVectorSource"
109    }
110}
111
112impl ParallelSource for ParallelVectorSource {
113    fn total_rows(&self) -> Option<usize> {
114        if self.columns.is_empty() {
115            Some(0)
116        } else {
117            Some(self.columns[0].len())
118        }
119    }
120
121    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122        Box::new(PartitionedVectorSource::new(
123            Arc::clone(&self.columns),
124            morsel.start_row,
125            morsel.end_row,
126        ))
127    }
128
129    fn num_columns(&self) -> usize {
130        self.columns.len()
131    }
132}
133
134/// A partitioned view into a vector source.
135///
136/// Only produces data for a specific row range.
137struct PartitionedVectorSource {
138    columns: Arc<Vec<Vec<Value>>>,
139    start_row: usize,
140    end_row: usize,
141    position: usize,
142}
143
144impl PartitionedVectorSource {
145    fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146        Self {
147            columns,
148            start_row,
149            end_row,
150            position: start_row,
151        }
152    }
153}
154
155impl Source for PartitionedVectorSource {
156    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157        if self.columns.is_empty() || self.position >= self.end_row {
158            return Ok(None);
159        }
160
161        let end = (self.position + chunk_size).min(self.end_row);
162        let mut vectors = Vec::with_capacity(self.columns.len());
163
164        for col_values in self.columns.iter() {
165            let slice = &col_values[self.position..end];
166            vectors.push(ValueVector::from_values(slice));
167        }
168
169        self.position = end;
170        Ok(Some(DataChunk::new(vectors)))
171    }
172
173    fn reset(&mut self) {
174        self.position = self.start_row;
175    }
176
177    fn name(&self) -> &'static str {
178        "PartitionedVectorSource"
179    }
180}
181
182/// Parallel source for pre-built chunks.
183///
184/// Wraps a collection of DataChunks in a parallel source.
185pub struct ParallelChunkSource {
186    chunks: Arc<Vec<DataChunk>>,
187    /// Cumulative row count at each chunk start.
188    cumulative_rows: Vec<usize>,
189    /// Total row count.
190    total_rows: usize,
191    /// Current chunk index.
192    chunk_index: usize,
193    /// Number of columns.
194    num_columns: usize,
195}
196
197impl ParallelChunkSource {
198    /// Creates a new parallel chunk source.
199    #[must_use]
200    pub fn new(chunks: Vec<DataChunk>) -> Self {
201        let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
202        let mut sum = 0;
203        cumulative_rows.push(0);
204        for chunk in &chunks {
205            sum += chunk.len();
206            cumulative_rows.push(sum);
207        }
208
209        let num_columns = chunks.first().map_or(0, |c| c.num_columns());
210
211        Self {
212            chunks: Arc::new(chunks),
213            cumulative_rows,
214            total_rows: sum,
215            chunk_index: 0,
216            num_columns,
217        }
218    }
219}
220
221impl Source for ParallelChunkSource {
222    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
223        if self.chunk_index >= self.chunks.len() {
224            return Ok(None);
225        }
226
227        let chunk = self.chunks[self.chunk_index].clone();
228        self.chunk_index += 1;
229        Ok(Some(chunk))
230    }
231
232    fn reset(&mut self) {
233        self.chunk_index = 0;
234    }
235
236    fn name(&self) -> &'static str {
237        "ParallelChunkSource"
238    }
239}
240
241impl ParallelSource for ParallelChunkSource {
242    fn total_rows(&self) -> Option<usize> {
243        Some(self.total_rows)
244    }
245
246    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
247        Box::new(PartitionedChunkSource::new(
248            Arc::clone(&self.chunks),
249            self.cumulative_rows.clone(),
250            morsel.start_row,
251            morsel.end_row,
252        ))
253    }
254
255    fn num_columns(&self) -> usize {
256        self.num_columns
257    }
258}
259
260/// A partitioned view into a chunk source.
261struct PartitionedChunkSource {
262    chunks: Arc<Vec<DataChunk>>,
263    cumulative_rows: Vec<usize>,
264    start_row: usize,
265    end_row: usize,
266    current_row: usize,
267}
268
269impl PartitionedChunkSource {
270    fn new(
271        chunks: Arc<Vec<DataChunk>>,
272        cumulative_rows: Vec<usize>,
273        start_row: usize,
274        end_row: usize,
275    ) -> Self {
276        Self {
277            chunks,
278            cumulative_rows,
279            start_row,
280            end_row,
281            current_row: start_row,
282        }
283    }
284
285    /// Finds the chunk index containing the given row.
286    fn find_chunk_index(&self, row: usize) -> Option<usize> {
287        // Binary search for the chunk containing this row
288        match self
289            .cumulative_rows
290            .binary_search_by(|&cumul| cumul.cmp(&row))
291        {
292            Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
293            Err(idx) => {
294                if idx == 0 {
295                    Some(0)
296                } else {
297                    Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
298                }
299            }
300        }
301    }
302}
303
304impl Source for PartitionedChunkSource {
305    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
306        if self.current_row >= self.end_row || self.chunks.is_empty() {
307            return Ok(None);
308        }
309
310        // Find the chunk containing current_row
311        let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
312            return Ok(None);
313        };
314
315        if chunk_idx >= self.chunks.len() {
316            return Ok(None);
317        }
318
319        let chunk_start = self.cumulative_rows[chunk_idx];
320        let chunk = &self.chunks[chunk_idx];
321        let offset_in_chunk = self.current_row - chunk_start;
322
323        // Calculate how many rows to extract
324        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
325        let rows_to_end = self.end_row.saturating_sub(self.current_row);
326        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);
327
328        if rows_to_extract == 0 {
329            return Ok(None);
330        }
331
332        // Extract slice from chunk
333        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
334        self.current_row += rows_to_extract;
335
336        Ok(Some(sliced))
337    }
338
339    fn reset(&mut self) {
340        self.current_row = self.start_row;
341    }
342
343    fn name(&self) -> &'static str {
344        "PartitionedChunkSource"
345    }
346}
347
348/// Generates a range source for parallel execution testing.
349///
350/// Produces integers from 0 to n-1 in a single column.
351pub struct RangeSource {
352    total: usize,
353    position: usize,
354}
355
356impl RangeSource {
357    /// Creates a new range source.
358    #[must_use]
359    pub fn new(total: usize) -> Self {
360        Self { total, position: 0 }
361    }
362}
363
364impl Source for RangeSource {
365    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
366        if self.position >= self.total {
367            return Ok(None);
368        }
369
370        let end = (self.position + chunk_size).min(self.total);
371        let values: Vec<Value> = (self.position..end)
372            .map(|i| Value::Int64(i as i64))
373            .collect();
374
375        self.position = end;
376        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
377            &values,
378        )])))
379    }
380
381    fn reset(&mut self) {
382        self.position = 0;
383    }
384
385    fn name(&self) -> &'static str {
386        "RangeSource"
387    }
388}
389
390impl ParallelSource for RangeSource {
391    fn total_rows(&self) -> Option<usize> {
392        Some(self.total)
393    }
394
395    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
396        Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
397    }
398
399    fn num_columns(&self) -> usize {
400        1
401    }
402}
403
404/// A partition of a range source.
405struct RangePartition {
406    start: usize,
407    end: usize,
408    position: usize,
409}
410
411impl RangePartition {
412    fn new(start: usize, end: usize) -> Self {
413        Self {
414            start,
415            end,
416            position: start,
417        }
418    }
419}
420
421impl Source for RangePartition {
422    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
423        if self.position >= self.end {
424            return Ok(None);
425        }
426
427        let end = (self.position + chunk_size).min(self.end);
428        let values: Vec<Value> = (self.position..end)
429            .map(|i| Value::Int64(i as i64))
430            .collect();
431
432        self.position = end;
433        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
434            &values,
435        )])))
436    }
437
438    fn reset(&mut self) {
439        self.position = self.start;
440    }
441
442    fn name(&self) -> &'static str {
443        "RangePartition"
444    }
445}
446
/// Parallel source for RDF triple scanning.
///
/// Wraps triple data in a parallel source that can be partitioned for
/// morsel-driven execution of SPARQL queries.
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    /// Triple data: (subject, predicate, object) tuples.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Current read position.
    position: usize,
}

#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Creates a new parallel triple scan source.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>) -> Self {
        Self {
            triples: Arc::new(triples),
            position: 0,
        }
    }

    /// Creates from an iterator of triples.
    pub fn from_triples<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect())
    }
}

#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        let end = total.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the row-oriented triples into three columns.
        let capacity = window.len();
        let mut subjects = Vec::with_capacity(capacity);
        let mut predicates = Vec::with_capacity(capacity);
        let mut objects = Vec::with_capacity(capacity);
        for (s, p, o) in window {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}

#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        Box::new(PartitionedTripleScanSource::new(
            Arc::clone(&self.triples),
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        3 // subject, predicate, object
    }
}
536
/// A partitioned view into a triple scan source.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    /// Triple data shared with the parent source.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// First row (inclusive) of this partition.
    start_row: usize,
    /// End row (exclusive) of this partition.
    end_row: usize,
    /// Current read position.
    position: usize,
}

#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    fn new(triples: Arc<Vec<(Value, Value, Value)>>, start_row: usize, end_row: usize) -> Self {
        Self {
            triples,
            start_row,
            end_row,
            position: start_row,
        }
    }
}

#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // The effective upper bound is the smaller of the partition end
        // and the number of stored triples.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let end = limit.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the row-oriented triples into three columns.
        let capacity = window.len();
        let mut subjects = Vec::with_capacity(capacity);
        let mut predicates = Vec::with_capacity(capacity);
        let mut objects = Vec::with_capacity(capacity);
        for (s, p, o) in window {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
598
599// ---------------------------------------------------------------------------
600// Parallel Node Scan Source (LPG)
601// ---------------------------------------------------------------------------
602
603use crate::graph::GraphStore;
604use grafeo_common::types::NodeId;
605
606/// Parallel source for scanning nodes from the LPG store.
607///
608/// Enables morsel-driven parallel execution of node scans by label.
609/// Each partition independently scans a range of node IDs, enabling
610/// linear scaling on multi-core systems for large datasets.
611///
612/// # Example
613///
614/// ```rust
615/// use grafeo_core::execution::parallel::{ParallelNodeScanSource, ParallelSource};
616/// use grafeo_core::graph::lpg::LpgStore;
617/// use std::sync::Arc;
618///
619/// let store = Arc::new(LpgStore::new());
620/// // ... populate store ...
621///
622/// // Scan all Person nodes in parallel
623/// let source = ParallelNodeScanSource::with_label(store, "Person");
624/// let morsels = source.generate_morsels(4096, 0);
625/// ```
626pub struct ParallelNodeScanSource {
627    /// The store to scan from.
628    store: Arc<dyn GraphStore>,
629    /// Cached node IDs for the scan.
630    node_ids: Arc<Vec<NodeId>>,
631    /// Current read position.
632    position: usize,
633}
634
635impl ParallelNodeScanSource {
636    /// Creates a parallel source for all nodes in the store.
637    #[must_use]
638    pub fn new(store: Arc<dyn GraphStore>) -> Self {
639        let node_ids = Arc::new(store.node_ids());
640        Self {
641            store,
642            node_ids,
643            position: 0,
644        }
645    }
646
647    /// Creates a parallel source for nodes with a specific label.
648    #[must_use]
649    pub fn with_label(store: Arc<dyn GraphStore>, label: &str) -> Self {
650        let node_ids = Arc::new(store.nodes_by_label(label));
651        Self {
652            store,
653            node_ids,
654            position: 0,
655        }
656    }
657
658    /// Creates from pre-computed node IDs.
659    ///
660    /// Useful when node IDs are already available from a previous operation.
661    #[must_use]
662    pub fn from_node_ids(store: Arc<dyn GraphStore>, node_ids: Vec<NodeId>) -> Self {
663        Self {
664            store,
665            node_ids: Arc::new(node_ids),
666            position: 0,
667        }
668    }
669
670    /// Returns the underlying store reference.
671    #[must_use]
672    pub fn store(&self) -> &Arc<dyn GraphStore> {
673        &self.store
674    }
675}
676
677impl Source for ParallelNodeScanSource {
678    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
679        if self.position >= self.node_ids.len() {
680            return Ok(None);
681        }
682
683        let end = (self.position + chunk_size).min(self.node_ids.len());
684        let slice = &self.node_ids[self.position..end];
685
686        // Create a NodeId vector
687        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
688        for &id in slice {
689            vector.push_node_id(id);
690        }
691
692        self.position = end;
693        Ok(Some(DataChunk::new(vec![vector])))
694    }
695
696    fn reset(&mut self) {
697        self.position = 0;
698    }
699
700    fn name(&self) -> &'static str {
701        "ParallelNodeScanSource"
702    }
703}
704
705impl ParallelSource for ParallelNodeScanSource {
706    fn total_rows(&self) -> Option<usize> {
707        Some(self.node_ids.len())
708    }
709
710    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
711        Box::new(PartitionedNodeScanSource::new(
712            Arc::clone(&self.node_ids),
713            morsel.start_row,
714            morsel.end_row,
715        ))
716    }
717
718    fn num_columns(&self) -> usize {
719        1 // Node ID column
720    }
721}
722
723/// A partitioned view into a node scan source.
724struct PartitionedNodeScanSource {
725    node_ids: Arc<Vec<NodeId>>,
726    start_row: usize,
727    end_row: usize,
728    position: usize,
729}
730
731impl PartitionedNodeScanSource {
732    fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
733        Self {
734            node_ids,
735            start_row,
736            end_row,
737            position: start_row,
738        }
739    }
740}
741
742impl Source for PartitionedNodeScanSource {
743    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
744        if self.position >= self.end_row || self.position >= self.node_ids.len() {
745            return Ok(None);
746        }
747
748        let end = (self.position + chunk_size)
749            .min(self.end_row)
750            .min(self.node_ids.len());
751        let slice = &self.node_ids[self.position..end];
752
753        // Create a NodeId vector
754        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
755        for &id in slice {
756            vector.push_node_id(id);
757        }
758
759        self.position = end;
760        Ok(Some(DataChunk::new(vec![vector])))
761    }
762
763    fn reset(&mut self) {
764        self.position = self.start_row;
765    }
766
767    fn name(&self) -> &'static str {
768        "PartitionedNodeScanSource"
769    }
770}
771
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    /// Row count, partitionability, and morsel generation for a
    /// single-column vector source.
    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); // 100 / 30 = 3 full + 1 partial
    }

    /// A vector-source partition yields exactly the morsel's row range,
    /// regardless of the per-call chunk size.
    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    /// RangeSource reports its total and splits evenly into morsels.
    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    /// A range partition produces the half-open value range [start, end).
    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        // Verify values are in range [10, 30)
        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    /// Chunk source sums row counts across all wrapped chunks.
    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    /// A chunk-source partition that straddles chunk boundaries slices
    /// correctly: first/last values and total row count all match.
    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        // Partition spanning parts of chunks 1 and 2 (rows 15-35)
        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    /// reset() rewinds a partition to its start row so it can be re-read.
    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        // Exhaust partition
        while partition.next_chunk(100).unwrap().is_some() {}

        // Reset and read again
        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    /// Triple scan source exposes three columns (s, p, o) and its row count.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source = ParallelTripleScanSource::new(triples);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    /// A triple-scan partition yields exactly the morsel's row range.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source = ParallelTripleScanSource::new(triples);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    /// Node scan source counts all nodes, and label-filtered variants
    /// count only nodes carrying that label.
    #[test]
    fn test_parallel_node_scan_source() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new());

        // Add some nodes with labels
        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        // Test scan all nodes
        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStore>);
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        // Test scan by label
        let source_person =
            ParallelNodeScanSource::with_label(Arc::clone(&store) as Arc<dyn GraphStore>, "Person");
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(
            Arc::clone(&store) as Arc<dyn GraphStore>,
            "Employee",
        );
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    /// A node-scan partition yields exactly the morsel's row range.
    #[test]
    fn test_parallel_node_scan_partition() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new());

        // Add 100 nodes
        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStore>);

        // Create partition for rows 20-50
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    /// Morsels generated over a node scan cover every row exactly once.
    #[test]
    fn test_parallel_node_scan_morsels() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new());

        // Add 1000 nodes
        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStore>);

        // Generate morsels with size 256
        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4); // 1000 / 256 = 3 full + 1 partial

        // Verify morsels cover all rows
        let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}
1029}