use crate::{Result, Tuple, Value, Error};
use super::aggregation::{AggregateState, CountState, create_aggregate};
use std::collections::HashMap;
use std::cmp::Ordering;
/// A pull-based query operator: rows are produced one at a time by `next`.
///
/// Callers are expected to call `open` before pulling rows and `close` when
/// done. The operators in this module forward `open`/`close` to their child
/// operators; `ScanExecutor` returns an error if `next` is called while closed.
pub trait Executor: Send {
    /// Prepares the operator (and, for composite operators, its children) for iteration.
    fn open(&mut self) -> Result<()>;
    /// Returns the next output row, or `Ok(None)` once the operator is exhausted.
    fn next(&mut self) -> Result<Option<Tuple>>;
    /// Ends iteration (and, for composite operators, closes their children).
    fn close(&mut self) -> Result<()>;
}
/// Leaf operator that yields clones of an in-memory list of tuples, in order.
pub struct ScanExecutor {
    /// Rows produced by this scan.
    tuples: Vec<Tuple>,
    /// Index of the next row to emit.
    position: usize,
    /// Set by `open`, cleared by `close`; `next` refuses to run while unset.
    is_open: bool,
}

impl ScanExecutor {
    /// Creates a closed scan over `tuples`; call `open` before pulling rows.
    pub fn new(tuples: Vec<Tuple>) -> Self {
        Self {
            is_open: false,
            position: 0,
            tuples,
        }
    }
}

impl Executor for ScanExecutor {
    /// Rewinds to the first row and marks the scan as open.
    fn open(&mut self) -> Result<()> {
        self.is_open = true;
        self.position = 0;
        Ok(())
    }

    /// Returns a clone of the row under the cursor and advances the cursor.
    ///
    /// # Errors
    /// Returns `Error::Generic` when called while the scan is not open.
    fn next(&mut self) -> Result<Option<Tuple>> {
        if !self.is_open {
            return Err(Error::Generic("Executor not open".to_string()));
        }
        let row = self.tuples.get(self.position).cloned();
        if row.is_some() {
            self.position += 1;
        }
        Ok(row)
    }

    /// Marks the scan closed; the buffered rows are kept so it can be reopened.
    fn close(&mut self) -> Result<()> {
        self.is_open = false;
        Ok(())
    }
}
/// Row predicate used by [`FilterExecutor`]; rows for which it returns `true` are kept.
pub type PredicateFn = Box<dyn Fn(&Tuple) -> bool + Send>;

/// Operator that passes through only the child rows satisfying a predicate.
pub struct FilterExecutor {
    child: Box<dyn Executor>,
    predicate: PredicateFn,
}

impl FilterExecutor {
    /// Wraps `child`, keeping only rows for which `predicate` returns `true`.
    pub fn new(child: Box<dyn Executor>, predicate: PredicateFn) -> Self {
        FilterExecutor { child, predicate }
    }
}

impl Executor for FilterExecutor {
    fn open(&mut self) -> Result<()> {
        self.child.open()
    }

    /// Pulls child rows until one satisfies the predicate or the child runs dry.
    fn next(&mut self) -> Result<Option<Tuple>> {
        while let Some(candidate) = self.child.next()? {
            if (self.predicate)(&candidate) {
                return Ok(Some(candidate));
            }
        }
        Ok(None)
    }

    fn close(&mut self) -> Result<()> {
        self.child.close()
    }
}
/// Row-mapping function used by [`ProjectExecutor`].
pub type ProjectFn = Box<dyn Fn(&Tuple) -> Tuple + Send>;

/// Operator that transforms every child row with a projection function.
pub struct ProjectExecutor {
    child: Box<dyn Executor>,
    project: ProjectFn,
}

impl ProjectExecutor {
    /// Wraps `child`, mapping each of its rows through `project`.
    pub fn new(child: Box<dyn Executor>, project: ProjectFn) -> Self {
        Self { child, project }
    }

    /// Builds a projection that keeps the columns at `indices`, in that order.
    ///
    /// An index past the end of a row yields `Value::Null` instead of panicking;
    /// repeated indices duplicate the column.
    pub fn by_indices(child: Box<dyn Executor>, indices: Vec<usize>) -> Self {
        let project: ProjectFn = Box::new(move |row| {
            let picked = indices
                .iter()
                .map(|&col| match row.values.get(col) {
                    Some(v) => v.clone(),
                    None => Value::Null,
                })
                .collect::<Vec<Value>>();
            Tuple::new(picked)
        });
        Self::new(child, project)
    }
}

