use std::sync::Arc;
use grafeo_common::types::LogicalType;
use parking_lot::Mutex;
use super::{DataChunk, Operator, OperatorResult};
use crate::execution::vector::ValueVector;
use grafeo_common::types::Value;
/// Shared holder for a list of column names and an optional, replaceable list of values.
///
/// The values sit behind a mutex, so they can be set, cleared, or taken through a shared
/// reference.
#[derive(Debug)]
pub struct ParameterState {
/// Column names supplied at construction time.
// NOTE(review): presumably one entry per value passed to `set_values` — confirm with callers.
pub columns: Vec<String>,
/// Pending values; `None` until `set_values` is called and again after they are taken or cleared.
values: Mutex<Option<Vec<Value>>>,
}
impl ParameterState {
    /// Builds a state for the given column names with no values pending.
    #[must_use]
    pub fn new(columns: Vec<String>) -> Self {
        let values = Mutex::new(None);
        Self { columns, values }
    }

    /// Stores `values` as the pending values, replacing whatever was stored before.
    pub fn set_values(&self, values: Vec<Value>) {
        let mut slot = self.values.lock();
        *slot = Some(values);
    }

    /// Discards any pending values, leaving the slot empty.
    pub fn clear(&self) {
        let mut slot = self.values.lock();
        *slot = None;
    }

    /// Moves the pending values out of the slot, leaving `None` behind.
    ///
    /// Returns `None` when nothing is pending.
    fn take_values(&self) -> Option<Vec<Value>> {
        let mut slot = self.values.lock();
        slot.take()
    }
}
/// Operator that emits the values currently held by a shared [`ParameterState`] as a
/// single chunk, at most once between resets.
#[derive(Debug)]
pub struct ParameterScanOperator {
    /// Shared state the values are taken from when the operator is pulled.
    state: Arc<ParameterState>,
    /// Set on the first `next` call after construction or `reset`; later calls return nothing.
    emitted: bool,
}
impl ParameterScanOperator {
    /// Creates an operator that reads from `state` and has not emitted anything yet.
    #[must_use]
    pub fn new(state: Arc<ParameterState>) -> Self {
        let emitted = false;
        Self { state, emitted }
    }

    /// Borrows the shared handle to the parameter state this operator reads from.
    #[must_use]
    pub fn state(&self) -> &Arc<ParameterState> {
        let Self { state, .. } = self;
        state
    }
}
impl Operator for ParameterScanOperator {
    /// Emits the pending parameter values as a one-row chunk, one column per value.
    ///
    /// Returns `Ok(None)` when the operator has already been pulled since construction or
    /// the last `reset`, when no values are pending in the shared state, or when the
    /// pending value list is empty. The first pull marks the operator as emitted even if
    /// nothing was available, so later pulls yield nothing until `reset` is called.
    fn next(&mut self) -> OperatorResult {
        // Flip the flag unconditionally; its previous value says whether this pull is a repeat.
        let already_emitted = std::mem::replace(&mut self.emitted, true);
        if already_emitted {
            return Ok(None);
        }

        let row = match self.state.take_values() {
            Some(row) => row,
            None => return Ok(None),
        };

        // Each value becomes its own single-element column.
        let mut columns = Vec::with_capacity(row.len());
        for value in row {
            let mut column = ValueVector::with_capacity(LogicalType::Any, 1);
            column.push_value(value);
            columns.push(column);
        }

        if columns.is_empty() {
            Ok(None)
        } else {
            Ok(Some(DataChunk::new(columns)))
        }
    }

    /// Re-arms the operator so the next pull tries to take values from the state again.
    fn reset(&mut self) {
        self.emitted = false;
    }

    /// Static display name of this operator.
    fn name(&self) -> &'static str {
        "ParameterScan"
    }

    /// Converts the boxed operator into a type-erased `Any` box that callers can downcast.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        let erased: Box<dyn std::any::Any + Send> = self;
        erased
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a shared state with the given column names plus an operator bound to it.
    fn state_and_operator(names: &[&str]) -> (Arc<ParameterState>, ParameterScanOperator) {
        let columns = names.iter().map(|name| String::from(*name)).collect();
        let state = Arc::new(ParameterState::new(columns));
        let operator = ParameterScanOperator::new(Arc::clone(&state));
        (state, operator)
    }

    /// Two pending values come out as one row with two columns, and only once.
    #[test]
    fn test_parameter_scan_emits_single_row() {
        let (state, mut scan) = state_and_operator(&["x", "y"]);
        state.set_values(vec![Value::Int64(42), Value::String("hello".into())]);

        let emitted = scan.next().unwrap().expect("should emit a chunk");
        assert_eq!(emitted.row_count(), 1);
        assert_eq!(emitted.num_columns(), 2);
        let first_cell = emitted.column(0).unwrap().get_value(0);
        assert_eq!(first_cell, Some(Value::Int64(42)));

        // A second pull without a reset yields nothing.
        assert!(scan.next().unwrap().is_none());
    }

    /// After `reset`, freshly set values are emitted again.
    #[test]
    fn test_parameter_scan_reset() {
        let (state, mut scan) = state_and_operator(&["x"]);

        state.set_values(vec![Value::Int64(1)]);
        let _ = scan.next().unwrap();
        assert!(scan.next().unwrap().is_none());

        scan.reset();
        state.set_values(vec![Value::Int64(2)]);
        let emitted = scan.next().unwrap().expect("should emit after reset");
        assert_eq!(
            emitted.column(0).unwrap().get_value(0),
            Some(Value::Int64(2))
        );
    }

    /// Pulling before any values were set yields nothing.
    #[test]
    fn test_parameter_scan_no_values() {
        let (_state, mut scan) = state_and_operator(&["x"]);
        assert!(scan.next().unwrap().is_none());
    }

    /// The type-erased box downcasts back to the concrete operator type.
    #[test]
    fn test_parameter_scan_into_any() {
        let (_state, scan) = state_and_operator(&["x"]);
        let erased = Box::new(scan).into_any();
        assert!(erased.downcast::<ParameterScanOperator>().is_ok());
    }
}