// grafeo_core/execution/source.rs
//! Source implementations for push-based execution.
//!
//! Sources produce chunks of data that flow through push-based pipelines.

use super::chunk::DataChunk;
use super::operators::{Operator, OperatorError};
use super::pipeline::Source;
use super::vector::ValueVector;
use grafeo_common::types::{NodeId, Value};

11/// Adapts a pull-based operator to work as a Source.
12///
13/// This allows gradual migration from pull to push model by wrapping
14/// existing operators as sources for push pipelines.
15pub struct OperatorSource {
16    operator: Box<dyn Operator>,
17}
18
19impl OperatorSource {
20    /// Create a new operator source.
21    pub fn new(operator: Box<dyn Operator>) -> Self {
22        Self { operator }
23    }
24}
25
26impl Source for OperatorSource {
27    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
28        // Pull-based operators produce their own chunk sizes,
29        // so we ignore the requested chunk_size
30        self.operator.next()
31    }
32
33    fn reset(&mut self) {
34        self.operator.reset();
35    }
36
37    fn name(&self) -> &'static str {
38        "OperatorSource"
39    }
40}
41
42/// Source that produces chunks from a vector of values.
43///
44/// Useful for testing and for materializing intermediate results.
45pub struct VectorSource {
46    values: Vec<Vec<Value>>,
47    position: usize,
48    num_columns: usize,
49}
50
51impl VectorSource {
52    /// Create a new vector source from column data.
53    pub fn new(columns: Vec<Vec<Value>>) -> Self {
54        let num_columns = columns.len();
55        Self {
56            values: columns,
57            position: 0,
58            num_columns,
59        }
60    }
61
62    /// Create a single-column source.
63    pub fn single_column(values: Vec<Value>) -> Self {
64        Self::new(vec![values])
65    }
66
67    /// Create from node IDs.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if any `NodeId` exceeds the `i64` range.
72    pub fn from_node_ids(ids: Vec<NodeId>) -> Result<Self, OperatorError> {
73        let values: Vec<Value> = ids
74            .into_iter()
75            .map(|id| {
76                let signed = i64::try_from(id.0).map_err(|_| {
77                    OperatorError::Execution(format!("NodeId {} exceeds i64 range", id.0))
78                })?;
79                Ok(Value::Int64(signed))
80            })
81            .collect::<Result<Vec<_>, OperatorError>>()?;
82        Ok(Self::single_column(values))
83    }
84}
85
86impl Source for VectorSource {
87    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
88        if self.num_columns == 0 || self.values[0].is_empty() {
89            return Ok(None);
90        }
91
92        let total_rows = self.values[0].len();
93        if self.position >= total_rows {
94            return Ok(None);
95        }
96
97        let end = (self.position + chunk_size).min(total_rows);
98        let mut columns = Vec::with_capacity(self.num_columns);
99
100        for col_values in &self.values {
101            let slice = &col_values[self.position..end];
102            let vector = ValueVector::from_values(slice);
103            columns.push(vector);
104        }
105
106        self.position = end;
107        Ok(Some(DataChunk::new(columns)))
108    }
109
110    fn reset(&mut self) {
111        self.position = 0;
112    }
113
114    fn name(&self) -> &'static str {
115        "VectorSource"
116    }
117}
118
/// Source that never produces any chunks (for testing).
///
/// NOTE: the previous doc claimed it produces "a single empty chunk",
/// but `next_chunk` always returns `Ok(None)`.
pub struct EmptySource;

impl EmptySource {
    /// Create a new empty source.
    pub fn new() -> Self {
        Self
    }
}

impl Default for EmptySource {
    fn default() -> Self {
        Self::new()
    }
}

