Skip to main content

grafeo_core/execution/parallel/
source.rs

1//! Parallel source trait for partitionable data sources.
2//!
3//! Extends the Source trait with capabilities needed for parallel execution:
4//! knowing total row count, creating partitions for morsels, etc.
5
6use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
14/// Trait for sources that support parallel partitioning.
15///
16/// Parallel sources can:
17/// - Report their total row count (if known)
18/// - Be partitioned into independent morsels
19/// - Create partition sources for specific morsels
20pub trait ParallelSource: Source + Send + Sync {
21    /// Returns the total number of rows in this source, if known.
22    ///
23    /// Returns `None` if the total is unknown (e.g., for streaming sources).
24    fn total_rows(&self) -> Option<usize>;
25
26    /// Returns whether this source can be partitioned.
27    ///
28    /// Some sources (like streaming or network sources) cannot be partitioned.
29    fn is_partitionable(&self) -> bool {
30        self.total_rows().is_some()
31    }
32
33    /// Creates a partition source for the given morsel.
34    ///
35    /// The returned source produces data only for the row range specified
36    /// in the morsel.
37    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;
38
39    /// Generates morsels that cover all rows in this source.
40    ///
41    /// Returns an empty vector if the source has no rows or cannot be partitioned.
42    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
43        match self.total_rows() {
44            Some(total) => generate_morsels(total, morsel_size, source_id),
45            None => Vec::new(),
46        }
47    }
48
49    /// Returns the number of columns in this source.
50    fn num_columns(&self) -> usize;
51}
52
53/// Parallel source wrapper for vector data.
54///
55/// Wraps columnar data in a parallel source that can be partitioned.
56pub struct ParallelVectorSource {
57    /// Column data (shared across partitions).
58    columns: Arc<Vec<Vec<Value>>>,
59    /// Current read position.
60    position: usize,
61}
62
63impl ParallelVectorSource {
64    /// Creates a new parallel vector source.
65    #[must_use]
66    pub fn new(columns: Vec<Vec<Value>>) -> Self {
67        Self {
68            columns: Arc::new(columns),
69            position: 0,
70        }
71    }
72
73    /// Creates a single-column source.
74    #[must_use]
75    pub fn single_column(values: Vec<Value>) -> Self {
76        Self::new(vec![values])
77    }
78}
79
80impl Source for ParallelVectorSource {
81    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82        if self.columns.is_empty() || self.columns[0].is_empty() {
83            return Ok(None);
84        }
85
86        let total_rows = self.columns[0].len();
87        if self.position >= total_rows {
88            return Ok(None);
89        }
90
91        let end = (self.position + chunk_size).min(total_rows);
92        let mut vectors = Vec::with_capacity(self.columns.len());
93
94        for col_values in self.columns.iter() {
95            let slice = &col_values[self.position..end];
96            vectors.push(ValueVector::from_values(slice));
97        }
98
99        self.position = end;
100        Ok(Some(DataChunk::new(vectors)))
101    }
102
103    fn reset(&mut self) {
104        self.position = 0;
105    }
106
107    fn name(&self) -> &'static str {
108        "ParallelVectorSource"
109    }
110}
111
112impl ParallelSource for ParallelVectorSource {
113    fn total_rows(&self) -> Option<usize> {
114        if self.columns.is_empty() {
115            Some(0)
116        } else {
117            Some(self.columns[0].len())
118        }
119    }
120
121    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122        Box::new(PartitionedVectorSource::new(
123            Arc::clone(&self.columns),
124            morsel.start_row,
125            morsel.end_row,
126        ))
127    }
128
129    fn num_columns(&self) -> usize {
130        self.columns.len()
131    }
132}
133
/// A partitioned view into a vector source.
///
/// Only produces data for a specific row range.
struct PartitionedVectorSource {
    /// Column data shared with the parent source.
    columns: Arc<Vec<Vec<Value>>>,
    /// First row (inclusive) of this partition.
    start_row: usize,
    /// End row (exclusive) of this partition.
    end_row: usize,
    /// Next row to read; starts at `start_row`.
    position: usize,
}

