Skip to main content

grafeo_core/execution/
source.rs

1//! Source implementations for push-based execution.
2//!
3//! Sources produce chunks of data that flow through push-based pipelines.
4
5use super::chunk::DataChunk;
6use super::operators::{Operator, OperatorError};
7use super::pipeline::Source;
8use super::vector::ValueVector;
9use grafeo_common::types::{NodeId, Value};
10
11/// Adapts a pull-based operator to work as a Source.
12///
13/// This allows gradual migration from pull to push model by wrapping
14/// existing operators as sources for push pipelines.
pub struct OperatorSource {
    /// The wrapped pull-based operator; its `next()` supplies the chunks.
    operator: Box<dyn Operator>,
}
18
19impl OperatorSource {
20    /// Create a new operator source.
21    pub fn new(operator: Box<dyn Operator>) -> Self {
22        Self { operator }
23    }
24}
25
impl Source for OperatorSource {
    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        // Pull-based operators decide their own output chunk sizes, so the
        // requested chunk_size hint from the push pipeline is ignored.
        self.operator.next()
    }

    fn reset(&mut self) {
        // Delegate the rewind to the wrapped operator.
        self.operator.reset();
    }

    fn name(&self) -> &'static str {
        "OperatorSource"
    }
}
41
42/// Source that produces chunks from a vector of values.
43///
44/// Useful for testing and for materializing intermediate results.
pub struct VectorSource {
    /// Column-major data: one inner `Vec<Value>` per column.
    values: Vec<Vec<Value>>,
    /// Index of the next row to emit; advances as chunks are produced.
    position: usize,
    /// Cached column count (`values.len()`).
    num_columns: usize,
}
50
51impl VectorSource {
52    /// Create a new vector source from column data.
53    pub fn new(columns: Vec<Vec<Value>>) -> Self {
54        let num_columns = columns.len();
55        Self {
56            values: columns,
57            position: 0,
58            num_columns,
59        }
60    }
61
62    /// Create a single-column source.
63    pub fn single_column(values: Vec<Value>) -> Self {
64        Self::new(vec![values])
65    }
66
67    /// Create from node IDs.
68    pub fn from_node_ids(ids: Vec<NodeId>) -> Self {
69        let values: Vec<Value> = ids
70            .into_iter()
71            .map(|id| Value::Int64(id.0 as i64))
72            .collect();
73        Self::single_column(values)
74    }
75}
76
77impl Source for VectorSource {
78    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
79        if self.num_columns == 0 || self.values[0].is_empty() {
80            return Ok(None);
81        }
82
83        let total_rows = self.values[0].len();
84        if self.position >= total_rows {
85            return Ok(None);
86        }
87
88        let end = (self.position + chunk_size).min(total_rows);
89        let mut columns = Vec::with_capacity(self.num_columns);
90
91        for col_values in &self.values {
92            let slice = &col_values[self.position..end];
93            let vector = ValueVector::from_values(slice);
94            columns.push(vector);
95        }
96
97        self.position = end;
98        Ok(Some(DataChunk::new(columns)))
99    }
100
101    fn reset(&mut self) {
102        self.position = 0;
103    }
104
105    fn name(&self) -> &'static str {
106        "VectorSource"
107    }
108}
109
/// Source that never produces any chunks (for testing).
///
/// `next_chunk` reports end-of-data on the very first call.
pub struct EmptySource;