impl Executor for ProjectExecutor {
    fn open(&mut self) -> Result<()> {
        self.child.open()
    }

    /// Returns the projection of the next child row, or `None` once the child is done.
    fn next(&mut self) -> Result<Option<Tuple>> {
        let next_row = self.child.next()?;
        Ok(next_row.map(|row| (self.project)(&row)))
    }

    fn close(&mut self) -> Result<()> {
        self.child.close()
    }
}
/// Join predicate evaluated on a `(left, right)` row pair by [`NestedLoopJoinExecutor`].
pub type JoinConditionFn = Box<dyn Fn(&Tuple, &Tuple) -> bool + Send>;

/// Inner join that tests every (left, right) row pair against an arbitrary condition.
///
/// The right input is fully buffered in memory on the first call to `next`; the
/// left input is streamed. Each output row is the left row's values followed by
/// the right row's values, ordered by left row and then by right-input order.
pub struct NestedLoopJoinExecutor {
    left: Box<dyn Executor>,
    right: Box<dyn Executor>,
    condition: JoinConditionFn,
    /// Left row currently being matched against the buffered right rows.
    current_left: Option<Tuple>,
    /// Buffered right input.
    right_tuples: Vec<Tuple>,
    /// Next index into `right_tuples` to test against `current_left`.
    right_position: usize,
    /// Whether `right_tuples` has been filled since the last `open`.
    right_materialized: bool,
}

impl NestedLoopJoinExecutor {
    /// Creates a join of `left` and `right` emitting pairs for which `condition` holds.
    pub fn new(
        left: Box<dyn Executor>,
        right: Box<dyn Executor>,
        condition: JoinConditionFn,
    ) -> Self {
        Self {
            left,
            right,
            condition,
            current_left: None,
            right_tuples: Vec::new(),
            right_position: 0,
            right_materialized: false,
        }
    }

    /// Concatenates `left`'s values followed by `right`'s into a new row.
    ///
    /// Sized up front and filled from slices, so no intermediate `Vec` is built.
    fn combine(left: &Tuple, right: &Tuple) -> Tuple {
        let mut values = Vec::with_capacity(left.values.len() + right.values.len());
        values.extend_from_slice(&left.values);
        values.extend_from_slice(&right.values);
        Tuple::new(values)
    }
}

impl Executor for NestedLoopJoinExecutor {
    /// Opens both children and discards any state from a previous run.
    fn open(&mut self) -> Result<()> {
        self.left.open()?;
        self.right.open()?;
        self.current_left = None;
        self.right_tuples.clear();
        self.right_position = 0;
        self.right_materialized = false;
        Ok(())
    }

    /// Returns the next (left, right) pair satisfying the join condition.
    ///
    /// # Errors
    /// Propagates errors from either child.
    fn next(&mut self) -> Result<Option<Tuple>> {
        // Buffer the entire right input once so it can be rescanned per left row.
        if !self.right_materialized {
            while let Some(tuple) = self.right.next()? {
                self.right_tuples.push(tuple);
            }
            self.right_materialized = true;
        }
        loop {
            if self.current_left.is_none() {
                self.current_left = self.left.next()?;
                self.right_position = 0;
            }
            let Some(left_tuple) = self.current_left.as_ref() else {
                return Ok(None);
            };
            // Resume the right-side scan where the previous call stopped.
            while let Some(right_tuple) = self.right_tuples.get(self.right_position) {
                self.right_position += 1;
                if (self.condition)(left_tuple, right_tuple) {
                    return Ok(Some(Self::combine(left_tuple, right_tuple)));
                }
            }
            // Right side exhausted for this left row: advance to the next one.
            self.current_left = None;
        }
    }

