grafeo_core/execution/operators/
parameter_scan.rs1use std::sync::Arc;
8
9use grafeo_common::types::LogicalType;
10use parking_lot::Mutex;
11
12use super::{DataChunk, Operator, OperatorResult};
13use crate::execution::vector::ValueVector;
14use grafeo_common::types::Value;
15
16#[derive(Debug)]
21pub struct ParameterState {
22 pub columns: Vec<String>,
24 values: Mutex<Option<Vec<Value>>>,
26}
27
28impl ParameterState {
29 #[must_use]
31 pub fn new(columns: Vec<String>) -> Self {
32 Self {
33 columns,
34 values: Mutex::new(None),
35 }
36 }
37
38 pub fn set_values(&self, values: Vec<Value>) {
40 *self.values.lock() = Some(values);
41 }
42
43 pub fn clear(&self) {
45 *self.values.lock() = None;
46 }
47
48 fn take_values(&self) -> Option<Vec<Value>> {
50 self.values.lock().take()
51 }
52}
53
54pub struct ParameterScanOperator {
60 state: Arc<ParameterState>,
61 emitted: bool,
62}
63
64impl ParameterScanOperator {
65 #[must_use]
67 pub fn new(state: Arc<ParameterState>) -> Self {
68 Self {
69 state,
70 emitted: false,
71 }
72 }
73
74 #[must_use]
76 pub fn state(&self) -> &Arc<ParameterState> {
77 &self.state
78 }
79}
80
81impl Operator for ParameterScanOperator {
82 fn next(&mut self) -> OperatorResult {
83 if self.emitted {
84 return Ok(None);
85 }
86 self.emitted = true;
87
88 let Some(values) = self.state.take_values() else {
89 return Ok(None);
90 };
91
92 let columns: Vec<ValueVector> = values
94 .into_iter()
95 .map(|val| {
96 let mut col = ValueVector::with_capacity(LogicalType::Any, 1);
97 col.push_value(val);
98 col
99 })
100 .collect();
101
102 if columns.is_empty() {
103 return Ok(None);
104 }
105
106 Ok(Some(DataChunk::new(columns)))
107 }
108
109 fn reset(&mut self) {
110 self.emitted = false;
111 }
112
113 fn name(&self) -> &'static str {
114 "ParameterScan"
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use super::*;
121
122 #[test]
123 fn test_parameter_scan_emits_single_row() {
124 let state = Arc::new(ParameterState::new(vec!["x".to_string(), "y".to_string()]));
125 let mut op = ParameterScanOperator::new(Arc::clone(&state));
126
127 state.set_values(vec![Value::Int64(42), Value::String("hello".into())]);
129
130 let chunk = op.next().unwrap().expect("should emit a chunk");
132 assert_eq!(chunk.row_count(), 1);
133 assert_eq!(chunk.num_columns(), 2);
134 assert_eq!(
135 chunk.column(0).unwrap().get_value(0),
136 Some(Value::Int64(42))
137 );
138
139 assert!(op.next().unwrap().is_none());
141 }
142
143 #[test]
144 fn test_parameter_scan_reset() {
145 let state = Arc::new(ParameterState::new(vec!["x".to_string()]));
146 let mut op = ParameterScanOperator::new(Arc::clone(&state));
147
148 state.set_values(vec![Value::Int64(1)]);
149 let _ = op.next().unwrap();
150 assert!(op.next().unwrap().is_none());
151
152 op.reset();
154 state.set_values(vec![Value::Int64(2)]);
155 let chunk = op.next().unwrap().expect("should emit after reset");
156 assert_eq!(chunk.column(0).unwrap().get_value(0), Some(Value::Int64(2)));
157 }
158
159 #[test]
160 fn test_parameter_scan_no_values() {
161 let state = Arc::new(ParameterState::new(vec!["x".to_string()]));
162 let mut op = ParameterScanOperator::new(Arc::clone(&state));
163
164 assert!(op.next().unwrap().is_none());
166 }
167}