Skip to main content

grafeo_core/execution/parallel/
source.rs

1//! Parallel source trait for partitionable data sources.
2//!
3//! Extends the Source trait with capabilities needed for parallel execution:
4//! knowing total row count, creating partitions for morsels, etc.
5
6use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
14/// Trait for sources that support parallel partitioning.
15///
16/// Parallel sources can:
17/// - Report their total row count (if known)
18/// - Be partitioned into independent morsels
19/// - Create partition sources for specific morsels
20pub trait ParallelSource: Source + Send + Sync {
21    /// Returns the total number of rows in this source, if known.
22    ///
23    /// Returns `None` if the total is unknown (e.g., for streaming sources).
24    fn total_rows(&self) -> Option<usize>;
25
26    /// Returns whether this source can be partitioned.
27    ///
28    /// Some sources (like streaming or network sources) cannot be partitioned.
29    fn is_partitionable(&self) -> bool {
30        self.total_rows().is_some()
31    }
32
33    /// Creates a partition source for the given morsel.
34    ///
35    /// The returned source produces data only for the row range specified
36    /// in the morsel.
37    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;
38
39    /// Generates morsels that cover all rows in this source.
40    ///
41    /// Returns an empty vector if the source has no rows or cannot be partitioned.
42    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
43        match self.total_rows() {
44            Some(total) => generate_morsels(total, morsel_size, source_id),
45            None => Vec::new(),
46        }
47    }
48
49    /// Returns the number of columns in this source.
50    fn num_columns(&self) -> usize;
51}
52
/// Parallel source wrapper for vector data.
///
/// Wraps columnar data in a parallel source that can be partitioned.
/// All partitions share the same `Arc`'d column data, so partitioning is
/// zero-copy.
pub struct ParallelVectorSource {
    /// Column data (shared across partitions). Each inner `Vec` is one
    /// column; columns are assumed to have equal length (the first column
    /// is used as the row count) — TODO confirm callers uphold this.
    columns: Arc<Vec<Vec<Value>>>,
    /// Current read position (row index) for sequential `Source` reads.
    position: usize,
}
62
63impl ParallelVectorSource {
64    /// Creates a new parallel vector source.
65    #[must_use]
66    pub fn new(columns: Vec<Vec<Value>>) -> Self {
67        Self {
68            columns: Arc::new(columns),
69            position: 0,
70        }
71    }
72
73    /// Creates a single-column source.
74    #[must_use]
75    pub fn single_column(values: Vec<Value>) -> Self {
76        Self::new(vec![values])
77    }
78}
79
80impl Source for ParallelVectorSource {
81    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82        if self.columns.is_empty() || self.columns[0].is_empty() {
83            return Ok(None);
84        }
85
86        let total_rows = self.columns[0].len();
87        if self.position >= total_rows {
88            return Ok(None);
89        }
90
91        let end = (self.position + chunk_size).min(total_rows);
92        let mut vectors = Vec::with_capacity(self.columns.len());
93
94        for col_values in self.columns.iter() {
95            let slice = &col_values[self.position..end];
96            vectors.push(ValueVector::from_values(slice));
97        }
98
99        self.position = end;
100        Ok(Some(DataChunk::new(vectors)))
101    }
102
103    fn reset(&mut self) {
104        self.position = 0;
105    }
106
107    fn name(&self) -> &'static str {
108        "ParallelVectorSource"
109    }
110}
111
112impl ParallelSource for ParallelVectorSource {
113    fn total_rows(&self) -> Option<usize> {
114        if self.columns.is_empty() {
115            Some(0)
116        } else {
117            Some(self.columns[0].len())
118        }
119    }
120
121    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122        Box::new(PartitionedVectorSource::new(
123            Arc::clone(&self.columns),
124            morsel.start_row,
125            morsel.end_row,
126        ))
127    }
128
129    fn num_columns(&self) -> usize {
130        self.columns.len()
131    }
132}
133
/// A partitioned view into a vector source.
///
/// Only produces data for a specific row range `[start_row, end_row)`.
/// Shares the column data with its parent source via `Arc`.
struct PartitionedVectorSource {
    /// Column data shared with the parent source.
    columns: Arc<Vec<Vec<Value>>>,
    /// First row (inclusive) of this partition.
    start_row: usize,
    /// Last row (exclusive) of this partition.
    end_row: usize,
    /// Current read position; starts at `start_row`.
    position: usize,
}
143
impl PartitionedVectorSource {
    /// Creates a partition over `[start_row, end_row)`; reading starts at
    /// `start_row`.
    fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
        Self {
            columns,
            start_row,
            end_row,
            position: start_row,
        }
    }
}
154
155impl Source for PartitionedVectorSource {
156    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157        if self.columns.is_empty() || self.position >= self.end_row {
158            return Ok(None);
159        }
160
161        let end = (self.position + chunk_size).min(self.end_row);
162        let mut vectors = Vec::with_capacity(self.columns.len());
163
164        for col_values in self.columns.iter() {
165            let slice = &col_values[self.position..end];
166            vectors.push(ValueVector::from_values(slice));
167        }
168
169        self.position = end;
170        Ok(Some(DataChunk::new(vectors)))
171    }
172
173    fn reset(&mut self) {
174        self.position = self.start_row;
175    }
176
177    fn name(&self) -> &'static str {
178        "PartitionedVectorSource"
179    }
180}
181
/// Parallel source for pre-built chunks.
///
/// Wraps a collection of DataChunks in a parallel source.
pub struct ParallelChunkSource {
    /// Pre-built chunks, shared with partitions via `Arc`.
    chunks: Arc<Vec<DataChunk>>,
    /// Row count in each chunk (cached for fast morsel generation).
    #[allow(dead_code)]
    chunk_row_counts: Vec<usize>,
    /// Cumulative row count at each chunk start (prefix sums); the final
    /// entry equals `total_rows`.
    cumulative_rows: Vec<usize>,
    /// Total row count across all chunks.
    total_rows: usize,
    /// Current chunk index for sequential `Source` reads.
    chunk_index: usize,
    /// Number of columns (taken from the first chunk; 0 when empty).
    num_columns: usize,
}
199
200impl ParallelChunkSource {
201    /// Creates a new parallel chunk source.
202    #[must_use]
203    pub fn new(chunks: Vec<DataChunk>) -> Self {
204        let chunk_row_counts: Vec<usize> = chunks.iter().map(DataChunk::len).collect();
205
206        let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
207        let mut sum = 0;
208        cumulative_rows.push(0);
209        for &count in &chunk_row_counts {
210            sum += count;
211            cumulative_rows.push(sum);
212        }
213
214        let num_columns = chunks.first().map_or(0, |c| c.num_columns());
215
216        Self {
217            chunks: Arc::new(chunks),
218            chunk_row_counts,
219            cumulative_rows,
220            total_rows: sum,
221            chunk_index: 0,
222            num_columns,
223        }
224    }
225}
226
227impl Source for ParallelChunkSource {
228    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
229        if self.chunk_index >= self.chunks.len() {
230            return Ok(None);
231        }
232
233        let chunk = self.chunks[self.chunk_index].clone();
234        self.chunk_index += 1;
235        Ok(Some(chunk))
236    }
237
238    fn reset(&mut self) {
239        self.chunk_index = 0;
240    }
241
242    fn name(&self) -> &'static str {
243        "ParallelChunkSource"
244    }
245}
246
impl ParallelSource for ParallelChunkSource {
    fn total_rows(&self) -> Option<usize> {
        // Always known: computed eagerly in `new`.
        Some(self.total_rows)
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        // Partitions share the chunks via Arc; the prefix-sum table is
        // cloned so each partition can do its own row-to-chunk lookups.
        Box::new(PartitionedChunkSource::new(
            Arc::clone(&self.chunks),
            self.cumulative_rows.clone(),
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        self.num_columns
    }
}
265
/// A partitioned view into a chunk source.
///
/// Produces rows `[start_row, end_row)` only, slicing across chunk
/// boundaries as needed.
struct PartitionedChunkSource {
    /// Chunks shared with the parent source.
    chunks: Arc<Vec<DataChunk>>,
    /// Prefix sums: global row index at which each chunk starts; the
    /// trailing entry equals the total row count.
    cumulative_rows: Vec<usize>,
    /// First row (inclusive) of this partition.
    start_row: usize,
    /// Last row (exclusive) of this partition.
    end_row: usize,
    /// Current global row position; starts at `start_row`.
    current_row: usize,
}
274
275impl PartitionedChunkSource {
276    fn new(
277        chunks: Arc<Vec<DataChunk>>,
278        cumulative_rows: Vec<usize>,
279        start_row: usize,
280        end_row: usize,
281    ) -> Self {
282        Self {
283            chunks,
284            cumulative_rows,
285            start_row,
286            end_row,
287            current_row: start_row,
288        }
289    }
290
291    /// Finds the chunk index containing the given row.
292    fn find_chunk_index(&self, row: usize) -> Option<usize> {
293        // Binary search for the chunk containing this row
294        match self
295            .cumulative_rows
296            .binary_search_by(|&cumul| cumul.cmp(&row))
297        {
298            Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
299            Err(idx) => {
300                if idx == 0 {
301                    Some(0)
302                } else {
303                    Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
304                }
305            }
306        }
307    }
308}
309
impl Source for PartitionedChunkSource {
    /// Produces up to `chunk_size` rows from the partition's row range,
    /// slicing out of the pre-built chunks; never crosses a chunk
    /// boundary within a single call.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        // Find the chunk containing current_row
        let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
            return Ok(None);
        };

        // Defensive bound check on the returned index.
        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        // Calculate how many rows to extract: bounded by the rows left in
        // this chunk, the rows left in the partition, and the caller's
        // requested chunk size.
        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        if rows_to_extract == 0 {
            return Ok(None);
        }

        // Extract slice from chunk
        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    /// Rewinds to the start of the partition (not row 0 of the data).
    fn reset(&mut self) {
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}
353
/// Generates a range source for parallel execution testing.
///
/// Produces integers from 0 to n-1 in a single column.
pub struct RangeSource {
    /// Exclusive upper bound of the range.
    total: usize,
    /// Next integer to emit.
    position: usize,
}
361
impl RangeSource {
    /// Creates a new range source producing `0..total`.
    #[must_use]
    pub fn new(total: usize) -> Self {
        Self { total, position: 0 }
    }
}
369
370impl Source for RangeSource {
371    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
372        if self.position >= self.total {
373            return Ok(None);
374        }
375
376        let end = (self.position + chunk_size).min(self.total);
377        let values: Vec<Value> = (self.position..end)
378            .map(|i| Value::Int64(i as i64))
379            .collect();
380
381        self.position = end;
382        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
383            &values,
384        )])))
385    }
386
387    fn reset(&mut self) {
388        self.position = 0;
389    }
390
391    fn name(&self) -> &'static str {
392        "RangeSource"
393    }
394}
395
impl ParallelSource for RangeSource {
    fn total_rows(&self) -> Option<usize> {
        // Always known: the range length is fixed at construction.
        Some(self.total)
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        // A range partition regenerates its values independently, so no
        // data needs to be shared with the parent.
        Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
    }