    /// Closes both children and frees the buffered right rows.
    fn close(&mut self) -> Result<()> {
        self.left.close()?;
        self.right.close()?;
        self.right_tuples.clear();
        Ok(())
    }
}
pub type KeyExtractorFn = Box<dyn Fn(&Tuple) -> Value + Send>;
pub struct HashJoinExecutor {
left: Box<dyn Executor>,
right: Box<dyn Executor>,
left_key: KeyExtractorFn,
right_key: KeyExtractorFn,
hash_table: HashMap<String, Vec<Tuple>>,
current_right: Option<Tuple>,
current_matches: Vec<Tuple>,
match_position: usize,
built: bool,
}
impl HashJoinExecutor {
pub fn new(
left: Box<dyn Executor>,
right: Box<dyn Executor>,
left_key: KeyExtractorFn,
right_key: KeyExtractorFn,
) -> Self {
Self {
left,
right,
left_key,
right_key,
hash_table: HashMap::new(),
current_right: None,
current_matches: Vec::new(),
match_position: 0,
built: false,
}
}
fn value_to_key(value: &Value) -> String {
format!("{:?}", value)
}
}
#[allow(clippy::indexing_slicing)]
impl Executor for HashJoinExecutor {
fn open(&mut self) -> Result<()> {
self.left.open()?;
self.right.open()?;
self.hash_table.clear();
self.current_right = None;
self.current_matches.clear();
self.match_position = 0;
self.built = false;
Ok(())
}
fn next(&mut self) -> Result<Option<Tuple>> {
if !self.built {
while let Some(tuple) = self.left.next()? {
let key = Self::value_to_key(&(self.left_key)(&tuple));
self.hash_table
.entry(key)
.or_insert_with(Vec::new)
.push(tuple);
}
self.built = true;
}
loop {
if self.match_position < self.current_matches.len() {
let left_tuple = &self.current_matches[self.match_position];
self.match_position += 1;
let Some(right_tuple) = self.current_right.as_ref() else {
self.current_matches.clear();
continue;
};
let mut combined_values = left_tuple.values.clone();
combined_values.extend(right_tuple.values.clone());
return Ok(Some(Tuple::new(combined_values)));
}
match self.right.next()? {
Some(right_tuple) => {
let key = Self::value_to_key(&(self.right_key)(&right_tuple));
self.current_matches = self
.hash_table
.get(&key)
.cloned()
.unwrap_or_default();
self.match_position = 0;
self.current_right = Some(right_tuple);
}
None => return Ok(None),
}
}
}
fn close(&mut self) -> Result<()> {
self.left.close()?;
self.right.close()?;
self.hash_table.clear();
Ok(())
}
}
/// Computes the grouping key (the GROUP BY values) for a row.
pub type GroupKeyFn = Box<dyn Fn(&Tuple) -> Vec<Value> + Send>;

/// One aggregate output column of an [`AggregateExecutor`].
pub struct AggregateSpec {
    /// Name passed to `create_aggregate` to look up the aggregate function.
    pub function_name: String,
    /// Input column to aggregate. `None` feeds the constant `Value::Int4(1)` for
    /// every row (presumably for `COUNT(*)`-style aggregates — TODO confirm).
    pub column_index: Option<usize>,
}

/// Hash-grouping aggregation operator.
///
/// On the first `next` it drains the child, groups rows by the key from
/// `group_key`, and then emits one row per group: the key values followed by one
/// finalized value per [`AggregateSpec`], in spec order. Groups are emitted in the
/// order their first row arrived, so output is deterministic for a given input.
///
/// NOTE(review): with empty input no rows are produced, even for an ungrouped
/// aggregate (SQL would emit one row, e.g. `COUNT(*) = 0`) — confirm intended.
pub struct AggregateExecutor {
    child: Box<dyn Executor>,
    group_key: GroupKeyFn,
    aggregates: Vec<AggregateSpec>,
    /// Finalized output rows, filled by `compute_aggregates`.
    results: Vec<Tuple>,
    /// Next index into `results` to emit.
    position: usize,
    /// Whether `results` has been computed since the last `open`.
    computed: bool,
}

// `groups[slot]` below: every slot is an index that was just pushed into `groups`.
#[allow(clippy::indexing_slicing)]
impl AggregateExecutor {
    /// Creates an aggregation over `child`, grouped by `group_key`.
    pub fn new(
        child: Box<dyn Executor>,
        group_key: GroupKeyFn,
        aggregates: Vec<AggregateSpec>,
    ) -> Self {
        Self {
            child,
            group_key,
            aggregates,
            results: Vec::new(),
            position: 0,
            computed: false,
        }
    }

    /// Creates fresh accumulator states, one per aggregate spec, in spec order.
    ///
    /// NOTE(review): a `function_name` unknown to `create_aggregate` silently falls
    /// back to a `CountState`; consider surfacing an error instead.
    fn init_states(&self) -> Vec<Box<dyn AggregateState>> {
        self.aggregates
            .iter()
            .map(|spec| {
                create_aggregate(&spec.function_name)
                    .map(|f| f.init_state())
                    .unwrap_or_else(|| Box::new(CountState::default()))
            })
            .collect()
    }

