Skip to main content

grafeo_core/execution/operators/
parameter_scan.rs

1//! Parameter scan operator for correlated subqueries.
2//!
3//! Provides a single-row DataChunk of values injected from an outer scope
4//! (e.g., the Apply operator). Used as the leaf of inner plans in CALL
5//! {subquery} and pattern comprehensions.
6
7use std::sync::Arc;
8
9use grafeo_common::types::LogicalType;
10use parking_lot::Mutex;
11
12use super::{DataChunk, Operator, OperatorResult};
13use crate::execution::vector::ValueVector;
14use grafeo_common::types::Value;
15
16/// Shared state between [`ApplyOperator`](super::ApplyOperator) and [`ParameterScanOperator`].
17///
18/// The Apply operator writes the current outer row values here before
19/// executing the inner plan. The ParameterScan reads them as its output.
20#[derive(Debug)]
21pub struct ParameterState {
22    /// Column names for the injected parameters.
23    pub columns: Vec<String>,
24    /// Current row values (set by Apply before each inner execution).
25    values: Mutex<Option<Vec<Value>>>,
26}
27
28impl ParameterState {
29    /// Creates a new parameter state for the given column names.
30    #[must_use]
31    pub fn new(columns: Vec<String>) -> Self {
32        Self {
33            columns,
34            values: Mutex::new(None),
35        }
36    }
37
38    /// Sets the current parameter values (called by the Apply operator).
39    pub fn set_values(&self, values: Vec<Value>) {
40        *self.values.lock() = Some(values);
41    }
42
43    /// Clears the current parameter values.
44    pub fn clear(&self) {
45        *self.values.lock() = None;
46    }
47
48    /// Takes the current parameter values.
49    fn take_values(&self) -> Option<Vec<Value>> {
50        self.values.lock().take()
51    }
52}
53
54/// Operator that emits a single row from externally injected parameter values.
55///
56/// This is the leaf operator for inner plans in correlated subqueries.
57/// The [`ApplyOperator`](super::ApplyOperator) sets parameter values via the shared [`ParameterState`]
58/// before each inner plan execution.
59pub struct ParameterScanOperator {
60    state: Arc<ParameterState>,
61    emitted: bool,
62}
63
64impl ParameterScanOperator {
65    /// Creates a new parameter scan operator.
66    #[must_use]
67    pub fn new(state: Arc<ParameterState>) -> Self {
68        Self {
69            state,
70            emitted: false,
71        }
72    }
73
74    /// Returns the shared parameter state (for wiring with Apply).
75    #[must_use]
76    pub fn state(&self) -> &Arc<ParameterState> {
77        &self.state
78    }
79}
80
81impl Operator for ParameterScanOperator {
82    fn next(&mut self) -> OperatorResult {
83        if self.emitted {
84            return Ok(None);
85        }
86        self.emitted = true;
87
88        let Some(values) = self.state.take_values() else {
89            return Ok(None);
90        };
91
92        // Build a single-row DataChunk with one column per parameter
93        let columns: Vec<ValueVector> = values
94            .into_iter()
95            .map(|val| {
96                let mut col = ValueVector::with_capacity(LogicalType::Any, 1);
97                col.push_value(val);
98                col
99            })
100            .collect();
101
102        if columns.is_empty() {
103            return Ok(None);
104        }
105
106        Ok(Some(DataChunk::new(columns)))
107    }
108
109    fn reset(&mut self) {
110        self.emitted = false;
111    }
112
113    fn name(&self) -> &'static str {
114        "ParameterScan"
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a shared state for `names` together with an operator reading from it.
    fn scan_over(names: &[&str]) -> (Arc<ParameterState>, ParameterScanOperator) {
        let columns = names.iter().map(|name| (*name).to_string()).collect();
        let state = Arc::new(ParameterState::new(columns));
        let op = ParameterScanOperator::new(Arc::clone(&state));
        (state, op)
    }

    #[test]
    fn test_parameter_scan_emits_single_row() {
        let (state, mut op) = scan_over(&["x", "y"]);

        state.set_values(vec![Value::Int64(42), Value::String("hello".into())]);

        // First call: the row comes out as one chunk with one column per value.
        let chunk = op.next().unwrap().expect("should emit a chunk");
        assert_eq!(chunk.row_count(), 1);
        assert_eq!(chunk.num_columns(), 2);
        assert_eq!(
            chunk.column(0).unwrap().get_value(0),
            Some(Value::Int64(42))
        );

        // Second call: the operator is exhausted.
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_parameter_scan_reset() {
        let (state, mut op) = scan_over(&["x"]);

        state.set_values(vec![Value::Int64(1)]);
        assert!(op.next().unwrap().is_some());
        assert!(op.next().unwrap().is_none());

        // After a reset and fresh values the operator emits again.
        op.reset();
        state.set_values(vec![Value::Int64(2)]);
        let chunk = op.next().unwrap().expect("should emit after reset");
        assert_eq!(chunk.column(0).unwrap().get_value(0), Some(Value::Int64(2)));
    }

    #[test]
    fn test_parameter_scan_no_values() {
        let (_state, mut op) = scan_over(&["x"]);

        // Nothing was ever stored: no row is produced.
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_parameter_scan_cleared_values() {
        let (state, mut op) = scan_over(&["x"]);

        // Values stored and then cleared must not be emitted.
        state.set_values(vec![Value::Int64(7)]);
        state.clear();
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_parameter_scan_empty_values() {
        let (state, mut op) = scan_over(&[]);

        // A row without any values yields no chunk.
        state.set_values(Vec::new());
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_parameter_scan_reset_without_new_values() {
        let (state, mut op) = scan_over(&["x"]);

        state.set_values(vec![Value::Int64(3)]);
        assert!(op.next().unwrap().is_some());

        // The row was taken out of the shared state, so a bare reset has
        // nothing left to emit.
        op.reset();
        assert!(op.next().unwrap().is_none());
    }
}
167}