Skip to main content

grafeo_core/execution/parallel/
source.rs

1//! Parallel source trait for partitionable data sources.
2//!
3//! Extends the Source trait with capabilities needed for parallel execution:
4//! knowing total row count, creating partitions for morsels, etc.
5
6use super::morsel::{Morsel, generate_morsels};
7use crate::execution::chunk::DataChunk;
8use crate::execution::operators::OperatorError;
9use crate::execution::pipeline::Source;
10use crate::execution::vector::ValueVector;
11use grafeo_common::types::Value;
12use std::sync::Arc;
13
14/// Trait for sources that support parallel partitioning.
15///
16/// Parallel sources can:
17/// - Report their total row count (if known)
18/// - Be partitioned into independent morsels
19/// - Create partition sources for specific morsels
20pub trait ParallelSource: Source + Send + Sync {
21    /// Returns the total number of rows in this source, if known.
22    ///
23    /// Returns `None` if the total is unknown (e.g., for streaming sources).
24    fn total_rows(&self) -> Option<usize>;
25
26    /// Returns whether this source can be partitioned.
27    ///
28    /// Some sources (like streaming or network sources) cannot be partitioned.
29    fn is_partitionable(&self) -> bool {
30        self.total_rows().is_some()
31    }
32
33    /// Creates a partition source for the given morsel.
34    ///
35    /// The returned source produces data only for the row range specified
36    /// in the morsel.
37    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source>;
38
39    /// Generates morsels that cover all rows in this source.
40    ///
41    /// Returns an empty vector if the source has no rows or cannot be partitioned.
42    fn generate_morsels(&self, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
43        match self.total_rows() {
44            Some(total) => generate_morsels(total, morsel_size, source_id),
45            None => Vec::new(),
46        }
47    }
48
49    /// Returns the number of columns in this source.
50    fn num_columns(&self) -> usize;
51}
52
/// Parallel source wrapper for vector data.
///
/// Wraps columnar data in a parallel source that can be partitioned.
/// Partitions share the column data through the `Arc`, so creating a
/// partition never copies the underlying values.
pub struct ParallelVectorSource {
    /// Column data (shared across partitions).
    columns: Arc<Vec<Vec<Value>>>,
    /// Current read position (row index of the next chunk to emit).
    position: usize,
}
62
63impl ParallelVectorSource {
64    /// Creates a new parallel vector source.
65    #[must_use]
66    pub fn new(columns: Vec<Vec<Value>>) -> Self {
67        Self {
68            columns: Arc::new(columns),
69            position: 0,
70        }
71    }
72
73    /// Creates a single-column source.
74    #[must_use]
75    pub fn single_column(values: Vec<Value>) -> Self {
76        Self::new(vec![values])
77    }
78}
79
80impl Source for ParallelVectorSource {
81    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
82        if self.columns.is_empty() || self.columns[0].is_empty() {
83            return Ok(None);
84        }
85
86        let total_rows = self.columns[0].len();
87        if self.position >= total_rows {
88            return Ok(None);
89        }
90
91        let end = (self.position + chunk_size).min(total_rows);
92        let mut vectors = Vec::with_capacity(self.columns.len());
93
94        for col_values in self.columns.iter() {
95            let slice = &col_values[self.position..end];
96            vectors.push(ValueVector::from_values(slice));
97        }
98
99        self.position = end;
100        Ok(Some(DataChunk::new(vectors)))
101    }
102
103    fn reset(&mut self) {
104        self.position = 0;
105    }
106
107    fn name(&self) -> &'static str {
108        "ParallelVectorSource"
109    }
110}
111
112impl ParallelSource for ParallelVectorSource {
113    fn total_rows(&self) -> Option<usize> {
114        if self.columns.is_empty() {
115            Some(0)
116        } else {
117            Some(self.columns[0].len())
118        }
119    }
120
121    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
122        Box::new(PartitionedVectorSource::new(
123            Arc::clone(&self.columns),
124            morsel.start_row,
125            morsel.end_row,
126        ))
127    }
128
129    fn num_columns(&self) -> usize {
130        self.columns.len()
131    }
132}
133
/// A partitioned view into a vector source.
///
/// Only produces data for a specific row range.
struct PartitionedVectorSource {
    /// Column data shared with the parent source (zero-copy via `Arc`).
    columns: Arc<Vec<Vec<Value>>>,
    /// First row (inclusive) of this partition's range.
    start_row: usize,
    /// One past the last row (exclusive) of this partition's range.
    end_row: usize,
    /// Current read position; starts at `start_row` and is restored by `reset`.
    position: usize,
}
143
144impl PartitionedVectorSource {
145    fn new(columns: Arc<Vec<Vec<Value>>>, start_row: usize, end_row: usize) -> Self {
146        Self {
147            columns,
148            start_row,
149            end_row,
150            position: start_row,
151        }
152    }
153}
154
155impl Source for PartitionedVectorSource {
156    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
157        if self.columns.is_empty() || self.position >= self.end_row {
158            return Ok(None);
159        }
160
161        let end = (self.position + chunk_size).min(self.end_row);
162        let mut vectors = Vec::with_capacity(self.columns.len());
163
164        for col_values in self.columns.iter() {
165            let slice = &col_values[self.position..end];
166            vectors.push(ValueVector::from_values(slice));
167        }
168
169        self.position = end;
170        Ok(Some(DataChunk::new(vectors)))
171    }
172
173    fn reset(&mut self) {
174        self.position = self.start_row;
175    }
176
177    fn name(&self) -> &'static str {
178        "PartitionedVectorSource"
179    }
180}
181
/// Parallel source for pre-built chunks.
///
/// Wraps a collection of DataChunks in a parallel source. Chunk data is
/// shared with partitions through the `Arc`; partitions use the cumulative
/// row table to map a global row index back to a chunk.
pub struct ParallelChunkSource {
    /// The wrapped chunks (shared across partitions).
    chunks: Arc<Vec<DataChunk>>,
    /// Cumulative row count at each chunk start (first entry 0,
    /// last entry equals `total_rows`).
    cumulative_rows: Vec<usize>,
    /// Total row count.
    total_rows: usize,
    /// Current chunk index for sequential (non-partitioned) reads.
    chunk_index: usize,
    /// Number of columns (taken from the first chunk; 0 when empty).
    num_columns: usize,
}
196
197impl ParallelChunkSource {
198    /// Creates a new parallel chunk source.
199    #[must_use]
200    pub fn new(chunks: Vec<DataChunk>) -> Self {
201        let mut cumulative_rows = Vec::with_capacity(chunks.len() + 1);
202        let mut sum = 0;
203        cumulative_rows.push(0);
204        for chunk in &chunks {
205            sum += chunk.len();
206            cumulative_rows.push(sum);
207        }
208
209        let num_columns = chunks.first().map_or(0, |c| c.num_columns());
210
211        Self {
212            chunks: Arc::new(chunks),
213            cumulative_rows,
214            total_rows: sum,
215            chunk_index: 0,
216            num_columns,
217        }
218    }
219}
220
221impl Source for ParallelChunkSource {
222    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
223        if self.chunk_index >= self.chunks.len() {
224            return Ok(None);
225        }
226
227        let chunk = self.chunks[self.chunk_index].clone();
228        self.chunk_index += 1;
229        Ok(Some(chunk))
230    }
231
232    fn reset(&mut self) {
233        self.chunk_index = 0;
234    }
235
236    fn name(&self) -> &'static str {
237        "ParallelChunkSource"
238    }
239}
240
241impl ParallelSource for ParallelChunkSource {
242    fn total_rows(&self) -> Option<usize> {
243        Some(self.total_rows)
244    }
245
246    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
247        Box::new(PartitionedChunkSource::new(
248            Arc::clone(&self.chunks),
249            self.cumulative_rows.clone(),
250            morsel.start_row,
251            morsel.end_row,
252        ))
253    }
254
255    fn num_columns(&self) -> usize {
256        self.num_columns
257    }
258}
259
/// A partitioned view into a chunk source.
///
/// Produces data only for the `[start_row, end_row)` range of global rows,
/// mapping global row indices to chunks through `cumulative_rows`.
struct PartitionedChunkSource {
    /// Chunks shared with the parent source.
    chunks: Arc<Vec<DataChunk>>,
    /// Global row offset of each chunk start (first entry 0, last entry = total rows).
    cumulative_rows: Vec<usize>,
    /// First row (inclusive) of this partition's range.
    start_row: usize,
    /// One past the last row (exclusive) of this partition's range.
    end_row: usize,
    /// Current global row position; reset restores it to `start_row`.
    current_row: usize,
}
268
269impl PartitionedChunkSource {
270    fn new(
271        chunks: Arc<Vec<DataChunk>>,
272        cumulative_rows: Vec<usize>,
273        start_row: usize,
274        end_row: usize,
275    ) -> Self {
276        Self {
277            chunks,
278            cumulative_rows,
279            start_row,
280            end_row,
281            current_row: start_row,
282        }
283    }
284
285    /// Finds the chunk index containing the given row.
286    fn find_chunk_index(&self, row: usize) -> Option<usize> {
287        // Binary search for the chunk containing this row
288        match self
289            .cumulative_rows
290            .binary_search_by(|&cumul| cumul.cmp(&row))
291        {
292            Ok(idx) => Some(idx.min(self.chunks.len().saturating_sub(1))),
293            Err(idx) => {
294                if idx == 0 {
295                    Some(0)
296                } else {
297                    Some((idx - 1).min(self.chunks.len().saturating_sub(1)))
298                }
299            }
300        }
301    }
302}
303
impl Source for PartitionedChunkSource {
    /// Emits up to `chunk_size` rows starting at `current_row`.
    ///
    /// Each call reads from a single chunk only; a range that spans a chunk
    /// boundary is delivered across multiple calls.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.current_row >= self.end_row || self.chunks.is_empty() {
            return Ok(None);
        }

        // Find the chunk containing current_row
        let Some(chunk_idx) = self.find_chunk_index(self.current_row) else {
            return Ok(None);
        };

        // Defensive: find_chunk_index clamps, but never index out of bounds.
        if chunk_idx >= self.chunks.len() {
            return Ok(None);
        }

        let chunk_start = self.cumulative_rows[chunk_idx];
        let chunk = &self.chunks[chunk_idx];
        let offset_in_chunk = self.current_row - chunk_start;

        // Calculate how many rows to extract: bounded by what remains in
        // this chunk, what remains in the partition, and the caller's limit.
        let rows_in_chunk = chunk.len().saturating_sub(offset_in_chunk);
        let rows_to_end = self.end_row.saturating_sub(self.current_row);
        let rows_to_extract = rows_in_chunk.min(rows_to_end).min(chunk_size);

        // Nothing extractable at this position (e.g. a zero-length chunk):
        // treat the partition as exhausted.
        if rows_to_extract == 0 {
            return Ok(None);
        }

        // Extract slice from chunk
        let sliced = chunk.slice(offset_in_chunk, rows_to_extract);
        self.current_row += rows_to_extract;

        Ok(Some(sliced))
    }

    fn reset(&mut self) {
        // Rewind to the start of the partition, not to global row zero.
        self.current_row = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedChunkSource"
    }
}
347
/// Generates a range source for parallel execution testing.
///
/// Produces integers from 0 to n-1 in a single column.
pub struct RangeSource {
    /// Number of integers to produce.
    total: usize,
    /// Next integer to emit; reset restores it to 0.
    position: usize,
}
355
356impl RangeSource {
357    /// Creates a new range source.
358    #[must_use]
359    pub fn new(total: usize) -> Self {
360        Self { total, position: 0 }
361    }
362}
363
364impl Source for RangeSource {
365    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
366        if self.position >= self.total {
367            return Ok(None);
368        }
369
370        let end = (self.position + chunk_size).min(self.total);
371        let values: Vec<Value> = (self.position..end)
372            // reason: range index fits i64 for practical sizes
373            .map(|i| {
374                // reason: range index fits i64 for practical sizes
375                #[allow(clippy::cast_possible_wrap)]
376                let val = Value::Int64(i as i64);
377                val
378            })
379            .collect();
380
381        self.position = end;
382        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
383            &values,
384        )])))
385    }
386
387    fn reset(&mut self) {
388        self.position = 0;
389    }
390
391    fn name(&self) -> &'static str {
392        "RangeSource"
393    }
394}
395
396impl ParallelSource for RangeSource {
397    fn total_rows(&self) -> Option<usize> {
398        Some(self.total)
399    }
400
401    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
402        Box::new(RangePartition::new(morsel.start_row, morsel.end_row))
403    }
404
405    fn num_columns(&self) -> usize {
406        1
407    }
408}
409
/// A partition of a range source.
///
/// Produces the integers in `[start, end)` as a single column.
struct RangePartition {
    /// First integer (inclusive); reset restores `position` here.
    start: usize,
    /// One past the last integer (exclusive).
    end: usize,
    /// Next integer to emit.
    position: usize,
}
416
417impl RangePartition {
418    fn new(start: usize, end: usize) -> Self {
419        Self {
420            start,
421            end,
422            position: start,
423        }
424    }
425}
426
427impl Source for RangePartition {
428    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
429        if self.position >= self.end {
430            return Ok(None);
431        }
432
433        let end = (self.position + chunk_size).min(self.end);
434        let values: Vec<Value> = (self.position..end)
435            // reason: range index fits i64 for practical sizes
436            // reason: range index fits i64 for practical sizes
437            .map(|i| {
438                // reason: range index fits i64 for practical sizes
439                #[allow(clippy::cast_possible_wrap)]
440                let val = Value::Int64(i as i64);
441                val
442            })
443            .collect();
444
445        self.position = end;
446        Ok(Some(DataChunk::new(vec![ValueVector::from_values(
447            &values,
448        )])))
449    }
450
451    fn reset(&mut self) {
452        self.position = self.start;
453    }
454
455    fn name(&self) -> &'static str {
456        "RangePartition"
457    }
458}
459
/// Parallel source for RDF triple scanning.
///
/// Wraps triple data in a parallel source that can be partitioned for
/// morsel-driven execution of SPARQL queries. Partitions share the triple
/// data through the `Arc`, so partitioning is zero-copy.
#[cfg(feature = "triple-store")]
pub struct ParallelTripleScanSource {
    /// Triple data: (subject, predicate, object) tuples.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// Current read position (index of the next triple to emit).
    position: usize,
}
471
#[cfg(feature = "triple-store")]
impl ParallelTripleScanSource {
    /// Creates a new parallel triple scan source.
    #[must_use]
    pub fn new(triples: Vec<(Value, Value, Value)>) -> Self {
        let triples = Arc::new(triples);
        Self { triples, position: 0 }
    }

