Skip to main content

grafeo_core/execution/parallel/
source.rs

1//! Parallel source trait for partitionable data sources.
2//!
3//! Extends the Source trait with capabilities needed for parallel execution:
4//! knowing total row count, creating partitions for morsels, etc.
5
6use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
14/// Trait for sources that support parallel partitioning.
15///
16/// Parallel sources can:
17/// - Report their total row count (if known)
18/// - Be partitioned into independent morsels
19/// - Create partition sources for specific morsels
20pub trait ParallelSource: Source + Send + Sync {
21    /// Returns the total number of rows in this source, if known.
22    ///
23    /// Returns `None` if the total is unknown (e.g., for streaming sources).
24    fn total_rows(&self) -> Option<usize>;
25
26    /// Returns whether this source can be partitioned.
27    ///
28    /// Some sources (like streaming or network sources) cannot be partitioned.
29    fn is_partitionable(&self) -> bool {
30        self.total_rows().is_some()
31    }
32
33    /// Creates a partition source for the given morsel.
34    ///
35    /// The returned source produces data only for the row range specified
36    /// in the morsel.
37    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;
38
39    /// Generates morsels that cover all rows in this source.
40    ///
41    /// Returns an empty vector if the source has no rows or cannot be partitioned.
42    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
43        match self.total_rows() {
44            Some(total) => generate_morsels(total, morsel_size, source_id),
45            None => Vec::new(),
46        }
47    }
48
49    /// Returns the number of columns in this source.
50    fn num_columns(&self) -> usize;
51}
52
/// Parallel source wrapper for vector data.
///
/// Wraps columnar data in a parallel source that can be partitioned.
/// The row count is taken from column 0, so all columns are assumed to
/// have equal length — TODO confirm callers uphold this.
pub struct ParallelVectorSource {
    /// Column data (shared across partitions via `Arc`, so partitions
    /// never copy the underlying values).
    columns: Arc<Vec<Vec<Value>>>,
    /// Current read position (row index of the next chunk to emit).
    position: usize,
}
62
63impl ParallelVectorSource {
64    /// Creates a new parallel vector source.
65    #[must_use]
66    pub fn new(columns: Vec<Vec<Value>>) -> Self {
67        Self {
68            columns: Arc::new(columns),
69            position: 0,
70        }
71    }
72
73    /// Creates a single-column source.
74    #[must_use]
75    pub fn single_column(values: Vec<Value>) -> Self {
76        Self::new(vec![values])
77    }
78}
79
80impl Source for ParallelVectorSource {
81    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82        if self.columns.is_empty() || self.columns[0].is_empty() {
83            return Ok(None);
84        }
85
86        let total_rows = self.columns[0].len();
87        if self.position >= total_rows {
88            return Ok(None);
89        }
90
91        let end = (self.position + chunk_size).min(total_rows);
92        let mut vectors = Vec::with_capacity(self.columns.len());
93
94        for col_values in self.columns.iter() {
95            let slice = &col_values[self.position..end];
96            vectors.push(ValueVector::from_values(slice));
97        }
98
99        self.position = end;
100        Ok(Some(DataChunk::new(vectors)))
101    }
102
103    fn reset(&mut self) {
104        self.position = 0;
105    }
106
107    fn name(&self) -> &'static str {
108        "ParallelVectorSource"
109    }
110}
111
112impl ParallelSource for ParallelVectorSource {
113    fn total_rows(&self) -> Option<usize> {
114        if self.columns.is_empty() {
115            Some(0)
116        } else {
117            Some(self.columns[0].len())
118        }
119    }
120
121    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122        Box::new(PartitionedVectorSource::new(
123            Arc::clone(&self.columns),
124            morsel.start_row,
125            morsel.end_row,
126        ))
127    }
128
129    fn num_columns(&self) -> usize {
130        self.columns.len()
131    }
132}
133
/// A partitioned view into a vector source.
///
/// Only produces data for a specific row range `[start_row, end_row)`.
struct PartitionedVectorSource {
    /// Column data shared with the parent source (no copy per partition).
    columns: Arc<Vec<Vec<Value>>>,
    /// First row (inclusive) this partition may emit; also the reset target.
    start_row: usize,
    /// End row (exclusive) of this partition.
    end_row: usize,
    /// Current read position within `[start_row, end_row)`.
    position: usize,
}
143
144impl PartitionedVectorSource {
145    fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146        Self {
147            columns,
148            start_row,
149            end_row,
150            position: start_row,
151        }
152    }
153}
154
155impl Source for PartitionedVectorSource {
156    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157        if self.columns.is_empty() || self.position >= self.end_row {
158            return Ok(None);
159        }
160
161        let end = (self.position + chunk_size).min(self.end_row);
162        let mut vectors = Vec::with_capacity(self.columns.len());
163
164        for col_values in self.columns.iter() {
165            let slice = &col_values[self.position..end];
166            vectors.push(ValueVector::from_values(slice));
167        }
168
169        self.position = end;
170        Ok(Some(DataChunk::new(vectors)))
171    }
172
173    fn reset(&mut self) {
174        self.position = self.start_row;
175    }
176
177    fn name(&self) -> &'static str {
178        "PartitionedVectorSource"
179    }
180}
181
/// Parallel source for pre-built chunks.
///
/// Wraps a collection of DataChunks in a parallel source. A prefix sum of
/// chunk lengths is precomputed so partitions can map a global row index
/// to the chunk containing it.
pub struct ParallelChunkSource {
    chunks: Arc<Vec<DataChunk>>,
    /// Cumulative row count at each chunk start; entry `i` is the global
    /// row index where chunk `i` begins (entry 0 is always 0).
    cumulative_rows: Vec<usize>,
    /// Total row count across all chunks.
    total_rows: usize,
    /// Current chunk index (sequential, non-partitioned reads).
    chunk_index: usize,
    /// Number of columns (taken from the first chunk; 0 if no chunks).
    num_columns: usize,
}
196
197impl ParallelChunkSource {
198    /// Creates a new parallel chunk source.
199    #[must_use]
200    pub fn new(chunks: Vec<DataChunk>) -> Self {
201        let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
202        let mut sum = 0;
203        cumulative_rows.push(0);
204        for chunk in &chunks {
205            sum += chunk.len();
206            cumulative_rows.push(sum);
207        }
208
209        let num_columns = chunks.first().map_or(0, |c| c.num_columns());
210
211        Self {
212            chunks: Arc::new(chunks),
213            cumulative_rows,
214            total_rows: sum,
215            chunk_index: 0,
216            num_columns,
217        }
218    }
219}
220
221impl Source for ParallelChunkSource {
222    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
223        if self.chunk_index >= self.chunks.len() {
224            return Ok(None);
225        }
226
227        let chunk = self.chunks[self.chunk_index].clone();
228        self.chunk_index += 1;
229        Ok(Some(chunk))
230    }
231
232    fn reset(&mut self) {
233        self.chunk_index = 0;
234    }
235
236    fn name(&self) -> &'static str {
237        "ParallelChunkSource"
238    }
239}
240
241impl ParallelSource for ParallelChunkSource {
242    fn total_rows(&self) -> Option<usize> {
243        Some(self.total_rows)
244    }
245
246    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
247        Box::new(PartitionedChunkSource::new(
248            Arc::clone(&self.chunks),
249            self.cumulative_rows.clone(),
250            morsel.start_row,
251            morsel.end_row,
252        ))
253    }
254
255    fn num_columns(&self) -> usize {
256        self.num_columns
257    }
258}
259
/// A partitioned view into a chunk source.
struct PartitionedChunkSource {
    /// Chunk data shared with the parent source.
    chunks: Arc<Vec<DataChunk>>,
    /// Prefix sum of chunk lengths; entry `i` is the global row where chunk `i` starts.
    cumulative_rows: Vec<usize>,
    /// First global row (inclusive) of this partition; also the reset target.
    start_row: usize,
    /// End global row (exclusive) of this partition.
    end_row: usize,
    /// Current global row position.
    current_row: usize,
}
268
269impl PartitionedChunkSource {
270    fn new(
271        chunks: Arc<Vec<DataChunk>>,
272        cumulative_rows: Vec<usize>,
273        start_row: usize,
274        end_row: usize,
275    ) -> Self {
276        Self {
277            chunks,
278            cumulative_rows,
279            start_row,
280            end_row,
281            current_row: start_row,
282        }
283    }
284
285    /// Finds the chunk index containing the given row.
286    fn find_chunk_index(&self, row: usize) -> Option<usize> {
287        // Binary search for the chunk containing this row
288        match self
289            .cumulative_rows
290            .binary_search_by(|&cumul| cumul.cmp(&row))
291        {
292            Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
293            Err(idx) => {
294                if idx == 0 {
295                    Some(0)
296                } else {
297                    Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
298                }
299            }
300        }
301    }
302}
303
impl Source for PartitionedChunkSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Exhausted the partition range, or nothing to read at all.
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        // Find the chunk containing current_row
        let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
            return Ok(None);
        };

        // Defensive: find_chunk_index clamps into range, but guard anyway.
        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        // Calculate how many rows to extract: bounded by what remains in this
        // chunk, what remains in the partition, and the requested chunk size.
        // A single call never crosses a chunk boundary.
        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        // Can happen if current_row sits past the end of the located chunk
        // (e.g. the row range overshoots the data); treat as exhausted.
        if rows_to_extract == 0 {
            return Ok(None);
        }

        // Extract slice from chunk
        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    fn reset(&mut self) {
        // Rewind to the partition start, not global row 0.
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}
347
/// A range source for parallel execution testing.
///
/// Produces integers from 0 to n-1 in a single column.
pub struct RangeSource {
    /// Number of rows to produce (exclusive upper bound of the range).
    total: usize,
    /// Next value to emit.
    position: usize,
}
355
356impl RangeSource {
357    /// Creates a new range source.
358    #[must_use]
359    pub fn new(total: usize) -> Self {
360        Self { total, position: 0 }
361    }
362}
363
364impl Source for RangeSource {
365    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
366        if self.position >= self.total {
367            return Ok(None);
368        }
369
370        let end = (self.position + chunk_size).min(self.total);
371        let values: Vec<Value> = (self.position..end)
372            .map(|i| Value::Int64(i as i64))
373            .collect();
374
375        self.position = end;
376        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
377            &values,
378        )])))
379    }
380
381    fn reset(&mut self) {
382        self.position = 0;
383    }
384
385    fn name(&self) -> &'static str {
386        "RangeSource"
387    }
388}
389
390impl ParallelSource for RangeSource {
391    fn total_rows(&self) -> Option<usize> {
392        Some(self.total)
393    }
394
395    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
396        Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
397    }
398
399    fn num_columns(&self) -> usize {
400        1
401    }
402}
403
/// A partition of a range source.
struct RangePartition {
    /// First value (inclusive); also the reset target.
    start: usize,
    /// End value (exclusive).
    end: usize,
    /// Next value to emit.
    position: usize,
}
410
411impl RangePartition {
412    fn new(start: usize, end: usize) -> Self {
413        Self {
414            start,
415            end,
416            position: start,
417        }
418    }
419}
420
421impl Source for RangePartition {
422    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
423        if self.position >= self.end {
424            return Ok(None);
425        }
426
427        let end = (self.position + chunk_size).min(self.end);
428        let values: Vec<Value> = (self.position..end)
429            .map(|i| Value::Int64(i as i64))
430            .collect();
431
432        self.position = end;
433        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
434            &values,
435        )])))
436    }
437
438    fn reset(&mut self) {
439        self.position = self.start;
440    }
441
442    fn name(&self) -> &'static str {
443        "RangePartition"
444    }
445}
446
/// Parallel source for RDF triple scanning.
///
/// Wraps triple data in a parallel source that can be partitioned for
/// morsel-driven execution of SPARQL queries.
#[cfg(feature = "rdf")]
pub struct ParallelTripleScanSource {
    /// Triple data: (subject, predicate, object) tuples, shared across
    /// partitions via `Arc` so partitions never copy the data.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Current read position (index of the next triple to emit).
    position: usize,
}
458
#[cfg(feature = "rdf")]
impl ParallelTripleScanSource {
    /// Creates a new parallel triple scan source.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>) -> Self {
        let triples = Arc::new(triples);
        Self { triples, position: 0 }
    }

    /// Creates a source by collecting an iterator of (s, p, o) triples.
    pub fn from_triples<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let collected: Vec<_> = iter.into_iter().collect();
        Self::new(collected)
    }
}
478
#[cfg(feature = "rdf")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        let end = total.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the (s, p, o) tuples into three columnar vectors.
        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
