grafeo_core/execution/operators/
parameter_scan.rs1use std::sync::Arc;
8
9use grafeo_common::types::LogicalType;
10use parking_lot::Mutex;
11
12use super::{DataChunk, Operator, OperatorResult};
13use crate::execution::vector::ValueVector;
14use grafeo_common::types::Value;
15
16#[derive(Debug)]
21pub struct ParameterState {
22 pub columns: Vec<String>,
24 values: Mutex<Option<Vec<Value>>>,
26}
27
28impl ParameterState {
29 #[must_use]
31 pub fn new(columns: Vec<String>) -> Self {
32 Self {
33 columns,
34 values: Mutex::new(None),
35 }
36 }
37
38 pub fn set_values(&self, values: Vec<Value>) {
40 *self.values.lock() = Some(values);
41 }
42
43 pub fn clear(&self) {
45 *self.values.lock() = None;
46 }
47
48 fn take_values(&self) -> Option<Vec<Value>> {
50 self.values.lock().take()
51 }
52}
53
54pub struct ParameterScanOperator {
60 state: Arc<ParameterState>,
61 emitted: bool,
62}
63
64impl ParameterScanOperator {
65 #[must_use]
67 pub fn new(state: Arc<ParameterState>) -> Self {
68 Self {
69 state,
70 emitted: false,
71 }
72 }
73
74 #[must_use]
76 pub fn state(&self) -> &Arc<ParameterState> {
77 &self.state
78 }
79}
80
81impl Operator for ParameterScanOperator {
82 fn next(&mut self) -> OperatorResult {
83 if self.emitted {
84 return Ok(None);
85 }
86 self.emitted = true;
87
88 let Some(values) = self.state.take_values() else {
89 return Ok(None);
90 };
91
92 let columns: Vec<ValueVector> = values
94 .into_iter()
95 .map(|val| {
96 let mut col = ValueVector::with_capacity(LogicalType::Any, 1);
97 col.push_value(val);
98 col
99 })
100 .collect();
101
102 if columns.is_empty() {
103 return Ok(None);
104 }
105
106 Ok(Some(DataChunk::new(columns)))
107 }
108
109 fn reset(&mut self) {
110 self.emitted = false;
111 }
112
113 fn name(&self) -> &'static str {
114 "ParameterScan"
115 }
116
117 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
118 self
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125
126 #[test]
127 fn test_parameter_scan_emits_single_row() {
128 let state = Arc::new(ParameterState::new(vec!["x".to_string(), "y".to_string()]));
129 let mut op = ParameterScanOperator::new(Arc::clone(&state));
130
131 state.set_values(vec![Value::Int64(42), Value::String("hello".into())]);
133
134 let chunk = op.next().unwrap().expect("should emit a chunk");
136 assert_eq!(chunk.row_count(), 1);
137 assert_eq!(chunk.num_columns(), 2);
138 assert_eq!(
139 chunk.column(0).unwrap().get_value(0),
140 Some(Value::Int64(42))
141 );
142
143 assert!(op.next().unwrap().is_none());
145 }
146
147 #[test]
148 fn test_parameter_scan_reset() {
149 let state = Arc::new(ParameterState::new(vec!["x".to_string()]));
150 let mut op = ParameterScanOperator::new(Arc::clone(&state));
151
152 state.set_values(vec![Value::Int64(1)]);
153 let _ = op.next().unwrap();
154 assert!(op.next().unwrap().is_none());
155
156 op.reset();
158 state.set_values(vec![Value::Int64(2)]);
159 let chunk = op.next().unwrap().expect("should emit after reset");
160 assert_eq!(chunk.column(0).unwrap().get_value(0), Some(Value::Int64(2)));
161 }
162
163 #[test]
164 fn test_parameter_scan_no_values() {
165 let state = Arc::new(ParameterState::new(vec!["x".to_string()]));
166 let mut op = ParameterScanOperator::new(Arc::clone(&state));
167
168 assert!(op.next().unwrap().is_none());
170 }
171
172 #[test]
173 fn test_parameter_scan_into_any() {
174 let state = Arc::new(ParameterState::new(vec!["x".to_string()]));
175 let op = ParameterScanOperator::new(state);
176 let any = Box::new(op).into_any();
177 assert!(any.downcast::<ParameterScanOperator>().is_ok());
178 }
179}