    /// Builds a source by collecting an iterator of triples.
    pub fn from_triples<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect())
    }
}
491
#[cfg(feature = "triple-store")]
impl Source for ParallelTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        let end = total.min(self.position + chunk_size);
        let slice = &self.triples[self.position..end];

        // Transpose the (s, p, o) tuples into three columnar vectors.
        let subjects: Vec<Value> = slice.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = slice.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = slice.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ParallelTripleScanSource"
    }
}
530
#[cfg(feature = "triple-store")]
impl ParallelSource for ParallelTripleScanSource {
    fn total_rows(&self) -> Option<usize> {
        Some(self.triples.len())
    }

    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
        // Zero-copy: partitions share the triple data via the Arc.
        let triples = Arc::clone(&self.triples);
        Box::new(PartitionedTripleScanSource::new(
            triples,
            morsel.start_row,
            morsel.end_row,
        ))
    }

    fn num_columns(&self) -> usize {
        3 // subject, predicate, object
    }
}
549
/// A partitioned view into a triple scan source.
///
/// Produces triples only for the `[start_row, end_row)` range.
#[cfg(feature = "triple-store")]
struct PartitionedTripleScanSource {
    /// Triple data shared with the parent source.
    triples: Arc<Vec<(Value, Value, Value)>>,
    /// First triple index (inclusive) of this partition's range.
    start_row: usize,
    /// One past the last triple index (exclusive).
    end_row: usize,
    /// Current read position; reset restores it to `start_row`.
    position: usize,
}
558
#[cfg(feature = "triple-store")]
impl PartitionedTripleScanSource {
    /// Creates a partition reading triples in `[start_row, end_row)`.
    fn new(triples: Arc<Vec<(Value, Value, Value)>>, start_row: usize, end_row: usize) -> Self {
        let position = start_row;
        Self { triples, start_row, end_row, position }
    }
}
570
#[cfg(feature = "triple-store")]
impl Source for PartitionedTripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Never read past the partition end or the underlying data.
        let limit = self.end_row.min(self.triples.len());
        if self.position >= limit {
            return Ok(None);
        }

        let end = limit.min(self.position + chunk_size);
        let slice = &self.triples[self.position..end];

        // Transpose the (s, p, o) tuples into three columnar vectors.
        let subjects: Vec<Value> = slice.iter().map(|(s, _, _)| s.clone()).collect();
        let predicates: Vec<Value> = slice.iter().map(|(_, p, _)| p.clone()).collect();
        let objects: Vec<Value> = slice.iter().map(|(_, _, o)| o.clone()).collect();

        self.position = end;
        Ok(Some(DataChunk::new(vec![
            ValueVector::from_values(&subjects),
            ValueVector::from_values(&predicates),
            ValueVector::from_values(&objects),
        ])))
    }

    fn reset(&mut self) {
        // Rewind to the start of the partition's range.
        self.position = self.start_row;
    }

    fn name(&self) -> &'static str {
        "PartitionedTripleScanSource"
    }
}
611
612// ---------------------------------------------------------------------------
613// Parallel Node Scan Source (LPG)
614// ---------------------------------------------------------------------------
615
616use crate::graph::GraphStoreSearch;
617use grafeo_common::types::NodeId;
618
/// Parallel source for scanning nodes from the LPG store.
///
/// Enables morsel-driven parallel execution of node scans by label.
/// Node IDs are resolved once at construction and cached; each partition
/// then independently reads a range of the cached IDs.
///
/// # Example
///
/// ```rust
/// use grafeo_core::execution::parallel::{ParallelNodeScanSource, ParallelSource};
/// use grafeo_core::graph::lpg::LpgStore;
/// use std::sync::Arc;
///
/// let store = Arc::new(LpgStore::new().unwrap());
/// // ... populate store ...
///
/// // Scan all Person nodes in parallel
/// let source = ParallelNodeScanSource::with_label(store, "Person");
/// let morsels = source.generate_morsels(4096, 0);
/// ```
pub struct ParallelNodeScanSource {
    /// The store the IDs came from; kept so callers can reach it via `store()`.
    store: Arc<dyn GraphStoreSearch>,
    /// Cached node IDs for the scan (shared with partitions via `Arc`).
    node_ids: Arc<Vec<NodeId>>,
    /// Current read position into `node_ids`.
    position: usize,
}
647
648impl ParallelNodeScanSource {
649    /// Creates a parallel source for all nodes in the store.
650    #[must_use]
651    pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
652        let node_ids = Arc::new(store.node_ids());
653        Self {
654            store,
655            node_ids,
656            position: 0,
657        }
658    }
659
660    /// Creates a parallel source for nodes with a specific label.
661    #[must_use]
662    pub fn with_label(store: Arc<dyn GraphStoreSearch>, label: &str) -> Self {
663        let node_ids = Arc::new(store.nodes_by_label(label));
664        Self {
665            store,
666            node_ids,
667            position: 0,
668        }
669    }
670
671    /// Creates from pre-computed node IDs.
672    ///
673    /// Useful when node IDs are already available from a previous operation.
674    #[must_use]
675    pub fn from_node_ids(store: Arc<dyn GraphStoreSearch>, node_ids: Vec<NodeId>) -> Self {
676        Self {
677            store,
678            node_ids: Arc::new(node_ids),
679            position: 0,
680        }
681    }
682
683    /// Returns the underlying store reference.
684    #[must_use]
685    pub fn store(&self) -> &Arc<dyn GraphStoreSearch> {
686        &self.store
687    }
688}
689
690impl Source for ParallelNodeScanSource {
691    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
692        if self.position >= self.node_ids.len() {
693            return Ok(None);
694        }
695
696        let end = (self.position + chunk_size).min(self.node_ids.len());
697        let slice = &self.node_ids[self.position..end];
698
699        // Create a NodeId vector
700        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
701        for &id in slice {
702            vector.push_node_id(id);
703        }
704
705        self.position = end;
706        Ok(Some(DataChunk::new(vec![vector])))
707    }
708
709    fn reset(&mut self) {
710        self.position = 0;
711    }
712
713    fn name(&self) -> &'static str {
714        "ParallelNodeScanSource"
715    }
716}
717
718impl ParallelSource for ParallelNodeScanSource {
719    fn total_rows(&self) -> Option<usize> {
720        Some(self.node_ids.len())
721    }
722
723    fn create_partition(&self, morsel: &Morsel) -> Box<dyn Source> {
724        Box::new(PartitionedNodeScanSource::new(
725            Arc::clone(&self.node_ids),
726            morsel.start_row,
727            morsel.end_row,
728        ))
729    }
730
731    fn num_columns(&self) -> usize {
732        1 // Node ID column
733    }
734}
735
/// A partitioned view into a node scan source.
///
/// Produces node IDs only for the `[start_row, end_row)` range.
struct PartitionedNodeScanSource {
    /// Node IDs shared with the parent source.
    node_ids: Arc<Vec<NodeId>>,
    /// First index (inclusive) of this partition's range.
    start_row: usize,
    /// One past the last index (exclusive).
    end_row: usize,
    /// Current read position; reset restores it to `start_row`.
    position: usize,
}
743
744impl PartitionedNodeScanSource {
745    fn new(node_ids: Arc<Vec<NodeId>>, start_row: usize, end_row: usize) -> Self {
746        Self {
747            node_ids,
748            start_row,
749            end_row,
750            position: start_row,
751        }
752    }
753}
754
755impl Source for PartitionedNodeScanSource {
756    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
757        if self.position >= self.end_row || self.position >= self.node_ids.len() {
758            return Ok(None);
759        }
760
761        let end = (self.position + chunk_size)
762            .min(self.end_row)
763            .min(self.node_ids.len());
764        let slice = &self.node_ids[self.position..end];
765
766        // Create a NodeId vector
767        let mut vector = ValueVector::with_type(grafeo_common::types::LogicalType::Node);
768        for &id in slice {
769            vector.push_node_id(id);
770        }
771
772        self.position = end;
773        Ok(Some(DataChunk::new(vec![vector])))
774    }
775
776    fn reset(&mut self) {
777        self.position = self.start_row;
778    }
779
780    fn name(&self) -> &'static str {
781        "PartitionedNodeScanSource"
782    }
783}
784
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    // Row counting, partitionability, and morsel generation for the
    // vector-backed source.
    #[test]
    fn test_parallel_vector_source() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        let morsels = source.generate_morsels(30, 0);
        assert_eq!(morsels.len(), 4); // 100 / 30 = 3 full + 1 partial
    }

    // A vector partition must emit exactly its [start_row, end_row) rows.
    #[test]
    fn test_parallel_vector_source_partition() {
        let values: Vec<Value> = (0..100).map(Value::Int64).collect();
        let source = ParallelVectorSource::single_column(values);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    #[test]
    fn test_range_source() {
        let source = RangeSource::new(100);

        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());

        let morsels = source.generate_morsels(25, 0);
        assert_eq!(morsels.len(), 4);
    }

    // A range partition must produce the integers of its own range,
    // not restart at zero.
    #[test]
    fn test_range_source_partition() {
        let source = RangeSource::new(100);

        let morsel = Morsel::new(0, 0, 10, 30);
        let mut partition = source.create_partition(&morsel);

        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 20);

        // Verify values are in range [10, 30)
        let col = chunk.column(0).unwrap();
        assert_eq!(col.get(0), Some(Value::Int64(10)));
        assert_eq!(col.get(19), Some(Value::Int64(29)));
    }

    #[test]
    fn test_parallel_chunk_source() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);
        assert_eq!(source.total_rows(), Some(50));
        assert_eq!(source.num_columns(), 1);
    }

    // A chunk partition may straddle chunk boundaries; verify both the row
    // count and the first/last values of the emitted range.
    #[test]
    fn test_parallel_chunk_source_partition() {
        let chunks: Vec<DataChunk> = (0..5)
            .map(|i| {
                let values: Vec<Value> = (i * 10..(i + 1) * 10).map(Value::Int64).collect();
                DataChunk::new(vec![ValueVector::from_values(&values)])
            })
            .collect();

        let source = ParallelChunkSource::new(chunks);

        // Partition spanning parts of chunks 1 and 2 (rows 15-35)
        let morsel = Morsel::new(0, 0, 15, 35);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        let mut first_value: Option<i64> = None;
        let mut last_value: Option<i64> = None;

        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            if first_value.is_none()
                && let Some(Value::Int64(v)) = chunk.column(0).and_then(|c| c.get(0))
            {
                first_value = Some(v);
            }
            if let Some(Value::Int64(v)) = chunk
                .column(0)
                .and_then(|c| c.get(chunk.len().saturating_sub(1)))
            {
                last_value = Some(v);
            }
            total += chunk.len();
        }

        assert_eq!(total, 20);
        assert_eq!(first_value, Some(15));
        assert_eq!(last_value, Some(34));
    }

    // reset() on a partition must rewind to the partition start and allow
    // a full re-read.
    #[test]
    fn test_partitioned_source_reset() {
        let source = RangeSource::new(100);
        let morsel = Morsel::new(0, 0, 0, 50);
        let mut partition = source.create_partition(&morsel);

        // Exhaust partition
        while partition.next_chunk(100).unwrap().is_some() {}

        // Reset and read again
        partition.reset();
        let chunk = partition.next_chunk(100).unwrap().unwrap();
        assert_eq!(chunk.len(), 50);
    }

    #[cfg(feature = "triple-store")]
    #[test]
    fn test_parallel_triple_scan_source() {
        let triples = vec![
            (
                Value::String("s1".into()),
                Value::String("p1".into()),
                Value::String("o1".into()),
            ),
            (
                Value::String("s2".into()),
                Value::String("p2".into()),
                Value::String("o2".into()),
            ),
            (
                Value::String("s3".into()),
                Value::String("p3".into()),
                Value::String("o3".into()),
            ),
        ];
        let source = ParallelTripleScanSource::new(triples);

        assert_eq!(source.total_rows(), Some(3));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 3);
    }

    #[cfg(feature = "triple-store")]
    #[test]
    fn test_parallel_triple_scan_partition() {
        let triples: Vec<(Value, Value, Value)> = (0..100)
            .map(|i| {
                (
                    Value::String(format!("s{}", i).into()),
                    Value::String(format!("p{}", i).into()),
                    Value::String(format!("o{}", i).into()),
                )
            })
            .collect();
        let source = ParallelTripleScanSource::new(triples);

        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Node scans: all-nodes scan, label-filtered scan, and label counts.
    #[test]
    fn test_parallel_node_scan_source() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        // Add some nodes with labels
        for i in 0..100 {
            if i % 2 == 0 {
                store.create_node(&["Person", "Employee"]);
            } else {
                store.create_node(&["Person"]);
            }
        }

        // Test scan all nodes
        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
        assert_eq!(source.total_rows(), Some(100));
        assert!(source.is_partitionable());
        assert_eq!(source.num_columns(), 1);

        // Test scan by label
        let source_person = ParallelNodeScanSource::with_label(
            Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
            "Person",
        );
        assert_eq!(source_person.total_rows(), Some(100));

        let source_employee = ParallelNodeScanSource::with_label(
            Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
            "Employee",
        );
        assert_eq!(source_employee.total_rows(), Some(50));
    }

    #[test]
    fn test_parallel_node_scan_partition() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        // Add 100 nodes
        for _ in 0..100 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);

        // Create partition for rows 20-50
        let morsel = Morsel::new(0, 0, 20, 50);
        let mut partition = source.create_partition(&morsel);

        // Should produce 30 rows total
        let mut total = 0;
        while let Ok(Some(chunk)) = partition.next_chunk(10) {
            total += chunk.len();
        }
        assert_eq!(total, 30);
    }

    // Generated morsels must tile the full row range with no gaps or overlap.
    #[test]
    fn test_parallel_node_scan_morsels() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        // Add 1000 nodes
        for _ in 0..1000 {
            store.create_node(&[]);
        }

        let source = ParallelNodeScanSource::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);

        // Generate morsels with size 256
        let morsels = source.generate_morsels(256, 0);
        assert_eq!(morsels.len(), 4); // 1000 / 256 = 3 full + 1 partial

        // Verify morsels cover all rows
        let mut total_rows = 0;
        for morsel in &morsels {
            total_rows += morsel.end_row - morsel.start_row;
        }
        assert_eq!(total_rows, 1000);
    }
}