impl PartitionedVectorSource {
    /// Creates a partition reading rows `start_row..end_row` of `columns`.
    fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
        Self {
            columns,
            start_row,
            end_row,
            position: start_row,
        }
    }
}
154
155impl Source for PartitionedVectorSource {
156    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157        if self.columns.is_empty() || self.position >= self.end_row {
158            return Ok(None);
159        }
160
161        let end = (self.position + chunk_size).min(self.end_row);
162        let mut vectors = Vec::with_capacity(self.columns.len());
163
164        for col_values in self.columns.iter() {
165            let slice = &col_values[self.position..end];
166            vectors.push(ValueVector::from_values(slice));
167        }
168
169        self.position = end;
170        Ok(Some(DataChunk::new(vectors)))
171    }
172
173    fn reset(&mut self) {
174        self.position = self.start_row;
175    }
176
177    fn name(&self) -> &'static str {
178        "PartitionedVectorSource"
179    }
180}
181
182/// Parallel source for pre-built chunks.
183///
184/// Wraps a collection of DataChunks in a parallel source.
185pub struct ParallelChunkSource {
186    chunks: Arc<Vec<DataChunk>>,
187    /// Cumulative row count at each chunk start.
188    cumulative_rows: Vec<usize>,
189    /// Total row count.
190    total_rows: usize,
191    /// Current chunk index.
192    chunk_index: usize,
193    /// Number of columns.
194    num_columns: usize,
195}
196
197impl ParallelChunkSource {
198    /// Creates a new parallel chunk source.
199    #[must_use]
200    pub fn new(chunks: Vec<DataChunk>) -> Self {
201        let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
202        let mut sum = 0;
203        cumulative_rows.push(0);
204        for chunk in &chunks {
205            sum += chunk.len();
206            cumulative_rows.push(sum);
207        }
208
209        let num_columns = chunks.first().map_or(0, |c| c.num_columns());
210
211        Self {
212            chunks: Arc::new(chunks),
213            cumulative_rows,
214            total_rows: sum,
215            chunk_index: 0,
216            num_columns,
217        }
218    }
219}
220
221impl Source for ParallelChunkSource {
222    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
223        if self.chunk_index >= self.chunks.len() {
224            return Ok(None);
225        }
226
227        let chunk = self.chunks[self.chunk_index].clone();
228        self.chunk_index += 1;
229        Ok(Some(chunk))
230    }
231
232    fn reset(&mut self) {
233        self.chunk_index = 0;
234    }
235
236    fn name(&self) -> &'static str {
237        "ParallelChunkSource"
238    }
239}
240
241impl ParallelSource for ParallelChunkSource {
242    fn total_rows(&self) -> Option<usize> {
243        Some(self.total_rows)
244    }
245
246    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
247        Box::new(PartitionedChunkSource::new(
248            Arc::clone(&self.chunks),
249            self.cumulative_rows.clone(),
250            morsel.start_row,
251            morsel.end_row,
252        ))
253    }
254
255    fn num_columns(&self) -> usize {
256        self.num_columns
257    }
258}
259
260/// A partitioned view into a chunk source.
261struct PartitionedChunkSource {
262    chunks: Arc<Vec<DataChunk>>,
263    cumulative_rows: Vec<usize>,
264    start_row: usize,
265    end_row: usize,
266    current_row: usize,
267}
268
269impl PartitionedChunkSource {
270    fn new(
271        chunks: Arc<Vec<DataChunk>>,
272        cumulative_rows: Vec<usize>,
273        start_row: usize,
274        end_row: usize,
275    ) -> Self {
276        Self {
277            chunks,
278            cumulative_rows,
279            start_row,
280            end_row,
281            current_row: start_row,
282        }
283    }
284
285    /// Finds the chunk index containing the given row.
286    fn find_chunk_index(&self, row: usize) -> Option<usize> {
287        // Binary search for the chunk containing this row
288        match self
289            .cumulative_rows
290            .binary_search_by(|&cumul| cumul.cmp(&row))
291        {
292            Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
293            Err(idx) => {
294                if idx == 0 {
295                    Some(0)
296                } else {
297                    Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
298                }
299            }
300        }
301    }
302}
303
impl Source for PartitionedChunkSource {
    /// Produces the next slice of rows from this partition's range.
    ///
    /// Each returned chunk is a slice of exactly one underlying source
    /// chunk; a result never spans two source chunks.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        // Find the chunk containing current_row
        let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
            return Ok(None);
        };

        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        // Translate the global row position into an offset within the chunk.
        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        // Calculate how many rows to extract: bounded by the rows left in
        // this chunk, the rows left in the partition, and the caller's
        // requested chunk_size.
        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        if rows_to_extract == 0 {
            return Ok(None);
        }

        // Extract slice from chunk
        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    /// Rewinds to the start of this partition (not row 0 of the source).
    fn reset(&mut self) {
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}
347
/// Generates a range source for parallel execution testing.
///
/// Produces integers from 0 to n-1 in a single column.
pub struct RangeSource {
    // Number of values to produce (exclusive upper bound).
    total: usize,
    // Next value to emit.
    position: usize,
}