    fn num_columns(&self) -> usize {
        1
    }
}
409
/// A partition of a range source.
struct RangePartition {
    /// First value (inclusive).
    start: usize,
    /// End value (exclusive).
    end: usize,
    /// Next value to emit; starts at `start`.
    position: usize,
}
416
impl RangePartition {
    /// Creates a partition producing `start..end`.
    fn new(start: usize, end: usize) -> Self {
        Self {
            start,
            end,
            position: start,
        }
    }
}
426
427impl Source for RangePartition {
428    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
429        if self.position >= self.end {
430            return Ok(None);
431        }
432
433        let end = (self.position + chunk_size).min(self.end);
434        let values: Vec<Value> = (self.position..end)
435            .map(|i| Value::Int64(i as i64))
436            .collect();
437
438        self.position = end;
439        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
440            &values,
441        )])))
442    }
443
444    fn reset(&mut self) {
445        self.position = self.start;
446    }
447
448    fn name(&self) -> &'static str {
449        "RangePartition"
450    }
451}
452
/// Parallel source for RDF triple scanning.
///
/// Wraps triple data in a parallel source that can be partitioned for
/// morsel-driven execution of SPARQL queries.
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    /// Triple data: (subject, predicate, object) tuples, shared with all
    /// partitions via `Arc` (zero-copy partitioning).
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Current read position for sequential `Source` reads.
    position: usize,
    /// Variable names for output columns (e.g., ["s", "p", "o"]).
    /// NOTE(review): only forwarded to partitions here — presumably read
    /// by downstream operators; confirm against callers.
    output_vars: Vec<String>,
}
466
#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Creates a new parallel triple scan source.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        Self {
            position: 0,
            triples: Arc::new(triples),
            output_vars,
        }
    }

    /// Builds a source by collecting an iterator of triples.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let triples: Vec<_> = iter.into_iter().collect();
        Self::new(triples, output_vars)
    }
}
487
#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        let start = self.position;
        let end = total.min(start + chunk_size);
        let slice = &self.triples[start..end];

        // Transpose the (s, p, o) tuples into three columns.
        let subjects: Vec<Value> = slice.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = slice.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = slice.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