    /// Drains the child and fills `results` with one finalized row per group.
    ///
    /// # Errors
    /// Propagates errors from the child and from accumulating/finalizing states.
    fn compute_aggregates(&mut self) -> Result<()> {
        // Groups are identified by the Debug rendering of their key. `slots` maps
        // that rendering to an index into `groups`, which preserves first-seen
        // order; iterating a HashMap directly would make output order random.
        let mut slots: HashMap<String, usize> = HashMap::new();
        let mut groups: Vec<(Vec<Value>, Vec<Box<dyn AggregateState>>)> = Vec::new();
        while let Some(tuple) = self.child.next()? {
            let key_values = (self.group_key)(&tuple);
            let next_slot = groups.len();
            let slot = *slots
                .entry(format!("{:?}", key_values))
                .or_insert(next_slot);
            if slot == next_slot {
                // First row of a new group.
                groups.push((key_values, self.init_states()));
            }
            let states = &mut groups[slot].1;
            for (state, spec) in states.iter_mut().zip(&self.aggregates) {
                let value = match spec.column_index {
                    Some(col_idx) => tuple.values.get(col_idx).cloned().unwrap_or(Value::Null),
                    None => Value::Int4(1),
                };
                state.accumulate(&value)?;
            }
        }
        self.results.reserve(groups.len());
        for (key_values, states) in groups {
            let mut row = key_values;
            for state in states {
                row.push(state.finalize()?);
            }
            self.results.push(Tuple::new(row));
        }
        self.computed = true;
        Ok(())
    }
}

impl Executor for AggregateExecutor {
    /// Opens the child and discards results from a previous run.
    fn open(&mut self) -> Result<()> {
        self.child.open()?;
        self.results.clear();
        self.position = 0;
        self.computed = false;
        Ok(())
    }

    /// Returns the next group row, computing all groups on the first call.
    fn next(&mut self) -> Result<Option<Tuple>> {
        if !self.computed {
            self.compute_aggregates()?;
        }
        let row = self.results.get(self.position).cloned();
        if row.is_some() {
            self.position += 1;
        }
        Ok(row)
    }

