use crate::{Result, Tuple, Schema};
use super::{PhysicalOperator, TimeoutContext};
use std::sync::Arc;
/// Physical operator that evaluates a list of expressions against every
/// tuple of its input and emits the results as a new tuple, optionally
/// dropping duplicates (`DISTINCT` / `DISTINCT ON`).
pub struct ProjectOperator {
    /// Child operator that supplies the tuples to project.
    input: Box<dyn PhysicalOperator>,
    /// Expressions evaluated, in order, to produce each output tuple.
    exprs: Vec<crate::sql::LogicalExpr>,
    /// Names given to the output columns; paired with `exprs` by position.
    aliases: Vec<String>,
    /// Schema of the tuples this operator emits.
    output_schema: Arc<Schema>,
    /// Expression evaluator built from the input schema and the query
    /// parameters.
    evaluator: crate::sql::Evaluator,
    /// When `true` (and no `DISTINCT ON` key is set), duplicate output
    /// tuples are suppressed.
    distinct: bool,
    /// Serialized de-duplication keys of the rows emitted so far.
    seen: std::collections::HashSet<Vec<u8>>,
    /// Optional timeout context installed through `with_timeout`.
    timeout_ctx: Option<TimeoutContext>,
    /// Optional `DISTINCT ON` key expressions; when present they take
    /// precedence over the plain `distinct` flag.
    distinct_on_exprs: Option<Vec<crate::sql::LogicalExpr>>,
}
impl ProjectOperator {
    /// Creates a projection without a `DISTINCT ON` key.
    ///
    /// Equivalent to calling [`ProjectOperator::new_with_distinct_on`] with
    /// `distinct_on` set to `None`.
    pub fn new(
        input: Box<dyn PhysicalOperator>,
        exprs: Vec<crate::sql::LogicalExpr>,
        aliases: Vec<String>,
        distinct: bool,
        parameters: Vec<crate::Value>,
    ) -> Self {
        Self::new_with_distinct_on(input, exprs, aliases, distinct, None, parameters)
    }

    /// Creates a projection, optionally de-duplicating on `distinct_on`.
    ///
    /// The output schema has one column per `(alias, expr)` pair; pairing is
    /// positional, so any surplus entries in the longer of the two lists are
    /// ignored. Each column's type is inferred from its expression against
    /// the input schema, falling back to `Text` when inference yields no
    /// type. All output columns are marked nullable, non-unique and
    /// non-primary-key.
    pub fn new_with_distinct_on(
        input: Box<dyn PhysicalOperator>,
        exprs: Vec<crate::sql::LogicalExpr>,
        aliases: Vec<String>,
        distinct: bool,
        distinct_on: Option<Vec<crate::sql::LogicalExpr>>,
        parameters: Vec<crate::Value>,
    ) -> Self {
        use crate::sql::TypeInference;

        let input_schema = input.schema();

        // Describe every projected expression as an output column.
        let mut columns = Vec::with_capacity(aliases.len().min(exprs.len()));
        for (name, expr) in aliases.iter().zip(exprs.iter()) {
            let data_type = expr
                .infer_type(&input_schema)
                .unwrap_or(crate::DataType::Text);
            columns.push(crate::Column {
                name: name.clone(),
                data_type,
                nullable: true,
                primary_key: false,
                source_table: None,
                source_table_name: None,
                default_expr: None,
                unique: false,
                storage_mode: crate::ColumnStorageMode::Default,
            });
        }

        let output_schema = Arc::new(Schema { columns });
        // Expressions are evaluated against *input* tuples, so the evaluator
        // is bound to the input schema rather than the output one.
        let evaluator = crate::sql::Evaluator::with_parameters(input_schema, parameters);

        Self {
            input,
            exprs,
            aliases,
            output_schema,
            evaluator,
            distinct,
            seen: std::collections::HashSet::new(),
            timeout_ctx: None,
            distinct_on_exprs: distinct_on,
        }
    }