526
#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        // Always known: the triples are fully materialized in memory.
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        // Partitions share the triple data via Arc (no copying).
        Box::new(PartitionedTripleScanSource::new(
            Arc::clone(&self.triples),
            self.output_vars.clone(),
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        3 // subject, predicate, object
    }
}
546
/// A partitioned view into a triple scan source.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    /// Triple data shared with the parent source.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Output variable names; carried along but never read here.
    #[allow(dead_code)]
    output_vars: Vec<String>,
    /// First row (inclusive) of this partition.
    start_row: usize,
    /// Last row (exclusive) of this partition.
    end_row: usize,
    /// Current read position; starts at `start_row`.
    position: usize,
}
557
#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    /// Creates a partition over `[start_row, end_row)`; reading starts at
    /// `start_row`.
    fn new(
        triples: Arc<Vec<(Value, Value, Value)>>,
        output_vars: Vec<String>,
        start_row: usize,
        end_row: usize,
    ) -> Self {
        Self {
            triples,
            output_vars,
            start_row,
            end_row,
            position: start_row,
        }
    }
}
575
#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Clamp to both the partition boundary and the actual data length.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let start = self.position;
        let end = limit.min(start + chunk_size);
        let slice = &self.triples[start..end];

        // Transpose the (s, p, o) tuples into three columns.
        let subjects: Vec<Value> = slice.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = slice.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = slice.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