impl RangeSource {
    /// Creates a new range source producing `0..total`.
    #[must_use]
    pub fn new(total: usize) -> Self {
        Self { total, position: 0 }
    }
}
363
364impl Source for RangeSource {
365    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
366        if self.position >= self.total {
367            return Ok(None);
368        }
369
370        let end = (self.position + chunk_size).min(self.total);
371        let values: Vec<Value> = (self.position..end)
372            .map(|i| Value::Int64(i as i64))
373            .collect();
374
375        self.position = end;
376        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
377            &values,
378        )])))
379    }
380
381    fn reset(&mut self) {
382        self.position = 0;
383    }
384
385    fn name(&self) -> &'static str {
386        "RangeSource"
387    }
388}
389
390impl ParallelSource for RangeSource {
391    fn total_rows(&self) -> Option<usize> {
392        Some(self.total)
393    }
394
395    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
396        Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
397    }
398
399    fn num_columns(&self) -> usize {
400        1
401    }
402}
403
/// A partition of a range source.
struct RangePartition {
    // First value (inclusive) of this partition's range.
    start: usize,
    // End (exclusive) of this partition's range.
    end: usize,
    // Next value to emit; starts at `start`.
    position: usize,
}

impl RangePartition {
    /// Creates a partition producing the integers `start..end`.
    fn new(start: usize, end: usize) -> Self {
        Self {
            start,
            end,
            position: start,
        }
    }
}
420
421impl Source for RangePartition {
422    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
423        if self.position >= self.end {
424            return Ok(None);
425        }
426
427        let end = (self.position + chunk_size).min(self.end);
428        let values: Vec<Value> = (self.position..end)
429            .map(|i| Value::Int64(i as i64))
430            .collect();
431
432        self.position = end;
433        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
434            &values,
435        )])))
436    }
437
438    fn reset(&mut self) {
439        self.position = self.start;
440    }
441
442    fn name(&self) -> &'static str {
443        "RangePartition"
444    }
445}
446
/// Parallel source for RDF triple scanning.
///
/// Wraps triple data in a parallel source that can be partitioned for
/// morsel-driven execution of SPARQL queries.
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    /// Triple data: (subject, predicate, object) tuples.
    /// Shared via Arc so partitions can read it without copying.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Current read position.
    position: usize,
}

#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Creates a new parallel triple scan source from owned triples.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>) -> Self {
        Self {
            triples: Arc::new(triples),
            position: 0,
        }
    }

    /// Creates from an iterator of triples.
    pub fn from_triples<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect())
    }
}
478
#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    /// Produces the next chunk of up to `chunk_size` triples as three
    /// columns (subject, predicate, object).
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        let end = total.min(self.position + chunk_size);
        let count = end - self.position;

        // Transpose the (s, p, o) tuples into three column vectors.
        let mut subjects = Vec::with_capacity(count);
        let mut predicates = Vec::with_capacity(count);
        let mut objects = Vec::with_capacity(count);
        for (s, p, o) in &self.triples[self.position..end] {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
517
#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        // Partitions share the triple data via the Arc; only the row
        // bounds differ per morsel.
        let triples = Arc::clone(&self.triples);
        Box::new(PartitionedTripleScanSource::new(
            triples,
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        // Subject, predicate, object.
        3
    }
}
536
/// A partitioned view into a triple scan source.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    // Triple data shared with the parent source.
    triples: Arc<Vec<(Value, Value, Value)>>,
    // First row (inclusive) of this partition.
    start_row: usize,
    // End row (exclusive) of this partition.
    end_row: usize,
    // Next row to read; starts at `start_row`.
    position: usize,
}