517
#[cfg(feature = "rdf")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        let triples = Arc::clone(&self.triples);
        Box::new(PartitionedTripleScanSource::new(triples, morsel.start_row, morsel.end_row))
    }

    fn num_columns(&self) -> usize {
        // subject, predicate, object
        3
    }
}
536
/// A partitioned view into a triple scan source.
#[cfg(feature = "rdf")]
struct PartitionedTripleScanSource {
    /// Triple data shared with the parent source.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// First row (inclusive) of this partition; also the reset target.
    start_row: usize,
    /// End row (exclusive) of this partition.
    end_row: usize,
    /// Current read position within `[start_row, end_row)`.
    position: usize,
}
545
#[cfg(feature = "rdf")]
impl PartitionedTripleScanSource {
    /// Builds a partition over `[start_row, end_row)`; reading begins at `start_row`.
    fn new(triples: Arc<Vec<(Value, Value, Value)>>, start_row: usize, end_row: usize) -> Self {
        let position = start_row;
        Self { triples, start_row, end_row, position }
    }
}
557
#[cfg(feature = "rdf")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Stop at whichever comes first: the partition boundary or the data end.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let end = limit.min(self.position + chunk_size);
        let window = &self.triples[self.position..end];

        // Transpose the (s, p, o) tuples into three columnar vectors.
        let subjects: Vec<Value> = window.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = window.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = window.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        // Rewind to the partition start, not row 0.
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
598
599// ---------------------------------------------------------------------------
600// Parallel Node Scan Source (LPG)
601// ---------------------------------------------------------------------------
602
603use crate::graph::lpg::LpgStore;
604use grafeo_common::types::NodeId;
605
/// Parallel source for scanning nodes from the LPG store.
///
/// Enables morsel-driven parallel execution of node scans by label.
/// Each partition independently scans a range of node IDs, enabling
/// linear scaling on multi-core systems for large datasets.
///
/// The node IDs are snapshotted from the store at construction time;
/// nodes added to the store afterwards are not visible to this source.
///
/// # Example
///
/// ```rust
/// use grafeo_core::execution::parallel::{ParallelNodeScanSource, ParallelSource};
/// use grafeo_core::graph::lpg::LpgStore;
/// use std::sync::Arc;
///
/// let store = Arc::new(LpgStore::new());
/// // ... populate store ...
///
/// // Scan all Person nodes in parallel
/// let source = ParallelNodeScanSource::with_label(store, "Person");
/// let morsels = source.generate_morsels(4096, 0);
/// ```
pub struct ParallelNodeScanSource {
    /// The store to scan from (kept for callers via `store()`).
    store: Arc<LpgStore>,
    /// Cached node IDs for the scan, shared with partitions via `Arc`.
    node_ids: Arc<Vec<NodeId>>,
    /// Current read position (index of the next node ID to emit).
    position: usize,
}
634
635impl ParallelNodeScanSource {
636    /// Creates a parallel source for all nodes in the store.
637    #[must_use]
638    pub fn new(store: Arc<LpgStore>) -> Self {
639        let node_ids = Arc::new(store.node_ids());
640        Self {
641            store,
642            node_ids,
643            position: 0,
644        }
645    }
646
647    /// Creates a parallel source for nodes with a specific label.
648    #[must_use]
649    pub fn with_label(store: Arc<LpgStore>, label: &str) -> Self {
650        let node_ids = Arc::new(store.nodes_by_label(label));
651        Self {
652            store,
653            node_ids,
654            position: 0,
655        }
656    }
657
658    /// Creates from pre-computed node IDs.
659    ///
660    /// Useful when node IDs are already available from a previous operation.
661    #[must_use]
662    pub fn from_node_ids(store: Arc<LpgStore>, node_ids: Vec<NodeId>) -> Self {
663        Self {
664            store,
665            node_ids: Arc::new(node_ids),
666            position: 0,
667        }
668    }
669
670    /// Returns the underlying store reference.
671    #[must_use]
672    pub fn store(&self) -> &Arc<LpgStore> {
673        &self.store
674    }
675}
676
677impl Source for ParallelNodeScanSource {
678    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
679        if self.position >= self.node_ids.len() {
680            return Ok(None);
681        }
682
683        let end = (self.position + chunk_size).min(self.node_ids.len());
684        let slice = &self.node_ids[self.position..end];
685
686        // Create a NodeId vector
687        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
688        for &id in slice {
689            vector.push_node_id(id);
690        }
691
692        self.position = end;
693        Ok(Some(DataChunk::new(vec![vector])))
694    }
695
696    fn reset(&mut self) {
697        self.position = 0;
698    }
699
700    fn name(&self) -> &'static str {
701        "ParallelNodeScanSource"
702    }
703}
704
705impl ParallelSource for ParallelNodeScanSource {
706    fn total_rows(&self) -> Option<usize> {
707        Some(self.node_ids.len())
708    }
709
710    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
711        Box::new(PartitionedNodeScanSource::new(
712            Arc::clone(&self.node_ids),
713            morsel.start_row,
714            morsel.end_row,
715        ))
716    }
717
718    fn num_columns(&self) -> usize {
719        1 // Node ID column
720    }
721}
722
/// A partitioned view into a node scan source.
struct PartitionedNodeScanSource {
    /// Node IDs shared with the parent source.
    node_ids: Arc<Vec<NodeId>>,
    /// First row (inclusive) of this partition; also the reset target.
    start_row: usize,
    /// End row (exclusive) of this partition.
    end_row: usize,
    /// Current read position within `[start_row, end_row)`.
    position: usize,
}
730
731impl PartitionedNodeScanSource {
732    fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
733        Self {
734            node_ids,
735            start_row,
736            end_row,
737            position: start_row,
738        }
739    }
740}
741
742impl Source for PartitionedNodeScanSource {
743    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
744        if self.position >= self.end_row || self.position >= self.node_ids.len() {
745            return Ok(None);
746        }
747
748        let end = (self.position + chunk_size)
749            .min(self.end_row)
750            .min(self.node_ids.len());
751        let slice = &self.node_ids[self.position..end];
752
753        // Create a NodeId vector
754        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
755        for &id in slice {
756            vector.push_node_id(id);
757        }
758
759        self.position = end;
760        Ok(Some(DataChunk::new(vec![vector])))
761    }
762
763    fn reset(&mut self) {
764        self.position = self.start_row;
765    }
766
767    fn name(&self) -> &'static str {
768        "PartitionedNodeScanSource"
769    }
770}
771
#[cfg(test)]
mod tests {
    use super::*;