    /// Returns the operator with its timeout context replaced by
    /// `timeout_ctx`.
    pub fn with_timeout(self, timeout_ctx: Option<TimeoutContext>) -> Self {
        Self { timeout_ctx, ..self }
    }
}
impl PhysicalOperator for ProjectOperator {
fn next(&mut self) -> Result<Option<Tuple>> {
loop {
match self.input.next()? {
None => return Ok(None),
Some(tuple) => {
let output_values: Result<Vec<crate::Value>> = self.exprs.iter()
.map(|expr| self.evaluator.evaluate(expr, &tuple))
.collect();
let mut output_tuple = Tuple::new(output_values?);
output_tuple.row_id = tuple.row_id;
if let Some(ref distinct_on_exprs) = self.distinct_on_exprs {
let key_values: Result<Vec<crate::Value>> = distinct_on_exprs.iter()
.map(|expr| self.evaluator.evaluate(expr, &tuple))
.collect();
let key = bincode::serialize(&key_values?)
.map_err(|e| crate::Error::query_execution(
format!("Failed to serialize DISTINCT ON key: {}", e)
))?;
if self.seen.contains(&key) {
continue;
}
self.seen.insert(key);
} else if self.distinct {
let serialized = bincode::serialize(&output_tuple.values)
.map_err(|e| crate::Error::query_execution(
format!("Failed to serialize tuple for DISTINCT: {}", e)
))?;
if self.seen.contains(&serialized) {
continue;
}
self.seen.insert(serialized);
}
return Ok(Some(output_tuple));
}
}
}
}
fn schema(&self) -> Arc<Schema> {
self.output_schema.clone()
}
}
/// Physical operator that discards the first `offset` tuples of its input
/// and then passes through at most `limit` tuples.
pub struct LimitOperator {
    /// Child operator that supplies the tuples.
    input: Box<dyn PhysicalOperator>,
    /// Maximum number of tuples to emit.
    limit: usize,
    /// Number of leading input tuples to discard before emitting anything.
    offset: usize,
    /// How many input tuples have been discarded so far.
    skipped: usize,
    /// How many tuples have been emitted so far.
    returned: usize,
    /// Optional timeout context installed through `with_timeout`.
    timeout_ctx: Option<TimeoutContext>,
}
impl LimitOperator {
pub fn new(input: Box<dyn PhysicalOperator>, limit: usize, offset: usize) -> Self {
Self {
input,
limit,
offset,
skipped: 0,
returned: 0,
timeout_ctx: None,
}
}
pub fn with_timeout(mut self, timeout_ctx: Option<TimeoutContext>) -> Self {
self.timeout_ctx = timeout_ctx;
self
}
}
impl PhysicalOperator for LimitOperator {
    /// Produces the next tuple within the `OFFSET`/`LIMIT` window, or
    /// `Ok(None)` once the window is full or the input is exhausted.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the input operator.
    fn next(&mut self) -> Result<Option<Tuple>> {
        // Check the limit before touching the input: once the quota is used
        // up (including `LIMIT 0`) there is nothing left to emit, so pulling
        // and discarding `offset` rows would be wasted work.
        if self.returned >= self.limit {
            return Ok(None);
        }

        // Discard the leading `offset` tuples. `skipped` persists across
        // calls, so this loop only does work until the offset is consumed.
        while self.skipped < self.offset {
            if self.input.next()?.is_none() {
                // Input ran out before the offset was satisfied.
                return Ok(None);
            }
            self.skipped += 1;
        }

        let row = self.input.next()?;
        if row.is_some() {
            self.returned += 1;
        }
        Ok(row)
    }

    /// Returns the input's schema; limiting does not change the tuple shape.
    fn schema(&self) -> Arc<Schema> {
        self.input.schema()
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::Column;
    use crate::DataType;
    use crate::sql::executor::ScanOperator;

    /// A limit of 10 with no offset over the `test` scan yields no tuple on
    /// the first call.
    #[test]
    fn test_limit_operator() {
        // Single-column schema: a non-nullable Int4 primary key named `id`.
        let id_column = Column {
            name: "id".to_string(),
            data_type: DataType::Int4,
            nullable: false,
            primary_key: true,
            source_table: None,
            source_table_name: None,
            default_expr: None,
            unique: false,
            storage_mode: crate::ColumnStorageMode::Default,
        };
        let schema = Arc::new(Schema {
            columns: vec![id_column],
        });

        // NOTE(review): presumably this scan has no backing rows and so
        // yields nothing — confirm against `ScanOperator`.
        let scan = ScanOperator::new(
            "test".to_string(),
            schema.clone(),
            None,
            Vec::new(),
            Vec::new(),
        );

        let mut limit = LimitOperator::new(Box::new(scan), 10, 0);
        let first = limit.next().expect("Failed to execute limit");
        assert!(first.is_none());
    }
}