#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    /// Creates a partition over `triples[start_row..end_row]`.
    fn new(triples: Arc<Vec<(Value, Value, Value)>>, start_row: usize, end_row: usize) -> Self {
        Self {
            triples,
            start_row,
            end_row,
            position: start_row,
        }
    }
}
557
#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    /// Produces the next chunk of triples from this partition's range as
    /// three columns (subject, predicate, object).
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Clamp the window to both the partition bound and the data length.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let end = limit.min(self.position + chunk_size);
        let count = end - self.position;

        // Transpose the (s, p, o) tuples into three column vectors.
        let mut subjects = Vec::with_capacity(count);
        let mut predicates = Vec::with_capacity(count);
        let mut objects = Vec::with_capacity(count);
        for (s, p, o) in &self.triples[self.position..end] {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    /// Rewinds to the start of this partition (not row 0 of the source).
    fn reset(&mut self) {
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
598
599// ---------------------------------------------------------------------------
600// Parallel Node Scan Source (LPG)
601// ---------------------------------------------------------------------------
602
603use crate::graph::lpg::LpgStore;
604use grafeo_common::types::NodeId;
605
606/// Parallel source for scanning nodes from the LPG store.
607///
608/// Enables morsel-driven parallel execution of node scans by label.
609/// Each partition independently scans a range of node IDs, enabling
610/// linear scaling on multi-core systems for large datasets.
611///
612/// # Example
613///
614/// ```ignore
615/// use grafeo_core::execution::parallel::{ParallelNodeScanSource, ParallelPipeline};
616/// use std::sync::Arc;
617///
618/// let store = Arc::new(LpgStore::new());
619/// // ... populate store ...
620///
621/// // Scan all Person nodes in parallel
622/// let source = ParallelNodeScanSource::with_label(store, "Person");
623/// let morsels = source.generate_morsels(4096, 0);
624/// ```
625pub struct ParallelNodeScanSource {
626    /// The store to scan from.
627    store: Arc<LpgStore>,
628    /// Cached node IDs for the scan.
629    node_ids: Arc<Vec<NodeId>>,
630    /// Current read position.
631    position: usize,
632}
633
634impl ParallelNodeScanSource {
635    /// Creates a parallel source for all nodes in the store.
636    #[must_use]
637    pub fn new(store: Arc<LpgStore>) -> Self {
638        let node_ids = Arc::new(store.node_ids());
639        Self {
640            store,
641            node_ids,
642            position: 0,
643        }
644    }
645
646    /// Creates a parallel source for nodes with a specific label.
647    #[must_use]
648    pub fn with_label(store: Arc<LpgStore>, label: &str) -> Self {
649        let node_ids = Arc::new(store.nodes_by_label(label));
650        Self {
651            store,
652            node_ids,
653            position: 0,
654        }
655    }
656
657    /// Creates from pre-computed node IDs.
658    ///
659    /// Useful when node IDs are already available from a previous operation.
660    #[must_use]
661    pub fn from_node_ids(store: Arc<LpgStore>, node_ids: Vec<NodeId>) -> Self {
662        Self {
663            store,
664            node_ids: Arc::new(node_ids),
665            position: 0,
666        }
667    }
668
669    /// Returns the underlying store reference.
670    #[must_use]
671    pub fn store(&self) -> &Arc<LpgStore> {
672        &self.store
673    }
674}
675
676impl Source for ParallelNodeScanSource {
677    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
678        if self.position >= self.node_ids.len() {
679            return Ok(None);
680        }
681
682        let end = (self.position + chunk_size).min(self.node_ids.len());
683        let slice = &self.node_ids[self.position..end];
684
685        // Create a NodeId vector
686        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
687        for &id in slice {
688            vector.push_node_id(id);
689        }
690
691        self.position = end;
692        Ok(Some(DataChunk::new(vec![vector])))
693    }
694
695    fn reset(&mut self) {
696        self.position = 0;
697    }
698
699    fn name(&self) -> &'static str {
700        "ParallelNodeScanSource"
701    }
702}
703
704impl ParallelSource for ParallelNodeScanSource {
705    fn total_rows(&self) -> Option<usize> {
706        Some(self.node_ids.len())
707    }
708
709    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
710        Box::new(PartitionedNodeScanSource::new(
711            Arc::clone(&self.node_ids),
712            morsel.start_row,
713            morsel.end_row,
714        ))
715    }
716
717    fn num_columns(&self) -> usize {
718        1 // Node ID column
719    }
720}
721
/// A partitioned view into a node scan source.
struct PartitionedNodeScanSource {
    // Node-ID list shared with the parent source.
    node_ids: Arc<Vec<NodeId>>,
    // First row (inclusive) of this partition.
    start_row: usize,
    // End row (exclusive) of this partition.
    end_row: usize,
    // Next row to read; starts at `start_row`.
    position: usize,
}