    // Basic metadata of a single-column vector source.
    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); // 100 / 30 = 3 full + 1 partial
    }

    // A partition over rows [20, 50) yields exactly those 30 rows.
    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // RangeSource reports its size and splits into morsels.
    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    // A range partition produces the exact values of its row range.
    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        // Verify values are in range [10, 30)
        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    // Five 10-row chunks → 50 total rows, 1 column.
    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    // A partition that straddles a chunk boundary still yields exactly
    // its row range, with correct first and last values.
    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        // Partition spanning parts of chunks 1 and 2 (rows 15-35)
        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    // reset() rewinds a partition to its start_row, not to row 0.
    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        // Exhaust partition
        while partition.next_chunk(100).unwrap().is_some() {}

        // Reset and read again
        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    // Triple scan exposes three columns (s, p, o) and its row count.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source = ParallelTripleScanSource::new(triples);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    // A triple partition over rows [20, 50) yields exactly 30 rows.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source = ParallelTripleScanSource::new(triples);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Node scan: full scan sees all nodes; label scans filter correctly.
    #[test]
    fn test_parallel_node_scan_source() {
        let store = Arc::new(LpgStore::new());

        // Add some nodes with labels
        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        // Test scan all nodes
        let source = ParallelNodeScanSource::new(Arc::clone(&store));
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        // Test scan by label
        let source_person = ParallelNodeScanSource::with_label(Arc::clone(&store), "Person");
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(Arc::clone(&store), "Employee");
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    // A node-scan partition over rows [20, 50) yields exactly 30 rows.
    #[test]
    fn test_parallel_node_scan_partition() {
        let store = Arc::new(LpgStore::new());

        // Add 100 nodes
        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // Create partition for rows 20-50
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Morsels over a node scan jointly cover every row exactly once.
    #[test]
    fn test_parallel_node_scan_morsels() {
        let store = Arc::new(LpgStore::new());

        // Add 1000 nodes
        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store));

        // Generate morsels with size 256
        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4); // 1000 / 256 = 3 full + 1 partial

        // Verify morsels cover all rows
        let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}