135impl Source for EmptySource {
136    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
137        Ok(None)
138    }
139
140    fn reset(&mut self) {}
141
142    fn name(&self) -> &'static str {
143        "EmptySource"
144    }
145}
146
147/// Source that produces chunks from a pre-built collection.
148///
149/// Takes ownership of DataChunks and produces them one at a time.
150pub struct ChunkSource {
151    chunks: Vec<DataChunk>,
152    position: usize,
153}
154
155impl ChunkSource {
156    /// Create a new chunk source.
157    pub fn new(chunks: Vec<DataChunk>) -> Self {
158        Self {
159            chunks,
160            position: 0,
161        }
162    }
163
164    /// Create from a single chunk.
165    pub fn single(chunk: DataChunk) -> Self {
166        Self::new(vec![chunk])
167    }
168}
169
170impl Source for ChunkSource {
171    fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
172        if self.position >= self.chunks.len() {
173            return Ok(None);
174        }
175
176        let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
177        self.position += 1;
178        Ok(Some(chunk))
179    }
180
181    fn reset(&mut self) {
182        // Cannot reset since chunks were moved out
183        self.position = 0;
184    }
185
186    fn name(&self) -> &'static str {
187        "ChunkSource"
188    }
189}
190
191/// Source that generates values using a closure.
192///
193/// Useful for generating test data or for lazy evaluation.
194pub struct GeneratorSource<F>
195where
196    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
197{
198    generator: F,
199    row_index: usize,
200    exhausted: bool,
201}
202
203impl<F> GeneratorSource<F>
204where
205    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
206{
207    /// Create a new generator source.
208    pub fn new(generator: F) -> Self {
209        Self {
210            generator,
211            row_index: 0,
212            exhausted: false,
213        }
214    }
215}
216
217impl<F> Source for GeneratorSource<F>
218where
219    F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
220{
221    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
222        if self.exhausted {
223            return Ok(None);
224        }
225
226        let mut rows: Vec<Vec<Value>> = Vec::with_capacity(chunk_size);
227
228        for _ in 0..chunk_size {
229            if let Some(row) = (self.generator)(self.row_index) {
230                rows.push(row);
231                self.row_index += 1;
232            } else {
233                self.exhausted = true;
234                break;
235            }
236        }
237
238        if rows.is_empty() {
239            return Ok(None);
240        }
241
242        // Transpose rows into columns
243        let num_columns = rows[0].len();
244        let mut columns: Vec<ValueVector> = (0..num_columns).map(|_| ValueVector::new()).collect();
245
246        for row in rows {
247            for (col_idx, val) in row.into_iter().enumerate() {
248                if col_idx < columns.len() {
249                    columns[col_idx].push(val);
250                }
251            }
252        }
253
254        Ok(Some(DataChunk::new(columns)))
255    }
256
257    fn reset(&mut self) {
258        self.row_index = 0;
259        self.exhausted = false;
260    }
261
262    fn name(&self) -> &'static str {
263        "GeneratorSource"
264    }
265}
266
/// Source that scans RDF triples matching a pattern.
///
/// Produces chunks with columns for subject, predicate, and object.
#[cfg(feature = "triple-store")]
pub struct TripleScanSource {
    /// The triples to scan, fully materialized for simplicity.
    triples: Vec<(Value, Value, Value)>,
    /// Index of the next triple to emit.
    position: usize,
    /// Variable names selecting which columns appear in the output.
    output_vars: Vec<String>,
}

#[cfg(feature = "triple-store")]
impl TripleScanSource {
    /// Create a new triple scan source.
    ///
    /// # Arguments
    /// * `triples` - The triples to scan (subject, predicate, object as Values)
    /// * `output_vars` - Names of variables to bind (typically ["s", "p", "o"] or a subset)
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        Self {
            triples,
            position: 0,
            output_vars,
        }
    }

    /// Create from an RDF store query result.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect(), output_vars)
    }

    /// Returns the number of remaining triples.
    pub fn remaining(&self) -> usize {
        self.triples.len().saturating_sub(self.position)
    }
}

#[cfg(feature = "triple-store")]
impl Source for TripleScanSource {
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.triples.len() {
            return Ok(None);
        }