616
617// ---------------------------------------------------------------------------
618// Parallel Node Scan Source (LPG)
619// ---------------------------------------------------------------------------
620
621use crate::graph::lpg::LpgStore;
622use grafeo_common::types::NodeId;
623
/// Parallel source for scanning nodes from the LPG store.
///
/// Enables morsel-driven parallel execution of node scans by label.
/// Each partition independently scans a range of node IDs, enabling
/// linear scaling on multi-core systems for large datasets.
///
/// # Example
///
/// ```ignore
/// use grafeo_core::execution::parallel::{ParallelNodeScanSource, ParallelPipeline};
/// use std::sync::Arc;
///
/// let store = Arc::new(LpgStore::new());
/// // ... populate store ...
///
/// // Scan all Person nodes in parallel
/// let source = ParallelNodeScanSource::with_label(store, "Person");
/// let morsels = source.generate_morsels(4096, 0);
/// ```
pub struct ParallelNodeScanSource {
    /// The store to scan from. Only exposed via `store()` in this file;
    /// partitions do not need it to emit the ID column.
    store: Arc<LpgStore>,
    /// Cached node IDs for the scan, shared with partitions via `Arc`.
    node_ids: Arc<Vec<NodeId>>,
    /// Current read position for sequential `Source` reads.
    position: usize,
}
651
652impl ParallelNodeScanSource {
653    /// Creates a parallel source for all nodes in the store.
654    #[must_use]
655    pub fn new(store: Arc<LpgStore>) -> Self {
656        let node_ids = Arc::new(store.node_ids());
657        Self {
658            store,
659            node_ids,
660            position: 0,
661        }
662    }
663
664    /// Creates a parallel source for nodes with a specific label.
665    #[must_use]
666    pub fn with_label(store: Arc<LpgStore>, label: &str) -> Self {
667        let node_ids = Arc::new(store.nodes_by_label(label));
668        Self {
669            store,
670            node_ids,
671            position: 0,
672        }
673    }
674
675    /// Creates from pre-computed node IDs.
676    ///
677    /// Useful when node IDs are already available from a previous operation.
678    #[must_use]
679    pub fn from_node_ids(store: Arc<LpgStore>, node_ids: Vec<NodeId>) -> Self {
680        Self {
681            store,
682            node_ids: Arc::new(node_ids),
683            position: 0,
684        }
685    }
686
687    /// Returns the underlying store reference.
688    #[must_use]
689    pub fn store(&self) -> &Arc<LpgStore> {
690        &self.store
691    }
692}
693
694impl Source for ParallelNodeScanSource {
695    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
696        if self.position >= self.node_ids.len() {
697            return Ok(None);
698        }
699
700        let end = (self.position + chunk_size).min(self.node_ids.len());
701        let slice = &self.node_ids[self.position..end];
702
703        // Create a NodeId vector
704        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
705        for &id in slice {
706            vector.push_node_id(id);
707        }
708
709        self.position = end;
710        Ok(Some(DataChunk::new(vec![vector])))
711    }
712
713    fn reset(&mut self) {
714        self.position = 0;
715    }
716
717    fn name(&self) -> &'static str {
718        "ParallelNodeScanSource"
719    }
720}
721
impl ParallelSource for ParallelNodeScanSource {
    fn total_rows(&self) -> Option<usize> {
        // Always known: node IDs are materialized at construction.
        Some(self.node_ids.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        // Partitions share the ID list via Arc; the store itself is not
        // needed to produce the ID column.
        Box::new(PartitionedNodeScanSource::new(
            Arc::clone(&self.node_ids),
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        1 // Node ID column
    }
}
739
/// A partitioned view into a node scan source.
struct PartitionedNodeScanSource {
    /// Node IDs shared with the parent source.
    node_ids: Arc<Vec<NodeId>>,
    /// First row (inclusive) of this partition.
    start_row: usize,
    /// Last row (exclusive) of this partition.
    end_row: usize,
    /// Current read position; starts at `start_row`.
    position: usize,
}
747
impl PartitionedNodeScanSource {
    /// Creates a partition over `[start_row, end_row)`; reading starts at
    /// `start_row`.
    fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
        Self {
            node_ids,
            start_row,
            end_row,
            position: start_row,
        }
    }
}
758
759impl Source for PartitionedNodeScanSource {
760    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
761        if self.position >= self.end_row || self.position >= self.node_ids.len() {
762            return Ok(None);
763        }
764
765        let end = (self.position + chunk_size)
766            .min(self.end_row)
767            .min(self.node_ids.len());
768        let slice = &self.node_ids[self.position..end];
769
770        // Create a NodeId vector
771        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
772        for &id in slice {
773            vector.push_node_id(id);
774        }
775
776        self.position = end;
777        Ok(Some(DataChunk::new(vec![vector])))
778    }
779
780    fn reset(&mut self) {
781        self.position = self.start_row;
782    }
783
784    fn name(&self) -> &'static str {
785        "PartitionedNodeScanSource"
786    }
787}
788
#[cfg(test)]
mod tests {
    use super::*;

