Skip to main content

grafeo_core/execution/operators/
parameter_scan.rs

1//! Parameter scan operator for correlated subqueries.
2//!
3//! Provides a single-row DataChunk of values injected from an outer scope
4//! (e.g., the Apply operator). Used as the leaf of inner plans in CALL
5//! {subquery} and pattern comprehensions.
6
7use std::sync::Arc;
8
9use grafeo_common::types::LogicalType;
10use parking_lot::Mutex;
11
12use super::{DataChunk, Operator, OperatorResult};
13use crate::execution::vector::ValueVector;
14use grafeo_common::types::Value;
15
16/// Shared state between [`ApplyOperator`](super::ApplyOperator) and [`ParameterScanOperator`].
17///
18/// The Apply operator writes the current outer row values here before
19/// executing the inner plan. The ParameterScan reads them as its output.
20#[derive(Debug)]
21pub struct ParameterState {
22    /// Column names for the injected parameters.
23    pub columns: Vec<String>,
24    /// Current row values (set by Apply before each inner execution).
25    values: Mutex<Option<Vec<Value>>>,
26}
27
28impl ParameterState {
29    /// Creates a new parameter state for the given column names.
30    #[must_use]
31    pub fn new(columns: Vec<String>) -> Self {
32        Self {
33            columns,
34            values: Mutex::new(None),
35        }
36    }
37
38    /// Sets the current parameter values (called by the Apply operator).
39    pub fn set_values(&self, values: Vec<Value>) {
40        *self.values.lock() = Some(values);
41    }
42
43    /// Clears the current parameter values.
44    pub fn clear(&self) {
45        *self.values.lock() = None;
46    }
47
48    /// Takes the current parameter values.
49    fn take_values(&self) -> Option<Vec<Value>> {
50        self.values.lock().take()
51    }
52}
53
54/// Operator that emits a single row from externally injected parameter values.
55///
56/// This is the leaf operator for inner plans in correlated subqueries.
57/// The [`ApplyOperator`](super::ApplyOperator) sets parameter values via the shared [`ParameterState`]
58/// before each inner plan execution.
59pub struct ParameterScanOperator {
60    state: Arc<ParameterState>,
61    emitted: bool,
62}
63
64impl ParameterScanOperator {
65    /// Creates a new parameter scan operator.
66    #[must_use]
67    pub fn new(state: Arc<ParameterState>) -> Self {
68        Self {
69            state,
70            emitted: false,
71        }
72    }
73
74    /// Returns the shared parameter state (for wiring with Apply).
75    #[must_use]
76    pub fn state(&self) -> &Arc<ParameterState> {
77        &self.state
78    }
79}
80
81impl Operator for ParameterScanOperator {
82    fn next(&mut self) -> OperatorResult {
83        if self.emitted {
84            return Ok(None);
85        }
86        self.emitted = true;
87
88        let Some(values) = self.state.take_values() else {
89            return Ok(None);
90        };
91
92        // Build a single-row DataChunk with one column per parameter
93        let columns: Vec<ValueVector> = values
94            .into_iter()
95            .map(|val| {
96                let mut col = ValueVector::with_capacity(LogicalType::Any, 1);
97                col.push_value(val);
98                col
99            })
100            .collect();
101
102        if columns.is_empty() {
103            return Ok(None);
104        }
105
106        Ok(Some(DataChunk::new(columns)))
107    }
108
109    fn reset(&mut self) {
110        self.emitted = false;
111    }
112
113    fn name(&self) -> &'static str {
114        "ParameterScan"
115    }
116
117    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
118        self
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a shared state with the given column names and a scan over it.
    fn scan_with_columns(names: &[&str]) -> (Arc<ParameterState>, ParameterScanOperator) {
        let columns = names.iter().map(|name| (*name).to_string()).collect();
        let state = Arc::new(ParameterState::new(columns));
        let op = ParameterScanOperator::new(Arc::clone(&state));
        (state, op)
    }

    #[test]
    fn test_parameter_scan_emits_single_row() {
        let (state, mut op) = scan_with_columns(&["x", "y"]);

        // Inject one row with two parameters.
        state.set_values(vec![Value::Int64(42), Value::String("hello".into())]);

        // The first pull yields that row as a 1x2 chunk.
        let first = op.next().unwrap();
        let chunk = first.expect("should emit a chunk");
        assert_eq!(chunk.row_count(), 1);
        assert_eq!(chunk.num_columns(), 2);
        let first_cell = chunk.column(0).unwrap().get_value(0);
        assert_eq!(first_cell, Some(Value::Int64(42)));

        // The second pull finds the scan exhausted.
        let second = op.next().unwrap();
        assert!(second.is_none());
    }

    #[test]
    fn test_parameter_scan_reset() {
        let (state, mut op) = scan_with_columns(&["x"]);

        // Drain the first row.
        state.set_values(vec![Value::Int64(1)]);
        let _ = op.next().unwrap();
        assert!(op.next().unwrap().is_none());

        // After a reset and a fresh row, the scan emits again.
        op.reset();
        state.set_values(vec![Value::Int64(2)]);
        let chunk = op.next().unwrap().expect("should emit after reset");
        let cell = chunk.column(0).unwrap().get_value(0);
        assert_eq!(cell, Some(Value::Int64(2)));
    }

    #[test]
    fn test_parameter_scan_no_values() {
        let (_state, mut op) = scan_with_columns(&["x"]);

        // Nothing was injected, so nothing is emitted.
        let result = op.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_parameter_scan_into_any() {
        let (_state, op) = scan_with_columns(&["x"]);
        let boxed = Box::new(op);
        let any = boxed.into_any();
        assert!(any.downcast::<ParameterScanOperator>().is_ok());
    }
}