impl EmptySource {
    /// Create a new empty source.
    pub fn new() -> Self {
        Self
    }
}
119
120impl Default for EmptySource {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126impl Source for EmptySource {
127    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
128        Ok(None)
129    }
130
131    fn reset(&mut self) {}
132
133    fn name(&self) -> &'static str {
134        "EmptySource"
135    }
136}
137
138/// Source that produces chunks from a pre-built collection.
139///
140/// Takes ownership of DataChunks and produces them one at a time.
pub struct ChunkSource {
    /// Chunks to emit; consumed slots are left as empty placeholders.
    chunks: Vec<DataChunk>,
    /// Index of the next chunk to emit.
    position: usize,
}
145
146impl ChunkSource {
147    /// Create a new chunk source.
148    pub fn new(chunks: Vec<DataChunk>) -> Self {
149        Self {
150            chunks,
151            position: 0,
152        }
153    }
154
155    /// Create from a single chunk.
156    pub fn single(chunk: DataChunk) -> Self {
157        Self::new(vec![chunk])
158    }
159}
160
161impl Source for ChunkSource {
162    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
163        if self.position >= self.chunks.len() {
164            return Ok(None);
165        }
166
167        let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
168        self.position += 1;
169        Ok(Some(chunk))
170    }
171
172    fn reset(&mut self) {
173        // Cannot reset since chunks were moved out
174        self.position = 0;
175    }
176
177    fn name(&self) -> &'static str {
178        "ChunkSource"
179    }
180}
181
182/// Source that generates values using a closure.
183///
184/// Useful for generating test data or for lazy evaluation.
/// Trait bounds live on the impl blocks (which already declare them), not
/// on the struct definition, per Rust API guidelines (C-STRUCT-BOUNDS);
/// dropping them here is backward-compatible for all existing users.
pub struct GeneratorSource<F> {
    /// Closure invoked with the next row index; returning `None` ends the stream.
    generator: F,
    /// Index of the next row to request from the generator.
    row_index: usize,
    /// Set once the generator returns `None`; later calls short-circuit.
    exhausted: bool,
}
193
194impl<F> GeneratorSource<F>
195where
196    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
197{
198    /// Create a new generator source.
199    pub fn new(generator: F) -> Self {
200        Self {
201            generator,
202            row_index: 0,
203            exhausted: false,
204        }
205    }
206}
207
208impl<F> Source for GeneratorSource<F>
209where
210    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
211{
212    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
213        if self.exhausted {
214            return Ok(None);
215        }
216
217        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(chunk_size);
218
219        for _ in 0..chunk_size {
220            if let Some(row) = (self.generator)(self.row_index) {
221                rows.push(row);
222                self.row_index += 1;
223            } else {
224                self.exhausted = true;
225                break;
226            }
227        }
228
229        if rows.is_empty() {
230            return Ok(None);
231        }
232
233        // Transpose rows into columns
234        let num_columns = rows[0].len();
235        let mut columns: Vec<ValueVector> = (0..num_columns).map(|_| ValueVector::new()).collect();
236
237        for row in rows {
238            for (col_idx, val) in row.into_iter().enumerate() {
239                if col_idx < columns.len() {
240                    columns[col_idx].push(val);
241                }
242            }
243        }
244
245        Ok(Some(DataChunk::new(columns)))
246    }
247
248    fn reset(&mut self) {
249        self.row_index = 0;
250        self.exhausted = false;
251    }
252
253    fn name(&self) -> &'static str {
254        "GeneratorSource"
255    }
256}
257
/// Source that scans RDF triples matching a pattern.
///
/// Produces chunks with columns for subject, predicate, and object
/// (or a subset, selected via `output_vars`).
#[cfg(feature = "rdf")]
pub struct TripleScanSource {
    /// The triples to scan (fully materialized for simplicity).
    triples: Vec<(Value, Value, Value)>,
    /// Index of the next triple to emit.
    position: usize,
    /// Variable names for output columns (e.g. "s", "p", "o").
    output_vars: Vec<String>,
}
270
#[cfg(feature = "rdf")]
impl TripleScanSource {
    /// Create a new triple scan source.
    ///
    /// # Arguments
    /// * `triples` - The triples to scan (subject, predicate, object as Values)
    /// * `output_vars` - Names of variables to bind (typically ["s", "p", "o"] or a subset)
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        TripleScanSource {
            position: 0,
            triples,
            output_vars,
        }
    }

    /// Create from an RDF store query result.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let triples: Vec<_> = iter.into_iter().collect();
        Self::new(triples, output_vars)
    }

    /// Returns the number of remaining triples.
    pub fn remaining(&self) -> usize {
        self.triples.len().saturating_sub(self.position)
    }
}
299
#[cfg(feature = "rdf")]
impl Source for TripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.triples.len() {
            return Ok(None);
        }

        // Guard against `chunk_size == 0`: otherwise `end == position` and
        // the source would emit empty chunks forever without advancing.
        let end = (self.position + chunk_size.max(1)).min(self.triples.len());
        let slice = &self.triples[self.position..end];

        // Materialize subject/predicate/object value columns for this slice.
        let mut subjects = Vec::with_capacity(slice.len());
        let mut predicates = Vec::with_capacity(slice.len());
        let mut objects = Vec::with_capacity(slice.len());

        for (s, p, o) in slice {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        let mut columns = Vec::with_capacity(3);

        // Only include columns for requested variables.
        for var in &self.output_vars {
            match var.as_str() {
                "s" | "subject" => columns.push(ValueVector::from_values(&subjects)),
                "p" | "predicate" => columns.push(ValueVector::from_values(&predicates)),
                "o" | "object" => columns.push(ValueVector::from_values(&objects)),
                _ => {
                    // An unknown variable name cannot be mapped to a triple
                    // position without the query pattern, so fall back to
                    // emitting all three columns — but only once.
                    // NOTE(review): mixing known and unknown names yields a
                    // column count that differs from `output_vars.len()`;
                    // confirm downstream operators tolerate this.
                    if columns.is_empty() {
                        columns.push(ValueVector::from_values(&subjects));
                        columns.push(ValueVector::from_values(&predicates));
                        columns.push(ValueVector::from_values(&objects));
                    }
                }
            }
        }

        // No output variables requested at all: include every column.
        if columns.is_empty() {
            columns.push(ValueVector::from_values(&subjects));
            columns.push(ValueVector::from_values(&predicates));
            columns.push(ValueVector::from_values(&objects));
        }

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    fn reset(&mut self) {
        // Triples are retained, so rewinding is just resetting the cursor.
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "TripleScanSource"
    }
}
361
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_vector_source_single_chunk() {
        let mut source =
            VectorSource::single_column(vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)]);

        // Everything fits in one chunk; the source is then exhausted.
        let first = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(first.len(), 3);
        assert!(source.next_chunk(10).unwrap().is_none());
    }

    #[test]
    fn test_vector_source_chunked() {
        let values: Vec<Value> = (0..10).map(Value::Int64).collect();
        let mut source = VectorSource::single_column(values);

        // Three full chunks of 3 rows each...
        for _ in 0..3 {
            let chunk = source.next_chunk(3).unwrap().unwrap();
            assert_eq!(chunk.len(), 3);
        }

        // ...then a final partial chunk holding the single remaining row.
        let tail = source.next_chunk(3).unwrap().unwrap();
        assert_eq!(tail.len(), 1);

        assert!(source.next_chunk(3).unwrap().is_none());
    }

    #[test]
    fn test_vector_source_reset() {
        let mut source = VectorSource::single_column(vec![Value::Int64(1), Value::Int64(2)]);

        // Drain the source completely.
        let _ = source.next_chunk(10).unwrap();
        assert!(source.next_chunk(10).unwrap().is_none());

        // Rewinding makes the full data available again.
        source.reset();
        assert_eq!(source.next_chunk(10).unwrap().unwrap().len(), 2);
    }

    #[test]
    fn test_empty_source() {
        let mut source = EmptySource::new();
        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    fn test_chunk_source() {
        let first = DataChunk::new(vec![ValueVector::from_values(&[
            Value::Int64(1),
            Value::Int64(2),
        ])]);
        let second = DataChunk::new(vec![ValueVector::from_values(&[
            Value::Int64(3),
            Value::Int64(4),
        ])]);

        let mut source = ChunkSource::new(vec![first, second]);

        assert_eq!(source.next_chunk(100).unwrap().unwrap().len(), 2);
        assert_eq!(source.next_chunk(100).unwrap().unwrap().len(), 2);
        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    fn test_generator_source() {
        // Emits five single-column rows (0..5), then signals exhaustion.
        let mut source = GeneratorSource::new(|i| {
            if i < 5 {
                Some(vec![Value::Int64(i as i64)])
            } else {
                None
            }
        });

        assert_eq!(source.next_chunk(2).unwrap().unwrap().len(), 2);
        assert_eq!(source.next_chunk(2).unwrap().unwrap().len(), 2);
        assert_eq!(source.next_chunk(2).unwrap().unwrap().len(), 1);
        assert!(source.next_chunk(2).unwrap().is_none());
    }
}