        let end = (self.position + chunk_size).min(self.triples.len());
        let window = &self.triples[self.position..end];

        // Split the (s, p, o) tuples into three parallel value lists.
        let mut subjects = Vec::with_capacity(window.len());
        let mut predicates = Vec::with_capacity(window.len());
        let mut objects = Vec::with_capacity(window.len());
        for (s, p, o) in window {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        // Map each requested variable to its column.
        let mut columns = Vec::with_capacity(3);
        for var in &self.output_vars {
            match var.as_str() {
                "s" | "subject" => columns.push(ValueVector::from_values(&subjects)),
                "p" | "predicate" => columns.push(ValueVector::from_values(&predicates)),
                "o" | "object" => columns.push(ValueVector::from_values(&objects)),
                // An unrecognized variable name gives no way to know which
                // triple position it binds to; fall back to emitting all
                // three columns, but only if nothing was emitted yet.
                _ if columns.is_empty() => {
                    columns.push(ValueVector::from_values(&subjects));
                    columns.push(ValueVector::from_values(&predicates));
                    columns.push(ValueVector::from_values(&objects));
                }
                _ => {}
            }
        }

        // No output variables requested at all: emit every column.
        if columns.is_empty() {
            columns.push(ValueVector::from_values(&subjects));
            columns.push(ValueVector::from_values(&predicates));
            columns.push(ValueVector::from_values(&objects));
        }

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "TripleScanSource"
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_vector_source_single_chunk() {
        let mut source =
            VectorSource::single_column(vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)]);

        // All three rows fit in a single chunk of size 10.
        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 3);
        assert!(source.next_chunk(10).unwrap().is_none());
    }

    #[test]
    fn test_vector_source_chunked() {
        let data: Vec<Value> = (0..10).map(Value::Int64).collect();
        let mut source = VectorSource::single_column(data);

        // Three full chunks of 3 rows, then a final chunk with the remainder.
        for expected_len in [3, 3, 3, 1] {
            let chunk = source.next_chunk(3).unwrap().unwrap();
            assert_eq!(chunk.len(), expected_len);
        }
        assert!(source.next_chunk(3).unwrap().is_none());
    }

    #[test]
    fn test_vector_source_reset() {
        let mut source = VectorSource::single_column(vec![Value::Int64(1), Value::Int64(2)]);

        // Drain the source completely.
        let _ = source.next_chunk(10).unwrap();
        assert!(source.next_chunk(10).unwrap().is_none());

        // After reset it replays the same rows.
        source.reset();
        assert_eq!(source.next_chunk(10).unwrap().unwrap().len(), 2);
    }

    #[test]
    fn test_empty_source() {
        let mut source = EmptySource::new();
        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    fn test_chunk_source() {
        let first = DataChunk::new(vec![ValueVector::from_values(&[
            Value::Int64(1),
            Value::Int64(2),
        ])]);
        let second = DataChunk::new(vec![ValueVector::from_values(&[
            Value::Int64(3),
            Value::Int64(4),
        ])]);

        let mut source = ChunkSource::new(vec![first, second]);

        assert_eq!(source.next_chunk(100).unwrap().unwrap().len(), 2);
        assert_eq!(source.next_chunk(100).unwrap().unwrap().len(), 2);
        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    // reason: test values 0..5 fit i64
    #[allow(clippy::cast_possible_wrap)]
    fn test_generator_source() {
        // Generates five single-column rows, then signals exhaustion.
        let mut source = GeneratorSource::new(|i| (i < 5).then(|| vec![Value::Int64(i as i64)]));

        assert_eq!(source.next_chunk(2).unwrap().unwrap().len(), 2);
        assert_eq!(source.next_chunk(2).unwrap().unwrap().len(), 2);
        assert_eq!(source.next_chunk(2).unwrap().unwrap().len(), 1);
        assert!(source.next_chunk(2).unwrap().is_none());
    }
}