
grafeo_core/execution/source.rs

//! Source implementations for push-based execution.
//!
//! Sources produce chunks of data that flow through push-based pipelines.
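//!
//! A generic consumption sketch for any `Source` (marked `ignore` since the
//! real pipeline wiring lives elsewhere; `EmptySource` just stands in for a
//! concrete source):
//!
//! ```ignore
//! let mut source = EmptySource::new();
//! while let Some(chunk) = source.next_chunk(1024)? {
//!     // push `chunk` into the downstream operators
//! }
//! ```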

use super::chunk::DataChunk;
use super::operators::{Operator, OperatorError};
use super::pipeline::Source;
use super::vector::ValueVector;
use grafeo_common::types::{NodeId, Value};

/// Adapts a pull-based operator to work as a Source.
///
/// This allows gradual migration from the pull model to the push model by
/// wrapping existing operators as sources for push pipelines.
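///
/// A minimal usage sketch; `SomeScanOperator` is a hypothetical `Operator`
/// implementation, so the snippet is not compiled here:
///
/// ```ignore
/// let scan: Box<dyn Operator> = Box::new(SomeScanOperator::new());
/// let mut source = OperatorSource::new(scan);
/// while let Some(chunk) = source.next_chunk(1024)? {
///     // feed `chunk` into the push pipeline
/// }
/// ```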
pub struct OperatorSource {
    operator: Box<dyn Operator>,
}

impl OperatorSource {
    /// Create a new operator source.
    pub fn new(operator: Box<dyn Operator>) -> Self {
        Self { operator }
    }
}

impl Source for OperatorSource {
    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Pull-based operators produce their own chunk sizes,
        // so we ignore the requested chunk_size
        self.operator.next()
    }

    fn reset(&mut self) {
        self.operator.reset();
    }

    fn name(&self) -> &'static str {
        "OperatorSource"
    }
}

/// Source that produces chunks from a vector of values.
///
/// Useful for testing and for materializing intermediate results.
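///
/// A small usage sketch (marked `ignore` since crate-internal paths are not
/// spelled out here); the chunk split mirrors the unit tests below:
///
/// ```ignore
/// let mut source = VectorSource::single_column(vec![
///     Value::Int64(1),
///     Value::Int64(2),
///     Value::Int64(3),
/// ]);
/// // Requesting up to 2 rows per chunk yields a chunk of 2, then 1, then None.
/// let first = source.next_chunk(2)?.unwrap();
/// assert_eq!(first.len(), 2);
/// ```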
pub struct VectorSource {
    values: Vec<Vec<Value>>,
    position: usize,
    num_columns: usize,
}

impl VectorSource {
    /// Create a new vector source from column data.
    pub fn new(columns: Vec<Vec<Value>>) -> Self {
        let num_columns = columns.len();
        Self {
            values: columns,
            position: 0,
            num_columns,
        }
    }

    /// Create a single-column source.
    pub fn single_column(values: Vec<Value>) -> Self {
        Self::new(vec![values])
    }

    /// Create from node IDs.
    pub fn from_node_ids(ids: Vec<NodeId>) -> Self {
        let values: Vec<Value> = ids
            .into_iter()
            .map(|id| Value::Int64(id.0 as i64))
            .collect();
        Self::single_column(values)
    }
}

impl Source for VectorSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.num_columns == 0 || self.values[0].is_empty() {
            return Ok(None);
        }

        let total_rows = self.values[0].len();
        if self.position >= total_rows {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(total_rows);
        let mut columns = Vec::with_capacity(self.num_columns);

        for col_values in &self.values {
            let slice = &col_values[self.position..end];
            let vector = ValueVector::from_values(slice);
            columns.push(vector);
        }

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "VectorSource"
    }
}

/// Source that produces no chunks (for testing).
pub struct EmptySource;

impl EmptySource {
    /// Create a new empty source.
    pub fn new() -> Self {
        Self
    }
}

impl Default for EmptySource {
    fn default() -> Self {
        Self::new()
    }
}

impl Source for EmptySource {
    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        Ok(None)
    }

    fn reset(&mut self) {}

    fn name(&self) -> &'static str {
        "EmptySource"
    }
}

/// Source that produces chunks from a pre-built collection.
///
/// Takes ownership of DataChunks and produces them one at a time.
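///
/// Construction sketch (marked `ignore`; built the same way as in the unit
/// tests below):
///
/// ```ignore
/// let chunk = DataChunk::new(vec![ValueVector::from_values(&[Value::Int64(1)])]);
/// let mut source = ChunkSource::single(chunk);
/// assert!(source.next_chunk(1024)?.is_some());
/// assert!(source.next_chunk(1024)?.is_none());
/// ```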
pub struct ChunkSource {
    chunks: Vec<DataChunk>,
    position: usize,
}

impl ChunkSource {
    /// Create a new chunk source.
    pub fn new(chunks: Vec<DataChunk>) -> Self {
        Self {
            chunks,
            position: 0,
        }
    }

    /// Create from a single chunk.
    pub fn single(chunk: DataChunk) -> Self {
        Self::new(vec![chunk])
    }
}

impl Source for ChunkSource {
    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.chunks.len() {
            return Ok(None);
        }

        let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
        self.position += 1;
        Ok(Some(chunk))
    }

    fn reset(&mut self) {
        // Produced chunks were moved out and replaced with empty placeholders,
        // so resetting only rewinds the position; re-reading yields empty chunks.
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "ChunkSource"
    }
}

/// Source that generates values using a closure.
///
/// Useful for generating test data or for lazy evaluation.
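///
/// Sketch (marked `ignore`; mirrors the unit test below): the closure returns
/// one row per call and `None` once exhausted.
///
/// ```ignore
/// let mut source = GeneratorSource::new(|i| {
///     if i < 5 { Some(vec![Value::Int64(i as i64)]) } else { None }
/// });
/// // Yields chunks of 2, 2, 1, then None.
/// let chunk = source.next_chunk(2)?.unwrap();
/// assert_eq!(chunk.len(), 2);
/// ```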
pub struct GeneratorSource<F>
where
    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
{
    generator: F,
    row_index: usize,
    exhausted: bool,
}

impl<F> GeneratorSource<F>
where
    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
{
    /// Create a new generator source.
    pub fn new(generator: F) -> Self {
        Self {
            generator,
            row_index: 0,
            exhausted: false,
        }
    }
}

impl<F> Source for GeneratorSource<F>
where
    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
{
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.exhausted {
            return Ok(None);
        }

        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(chunk_size);

        for _ in 0..chunk_size {
            if let Some(row) = (self.generator)(self.row_index) {
                rows.push(row);
                self.row_index += 1;
            } else {
                self.exhausted = true;
                break;
            }
        }

        if rows.is_empty() {
            return Ok(None);
        }

        // Transpose rows into columns
        let num_columns = rows[0].len();
        let mut columns: Vec<ValueVector> = (0..num_columns).map(|_| ValueVector::new()).collect();

        for row in rows {
            for (col_idx, val) in row.into_iter().enumerate() {
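                // Extra values in rows wider than the first row are dropped here.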
                if col_idx < columns.len() {
                    columns[col_idx].push(val);
                }
            }
        }

        Ok(Some(DataChunk::new(columns)))
    }

    fn reset(&mut self) {
        self.row_index = 0;
        self.exhausted = false;
    }

    fn name(&self) -> &'static str {
        "GeneratorSource"
    }
}

/// Source that scans RDF triples matching a pattern.
///
/// Produces chunks with columns for subject, predicate, and object.
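///
/// Construction sketch (marked `ignore`; the `Value::Int64` terms are just
/// placeholders for whatever term encoding the RDF store uses):
///
/// ```ignore
/// let triples = vec![(Value::Int64(1), Value::Int64(2), Value::Int64(3))];
/// let mut source = TripleScanSource::new(
///     triples,
///     vec!["s".to_string(), "p".to_string(), "o".to_string()],
/// );
/// let chunk = source.next_chunk(1024)?.unwrap(); // three columns: s, p, o
/// ```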
#[cfg(feature = "rdf")]
pub struct TripleScanSource {
    /// The triples to scan (materialized for simplicity).
    triples: Vec<(Value, Value, Value)>,
    /// Current position in the triples.
    position: usize,
    /// Variable names for output columns.
    output_vars: Vec<String>,
}

#[cfg(feature = "rdf")]
impl TripleScanSource {
    /// Create a new triple scan source.
    ///
    /// # Arguments
    /// * `triples` - The triples to scan (subject, predicate, object as Values)
    /// * `output_vars` - Names of variables to bind (typically ["s", "p", "o"] or a subset)
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        Self {
            triples,
            position: 0,
            output_vars,
        }
    }

    /// Create from an RDF store query result.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect(), output_vars)
    }

    /// Returns the number of remaining triples.
    pub fn remaining(&self) -> usize {
        self.triples.len().saturating_sub(self.position)
    }
}

#[cfg(feature = "rdf")]
impl Source for TripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.triples.len() {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(self.triples.len());
        let slice = &self.triples[self.position..end];

        // Create columns for subject, predicate, object
        let mut subjects = Vec::with_capacity(slice.len());
        let mut predicates = Vec::with_capacity(slice.len());
        let mut objects = Vec::with_capacity(slice.len());

        for (s, p, o) in slice {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        let mut columns = Vec::with_capacity(3);

        // Only include columns for requested variables
        for var in &self.output_vars {
            match var.as_str() {
                "s" | "subject" => columns.push(ValueVector::from_values(&subjects)),
                "p" | "predicate" => columns.push(ValueVector::from_values(&predicates)),
                "o" | "object" => columns.push(ValueVector::from_values(&objects)),
                _ => {
                    // For other variable names we would need the query pattern to
                    // know which position they bind to; for now, fall back to
                    // including all three columns.
                    if columns.is_empty() {
                        columns.push(ValueVector::from_values(&subjects));
                        columns.push(ValueVector::from_values(&predicates));
                        columns.push(ValueVector::from_values(&objects));
                    }
                }
            }
        }

        // If no columns were added, include all
        if columns.is_empty() {
            columns.push(ValueVector::from_values(&subjects));
            columns.push(ValueVector::from_values(&predicates));
            columns.push(ValueVector::from_values(&objects));
        }

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "TripleScanSource"
    }
}

/// Push operator that performs a hash join on RDF triple patterns.
///
/// Used when joining two triple patterns that share a variable.
/// Note: This is a stub for future implementation.
#[cfg(feature = "rdf")]
#[allow(dead_code)]
pub struct TripleJoinOperator {
    /// Name of the join variable.
    join_var: String,
    /// Left input column index for join key.
    left_key_col: usize,
    /// Right input column index for join key.
    right_key_col: usize,
    /// Buffered right side for hash join.
    right_buffer: Vec<DataChunk>,
    /// Whether we've seen the right side completely.
    right_complete: bool,
}

#[cfg(feature = "rdf")]
impl TripleJoinOperator {
    /// Create a new triple join operator.
    pub fn new(join_var: String, left_key_col: usize, right_key_col: usize) -> Self {
        Self {
            join_var,
            left_key_col,
            right_key_col,
            right_buffer: Vec::new(),
            right_complete: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_vector_source_single_chunk() {
        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let mut source = VectorSource::single_column(values);

        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 3);

        let next = source.next_chunk(10).unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn test_vector_source_chunked() {
        let values: Vec<Value> = (0..10).map(|i| Value::Int64(i)).collect();
        let mut source = VectorSource::single_column(values);

        let chunk1 = source.next_chunk(3).unwrap().unwrap();
        assert_eq!(chunk1.len(), 3);

        let chunk2 = source.next_chunk(3).unwrap().unwrap();
        assert_eq!(chunk2.len(), 3);

        let chunk3 = source.next_chunk(3).unwrap().unwrap();
        assert_eq!(chunk3.len(), 3);

        let chunk4 = source.next_chunk(3).unwrap().unwrap();
        assert_eq!(chunk4.len(), 1); // Remaining row

        let none = source.next_chunk(3).unwrap();
        assert!(none.is_none());
    }

    #[test]
    fn test_vector_source_reset() {
        let values = vec![Value::Int64(1), Value::Int64(2)];
        let mut source = VectorSource::single_column(values);

        let _ = source.next_chunk(10).unwrap();
        assert!(source.next_chunk(10).unwrap().is_none());

        source.reset();
        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 2);
    }

    #[test]
    fn test_empty_source() {
        let mut source = EmptySource::new();
        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    fn test_chunk_source() {
        let v1 = ValueVector::from_values(&[Value::Int64(1), Value::Int64(2)]);
        let chunk1 = DataChunk::new(vec![v1]);

        let v2 = ValueVector::from_values(&[Value::Int64(3), Value::Int64(4)]);
        let chunk2 = DataChunk::new(vec![v2]);

        let mut source = ChunkSource::new(vec![chunk1, chunk2]);

        let c1 = source.next_chunk(100).unwrap().unwrap();
        assert_eq!(c1.len(), 2);

        let c2 = source.next_chunk(100).unwrap().unwrap();
        assert_eq!(c2.len(), 2);

        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    fn test_generator_source() {
        let mut source = GeneratorSource::new(|i| {
            if i < 5 {
                Some(vec![Value::Int64(i as i64)])
            } else {
                None
            }
        });

        let chunk1 = source.next_chunk(2).unwrap().unwrap();
        assert_eq!(chunk1.len(), 2);

        let chunk2 = source.next_chunk(2).unwrap().unwrap();
        assert_eq!(chunk2.len(), 2);

        let chunk3 = source.next_chunk(2).unwrap().unwrap();
        assert_eq!(chunk3.len(), 1);

        assert!(source.next_chunk(2).unwrap().is_none());
    }
}