    // --- ParallelVectorSource ---

    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); // 100 / 30 = 3 full + 1 partial
    }

    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // --- RangeSource ---

    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        // Verify values are in range [10, 30)
        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    // --- ParallelChunkSource ---

    #[test]
    fn test_parallel_chunk_source() {
        // Five chunks of 10 consecutive integers each.
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        // Partition spanning parts of chunks 1 and 2 (rows 15-35)
        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        // Exhaust partition
        while partition.next_chunk(100).unwrap().is_some() {}

        // Reset and read again
        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    // --- ParallelTripleScanSource (rdf feature only) ---

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source =
            ParallelTripleScanSource::new(triples, vec!["s".into(), "p".into(), "o".into()]);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // --- ParallelNodeScanSource ---

    #[test]
    fn test_parallel_node_scan_source() {
        let store = Arc::new(LpgStore::new());

        // Add some nodes with labels
        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        // Test scan all nodes
        let source = ParallelNodeScanSource::new(Arc::clone(&store));
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        // Test scan by label
        let source_person = ParallelNodeScanSource::with_label(Arc::clone(&store), "Person");
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(Arc::clone(&store), "Employee");
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    #[test]
    fn test_parallel_node_scan_partition() {
        let store = Arc::new(LpgStore::new());

        // Add 100 nodes
        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // Create partition for rows 20-50
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    #[test]
    fn test_parallel_node_scan_morsels() {
        let store = Arc::new(LpgStore::new());

        // Add 1000 nodes
        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // Generate morsels with size 256
        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4); // 1000 / 256 = 3 full + 1 partial

        // Verify morsels cover all rows
        let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}