    /// Closes the child and frees the computed results.
    fn close(&mut self) -> Result<()> {
        self.child.close()?;
        self.results.clear();
        Ok(())
    }
}
#[derive(Clone)]
pub struct SortKey {
pub column_index: usize,
pub ascending: bool,
pub nulls_first: bool,
}
pub struct SortExecutor {
child: Box<dyn Executor>,
sort_keys: Vec<SortKey>,
results: Vec<Tuple>,
position: usize,
sorted: bool,
}
impl SortExecutor {
pub fn new(child: Box<dyn Executor>, sort_keys: Vec<SortKey>) -> Self {
Self {
child,
sort_keys,
results: Vec::new(),
position: 0,
sorted: false,
}
}
fn compare_values(a: &Value, b: &Value) -> Ordering {
match (a, b) {
(Value::Null, Value::Null) => Ordering::Equal,
(Value::Null, _) => Ordering::Less,
(_, Value::Null) => Ordering::Greater,
(Value::Int2(x), Value::Int2(y)) => x.cmp(y),
(Value::Int4(x), Value::Int4(y)) => x.cmp(y),
(Value::Int8(x), Value::Int8(y)) => x.cmp(y),
(Value::Float4(x), Value::Float4(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
(Value::Float8(x), Value::Float8(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
(Value::String(x), Value::String(y)) => x.cmp(y),
(Value::Boolean(x), Value::Boolean(y)) => x.cmp(y),
_ => Ordering::Equal, }
}
fn compare_tuples(&self, a: &Tuple, b: &Tuple) -> Ordering {
for key in &self.sort_keys {
let a_val = a.values.get(key.column_index).unwrap_or(&Value::Null);
let b_val = b.values.get(key.column_index).unwrap_or(&Value::Null);
let (first_null, second_null) = if key.nulls_first {
(Ordering::Less, Ordering::Greater)
} else {
(Ordering::Greater, Ordering::Less)
};
let ordering = match (a_val, b_val) {
(Value::Null, Value::Null) => Ordering::Equal,
(Value::Null, _) => first_null,
(_, Value::Null) => second_null,
_ => Self::compare_values(a_val, b_val),
};
let ordering = if key.ascending {
ordering
} else {
ordering.reverse()
};
if ordering != Ordering::Equal {
return ordering;
}
}
Ordering::Equal
}
fn sort_results(&mut self) -> Result<()> {
while let Some(tuple) = self.child.next()? {
self.results.push(tuple);
}
let sort_keys = self.sort_keys.clone();
self.results.sort_by(|a, b| {
for key in &sort_keys {
let a_val = a.values.get(key.column_index).unwrap_or(&Value::Null);
let b_val = b.values.get(key.column_index).unwrap_or(&Value::Null);
let (first_null, second_null) = if key.nulls_first {
(Ordering::Less, Ordering::Greater)
} else {
(Ordering::Greater, Ordering::Less)
};
let ordering = match (a_val, b_val) {
(Value::Null, Value::Null) => Ordering::Equal,
(Value::Null, _) => first_null,
(_, Value::Null) => second_null,
_ => Self::compare_values(a_val, b_val),
};
let ordering = if key.ascending {
ordering
} else {
ordering.reverse()
};
if ordering != Ordering::Equal {
return ordering;
}
}
Ordering::Equal
});
self.sorted = true;
Ok(())
}
}
#[allow(clippy::indexing_slicing)]
impl Executor for SortExecutor {
fn open(&mut self) -> Result<()> {
self.child.open()?;
self.results.clear();
self.position = 0;
self.sorted = false;
Ok(())
}
fn next(&mut self) -> Result<Option<Tuple>> {
if !self.sorted {
self.sort_results()?;
}
if self.position < self.results.len() {
let tuple = self.results[self.position].clone();
self.position += 1;
Ok(Some(tuple))
} else {
Ok(None)
}
}
fn close(&mut self) -> Result<()> {
self.child.close()?;
self.results.clear();
Ok(())
}
}
/// Operator implementing `LIMIT`/`OFFSET`: discards the first `offset` child rows,
/// then emits at most `limit` rows (every remaining row when `limit` is `None`).
pub struct LimitExecutor {
    child: Box<dyn Executor>,
    /// Maximum number of rows to emit; `None` means unbounded.
    limit: Option<usize>,
    /// Number of leading child rows to discard.
    offset: usize,
    /// Rows emitted since the last `open`.
    count: usize,
    /// Rows discarded since the last `open`.
    skipped: usize,
}

impl LimitExecutor {
    /// Wraps `child` with the given `limit` and `offset`.
    pub fn new(child: Box<dyn Executor>, limit: Option<usize>, offset: usize) -> Self {
        Self {
            skipped: 0,
            count: 0,
            offset,
            limit,
            child,
        }
    }
}

impl Executor for LimitExecutor {
    /// Opens the child and resets the emitted/skipped counters.
    fn open(&mut self) -> Result<()> {
        self.child.open()?;
        self.skipped = 0;
        self.count = 0;
        Ok(())
    }

    /// Returns the next row inside the `OFFSET`/`LIMIT` window.
    ///
    /// Once the limit is hit the child is no longer pulled.
    fn next(&mut self) -> Result<Option<Tuple>> {
        let limit_reached = matches!(self.limit, Some(limit) if self.count >= limit);
        if limit_reached {
            return Ok(None);
        }
        // Discard the OFFSET prefix lazily, before the first row is emitted.
        while self.skipped < self.offset {
            if self.child.next()?.is_none() {
                return Ok(None);
            }
            self.skipped += 1;
        }
        let next_row = self.child.next()?;
        if next_row.is_some() {
            self.count += 1;
        }
        Ok(next_row)
    }

    fn close(&mut self) -> Result<()> {
        self.child.close()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    fn test_tuples() -> Vec<Tuple> {
        vec![
            Tuple::new(vec![Value::Int4(1), Value::String("Alice".to_string()), Value::Int4(30)]),
            Tuple::new(vec![Value::Int4(2), Value::String("Bob".to_string()), Value::Int4(25)]),
            Tuple::new(vec![Value::Int4(3), Value::String("Charlie".to_string()), Value::Int4(35)]),
        ]
    }

    /// Opens `exec`, pulls every row, closes it, and returns the rows in order.
    fn collect_rows(exec: &mut dyn Executor) -> Vec<Tuple> {
        exec.open().unwrap();
        let mut rows = Vec::new();
        while let Some(row) = exec.next().unwrap() {
            rows.push(row);
        }
        exec.close().unwrap();
        rows
    }

    #[test]
    fn test_scan_executor() {
        let mut scan = ScanExecutor::new(test_tuples());
        scan.open().unwrap();
        let mut count = 0;
        while scan.next().unwrap().is_some() {
            count += 1;
        }
        assert_eq!(count, 3);
        scan.close().unwrap();
    }

    #[test]
    fn test_filter_executor() {
        let scan = Box::new(ScanExecutor::new(test_tuples()));
        let predicate: PredicateFn = Box::new(|tuple| {
            matches!(&tuple.values[2], Value::Int4(age) if *age > 28)
        });
        let mut filter = FilterExecutor::new(scan, predicate);
        filter.open().unwrap();
        let mut count = 0;
        while filter.next().unwrap().is_some() {
            count += 1;
        }
        assert_eq!(count, 2);
        filter.close().unwrap();
    }

    #[test]
    fn test_project_executor() {
        let scan = Box::new(ScanExecutor::new(test_tuples()));
        let mut project = ProjectExecutor::by_indices(scan, vec![1]);
        project.open().unwrap();
        let tuple = project.next().unwrap().unwrap();
        assert_eq!(tuple.values.len(), 1);
        assert_eq!(tuple.values[0], Value::String("Alice".to_string()));
        project.close().unwrap();
    }

    #[test]
    fn test_nested_loop_join_executor() {
        let left = Box::new(ScanExecutor::new(test_tuples()));
        let right = Box::new(ScanExecutor::new(test_tuples()));
        let condition: JoinConditionFn = Box::new(|l, r| l.values[0] == r.values[0]);
        let mut join = NestedLoopJoinExecutor::new(left, right, condition);
        let rows = collect_rows(&mut join);
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0].values.len(), 6);
        assert_eq!(rows[0].values[1], Value::String("Alice".to_string()));
        assert_eq!(rows[0].values[4], Value::String("Alice".to_string()));
    }

    #[test]
    fn test_hash_join_executor() {
        let left = Box::new(ScanExecutor::new(test_tuples()));
        let right = Box::new(ScanExecutor::new(test_tuples()));
        let left_key: KeyExtractorFn = Box::new(|t| t.values[0].clone());
        let right_key: KeyExtractorFn = Box::new(|t| t.values[0].clone());
        let mut join = HashJoinExecutor::new(left, right, left_key, right_key);
        let rows = collect_rows(&mut join);
        assert_eq!(rows.len(), 3);
        for row in &rows {
            assert_eq!(row.values.len(), 6);
            assert_eq!(row.values[0], row.values[3]);
        }
        // Output follows right-input (probe) order.
        assert_eq!(rows[0].values[1], Value::String("Alice".to_string()));
    }

    #[test]
    fn test_sort_executor() {
        let scan = Box::new(ScanExecutor::new(test_tuples()));
        let sort_keys = vec![SortKey {
            column_index: 2,
            ascending: true,
            nulls_first: true,
        }];
        let mut sort = SortExecutor::new(scan, sort_keys);
        sort.open().unwrap();
        let first = sort.next().unwrap().unwrap();
        assert_eq!(first.values[1], Value::String("Bob".to_string()));
        let second = sort.next().unwrap().unwrap();
        assert_eq!(second.values[1], Value::String("Alice".to_string()));
        sort.close().unwrap();
    }

    #[test]
    fn test_sort_executor_descending() {
        let scan = Box::new(ScanExecutor::new(test_tuples()));
        let sort_keys = vec![SortKey {
            column_index: 2,
            ascending: false,
            nulls_first: false,
        }];
        let mut sort = SortExecutor::new(scan, sort_keys);
        let names: Vec<Value> = collect_rows(&mut sort)
            .into_iter()
            .map(|t| t.values[1].clone())
            .collect();
        assert_eq!(
            names,
            vec![
                Value::String("Charlie".to_string()),
                Value::String("Alice".to_string()),
                Value::String("Bob".to_string()),
            ]
        );
    }

    #[test]
    fn test_limit_executor() {
        let scan = Box::new(ScanExecutor::new(test_tuples()));
        let mut limit = LimitExecutor::new(scan, Some(2), 0);
        limit.open().unwrap();
        let mut count = 0;
        while limit.next().unwrap().is_some() {
            count += 1;
        }
        assert_eq!(count, 2);
        limit.close().unwrap();
    }

    #[test]
    fn test_limit_executor_with_offset() {
        let scan = Box::new(ScanExecutor::new(test_tuples()));
        let mut limit = LimitExecutor::new(scan, Some(1), 1);
        let rows = collect_rows(&mut limit);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].values[1], Value::String("Bob".to_string()));
    }
}