impl PartitionedNodeScanSource {
    /// Creates a partition over `node_ids[start_row..end_row]`.
    fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
        Self {
            node_ids,
            start_row,
            end_row,
            position: start_row,
        }
    }
}
740
741impl Source for PartitionedNodeScanSource {
742    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
743        if self.position >= self.end_row || self.position >= self.node_ids.len() {
744            return Ok(None);
745        }
746
747        let end = (self.position + chunk_size)
748            .min(self.end_row)
749            .min(self.node_ids.len());
750        let slice = &self.node_ids[self.position..end];
751
752        // Create a NodeId vector
753        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
754        for &id in slice {
755            vector.push_node_id(id);
756        }
757
758        self.position = end;
759        Ok(Some(DataChunk::new(vec![vector])))
760    }
761
762    fn reset(&mut self) {
763        self.position = self.start_row;
764    }
765
766    fn name(&self) -> &'static str {
767        "PartitionedNodeScanSource"
768    }
769}
770
#[cfg(test)]
mod tests {
    //! Unit tests covering row counting, morsel generation, partitioning,
    //! and reset semantics for each parallel source implementation.

    use super::*;

    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); // 100 / 30 = 3 full + 1 partial
    }

    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        // Partition covering rows [20, 50).
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        // Verify values are in range [10, 30)
        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    #[test]
    fn test_parallel_chunk_source() {
        // Five chunks of 10 rows each: values 0..50.
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    #[test]
    fn test_parallel_chunk_source_partition() {
        // Five chunks of 10 rows each: values 0..50.
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        // Partition spanning parts of chunks 1 and 2 (rows 15-35)
        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        // Track the first and last values seen to verify the row range.
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        // Exhaust partition
        while partition.next_chunk(100).unwrap().is_some() {}

        // Reset and read again
        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source = ParallelTripleScanSource::new(triples);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source = ParallelTripleScanSource::new(triples);

        // Partition covering rows [20, 50).
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    #[test]
    fn test_parallel_node_scan_source() {
        let store = Arc::new(LpgStore::new());

        // Add some nodes with labels
        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        // Test scan all nodes
        let source = ParallelNodeScanSource::new(Arc::clone(&store));
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        // Test scan by label
        let source_person = ParallelNodeScanSource::with_label(Arc::clone(&store), "Person");
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(Arc::clone(&store), "Employee");
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    #[test]
    fn test_parallel_node_scan_partition() {
        let store = Arc::new(LpgStore::new());

        // Add 100 nodes
        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // Create partition for rows 20-50
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    #[test]
    fn test_parallel_node_scan_morsels() {
        let store = Arc::new(LpgStore::new());

        // Add 1000 nodes
        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // Generate morsels with size 256
        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4); // 1000 / 256 = 3 full + 1 partial

        // Verify morsels cover all rows
        let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}