use std::collections::HashSet;
use std::sync::Arc;
use crate::{Result, Schema, Tuple};
use super::PhysicalOperator;
/// Hashes a tuple by feeding the `Debug` rendering of each value, in order,
/// into a `DefaultHasher`.
///
/// The hash is consistent with `tuples_equal`: tuples whose values render
/// identically under `{:?}` always hash to the same `u64`. Distinct tuples may
/// still collide, so callers must confirm candidate matches with `tuples_equal`.
fn hash_tuple(tuple: &Tuple) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::fmt::Write;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    // One scratch buffer is reused for every value instead of allocating a new
    // String per value. Hashing a `String` hashes its `str` contents, so the
    // result is identical to hashing a `format!("{:?}", value)` string.
    let mut rendered = String::new();
    for value in &tuple.values {
        rendered.clear();
        write!(rendered, "{:?}", value).expect("Debug formatting into a String failed");
        rendered.hash(&mut hasher);
    }
    hasher.finish()
}
/// Reports whether two tuples hold the same values, position by position.
///
/// Values are compared through their `Debug` rendering, which keeps this check
/// in agreement with `hash_tuple`. Tuples of different arity are never equal,
/// and the comparison stops at the first differing position.
fn tuples_equal(a: &Tuple, b: &Tuple) -> bool {
    let same_arity = a.values.len() == b.values.len();
    same_arity
        && a
            .values
            .iter()
            .zip(&b.values)
            .all(|(lhs, rhs)| format!("{:?}", lhs) == format!("{:?}", rhs))
}
/// Physical operator for `UNION` / `UNION ALL`.
///
/// Both inputs are drained eagerly in `UnionOperator::new`; the resulting rows
/// are buffered here and handed out one at a time by `next`.
pub struct UnionOperator {
    /// Materialized output rows: left rows first, then right rows.
    results: Vec<Tuple>,
    /// Index of the next row to hand out.
    position: usize,
    /// Output schema, taken from the left input.
    schema: Arc<Schema>,
}

impl UnionOperator {
    /// Builds the union of `left` and `right`.
    ///
    /// The left input is drained completely, then the right input. When `all`
    /// is true every row is kept (`UNION ALL`). Otherwise only the first
    /// occurrence of each distinct row survives (`UNION`), in the same
    /// left-then-right order.
    ///
    /// # Errors
    /// Propagates any error returned by either child's `next`.
    pub fn new(
        mut left: Box<dyn PhysicalOperator>,
        mut right: Box<dyn PhysicalOperator>,
        all: bool,
    ) -> Result<Self> {
        let schema = left.schema();
        let mut results = Vec::new();
        while let Some(tuple) = left.next()? {
            results.push(tuple);
        }
        while let Some(tuple) = right.next()? {
            results.push(tuple);
        }
        if !all {
            results = Self::dedup_preserving_order(results);
        }
        Ok(Self {
            results,
            position: 0,
            schema,
        })
    }

    /// Keeps the first occurrence of each distinct tuple, preserving input order.
    ///
    /// Tuples are grouped by `hash_tuple`. A new tuple is compared with
    /// `tuples_equal` only against kept tuples in the same hash bucket, not
    /// against every kept tuple. Hash collisions therefore still cannot merge
    /// distinct rows.
    fn dedup_preserving_order(tuples: Vec<Tuple>) -> Vec<Tuple> {
        use std::collections::HashMap;

        // hash -> indices into `unique` of the kept tuples with that hash.
        let mut buckets: HashMap<u64, Vec<usize>> = HashMap::new();
        let mut unique: Vec<Tuple> = Vec::new();
        for tuple in tuples {
            let candidates = buckets.entry(hash_tuple(&tuple)).or_default();
            if candidates.iter().any(|&i| tuples_equal(&unique[i], &tuple)) {
                continue;
            }
            candidates.push(unique.len());
            unique.push(tuple);
        }
        unique
    }
}
impl PhysicalOperator for UnionOperator {
    /// Returns the next buffered row, or `None` once every row has been produced.
    ///
    /// Rows are cloned out so the buffer itself is left intact. This method
    /// always returns `Ok`.
    fn next(&mut self) -> Result<Option<Tuple>> {
        // `get` is already bounds-checked, so it covers both the "row available"
        // and the "exhausted" cases without a separate length test.
        let tuple = self.results.get(self.position).cloned();
        if tuple.is_some() {
            self.position += 1;
        }
        Ok(tuple)
    }

    /// Returns the operator's output schema (a shared handle, not a copy).
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }
}
/// Physical operator for `INTERSECT` / `INTERSECT ALL`.
///
/// Both inputs are drained eagerly in `IntersectOperator::new`; the surviving
/// left rows are buffered here and handed out one at a time by `next`.
pub struct IntersectOperator {
    /// Materialized output rows, in left-input order.
    results: Vec<Tuple>,
    /// Index of the next row to hand out.
    position: usize,
    /// Output schema, taken from the left input.
    schema: Arc<Schema>,
}

impl IntersectOperator {
    /// Builds the intersection of `left` and `right`.
    ///
    /// The left input is drained completely before the right one. Output rows
    /// are left rows, in left-input order, that also occur in `right`:
    /// - `all == true` (`INTERSECT ALL`): a row occurring `l` times on the left
    ///   and `r` times on the right is emitted `min(l, r)` times;
    /// - `all == false` (`INTERSECT`): each such row is emitted once, at its
    ///   first left occurrence.
    ///
    /// Rows are grouped by `hash_tuple`, but every match is confirmed with
    /// `tuples_equal`, so hash collisions cannot merge or drop distinct rows.
    ///
    /// # Errors
    /// Propagates any error returned by either child's `next`.
    pub fn new(
        mut left: Box<dyn PhysicalOperator>,
        mut right: Box<dyn PhysicalOperator>,
        all: bool,
    ) -> Result<Self> {
        use std::collections::HashMap;

        let schema = left.schema();
        let mut left_tuples = Vec::new();
        while let Some(tuple) = left.next()? {
            left_tuples.push(tuple);
        }
        // Right rows bucketed by hash, so each probe scans only same-hash rows
        // instead of the whole right input.
        let mut right_buckets: HashMap<u64, Vec<Tuple>> = HashMap::new();
        while let Some(tuple) = right.next()? {
            right_buckets.entry(hash_tuple(&tuple)).or_default().push(tuple);
        }

        let mut results: Vec<Tuple> = Vec::new();
        if all {
            for left_tuple in left_tuples {
                if let Some(bucket) = right_buckets.get_mut(&hash_tuple(&left_tuple)) {
                    if let Some(pos) = bucket.iter().position(|t| tuples_equal(t, &left_tuple)) {
                        // Consume the matched right row so it pairs with at most one
                        // left row. Order within a bucket is irrelevant, hence the
                        // O(1) swap_remove.
                        bucket.swap_remove(pos);
                        results.push(left_tuple);
                    }
                }
            }
        } else {
            // hash -> indices into `results` of rows already emitted. Duplicates
            // are detected by comparing full rows within a bucket, so distinct
            // rows whose hashes collide are all kept.
            let mut emitted: HashMap<u64, Vec<usize>> = HashMap::new();
            for left_tuple in left_tuples {
                let hash = hash_tuple(&left_tuple);
                let in_right = right_buckets
                    .get(&hash)
                    .into_iter()
                    .flatten()
                    .any(|t| tuples_equal(t, &left_tuple));
                if !in_right {
                    continue;
                }
                let seen = emitted.entry(hash).or_default();
                if seen.iter().any(|&i| tuples_equal(&results[i], &left_tuple)) {
                    continue;
                }
                seen.push(results.len());
                results.push(left_tuple);
            }
        }

        Ok(Self {
            results,
            position: 0,
            schema,
        })
    }
}
impl PhysicalOperator for IntersectOperator {
    /// Returns the next buffered row, or `None` once every row has been produced.
    ///
    /// Rows are cloned out so the buffer itself is left intact. This method
    /// always returns `Ok`.
    fn next(&mut self) -> Result<Option<Tuple>> {
        // `get` is already bounds-checked, so it covers both the "row available"
        // and the "exhausted" cases without a separate length test.
        let tuple = self.results.get(self.position).cloned();
        if tuple.is_some() {
            self.position += 1;
        }
        Ok(tuple)
    }

    /// Returns the operator's output schema (a shared handle, not a copy).
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }
}
/// Physical operator for `EXCEPT` / `EXCEPT ALL`.
///
/// Both inputs are drained eagerly in `ExceptOperator::new`; the surviving
/// left rows are buffered here and handed out one at a time by `next`.
pub struct ExceptOperator {
    /// Materialized output rows, in left-input order.
    results: Vec<Tuple>,
    /// Index of the next row to hand out.
    position: usize,
    /// Output schema, taken from the left input.
    schema: Arc<Schema>,
}

impl ExceptOperator {
    /// Builds `left EXCEPT right`.
    ///
    /// The left input is drained completely before the right one. Output rows
    /// are left rows, in left-input order:
    /// - `all == true` (`EXCEPT ALL`): each right row cancels at most one equal
    ///   left row. A row occurring `l` times on the left and `r` times on the
    ///   right is emitted `l - r` times when `l > r`, otherwise not at all;
    /// - `all == false` (`EXCEPT`): each left row absent from `right` is emitted
    ///   once, at its first occurrence.
    ///
    /// Rows are grouped by `hash_tuple`, but every match is confirmed with
    /// `tuples_equal`, so hash collisions cannot merge or drop distinct rows.
    ///
    /// # Errors
    /// Propagates any error returned by either child's `next`.
    pub fn new(
        mut left: Box<dyn PhysicalOperator>,
        mut right: Box<dyn PhysicalOperator>,
        all: bool,
    ) -> Result<Self> {
        use std::collections::HashMap;

        let schema = left.schema();
        let mut left_tuples = Vec::new();
        while let Some(tuple) = left.next()? {
            left_tuples.push(tuple);
        }
        // Right rows bucketed by hash, so each probe scans only same-hash rows
        // instead of the whole right input.
        let mut right_buckets: HashMap<u64, Vec<Tuple>> = HashMap::new();
        while let Some(tuple) = right.next()? {
            right_buckets.entry(hash_tuple(&tuple)).or_default().push(tuple);
        }

        let mut results: Vec<Tuple> = Vec::new();
        if all {
            for left_tuple in left_tuples {
                let mut cancelled = false;
                if let Some(bucket) = right_buckets.get_mut(&hash_tuple(&left_tuple)) {
                    if let Some(pos) = bucket.iter().position(|t| tuples_equal(t, &left_tuple)) {
                        // Each right row cancels at most one left row. Order within
                        // a bucket is irrelevant, hence the O(1) swap_remove.
                        bucket.swap_remove(pos);
                        cancelled = true;
                    }
                }
                if !cancelled {
                    results.push(left_tuple);
                }
            }
        } else {
            // hash -> indices into `results` of rows already emitted. Rows found
            // in `right` are never emitted, and neither are their later copies,
            // so tracking only emitted rows is enough for deduplication. Full-row
            // comparison within a bucket keeps colliding distinct rows apart.
            let mut emitted: HashMap<u64, Vec<usize>> = HashMap::new();
            for left_tuple in left_tuples {
                let hash = hash_tuple(&left_tuple);
                let in_right = right_buckets
                    .get(&hash)
                    .into_iter()
                    .flatten()
                    .any(|t| tuples_equal(t, &left_tuple));
                if in_right {
                    continue;
                }
                let seen = emitted.entry(hash).or_default();
                if seen.iter().any(|&i| tuples_equal(&results[i], &left_tuple)) {
                    continue;
                }
                seen.push(results.len());
                results.push(left_tuple);
            }
        }

        Ok(Self {
            results,
            position: 0,
            schema,
        })
    }
}
impl PhysicalOperator for ExceptOperator {
    /// Returns the next buffered row, or `None` once every row has been produced.
    ///
    /// Rows are cloned out so the buffer itself is left intact. This method
    /// always returns `Ok`.
    fn next(&mut self) -> Result<Option<Tuple>> {
        // `get` is already bounds-checked, so it covers both the "row available"
        // and the "exhausted" cases without a separate length test.
        let tuple = self.results.get(self.position).cloned();
        if tuple.is_some() {
            self.position += 1;
        }
        Ok(tuple)
    }

    /// Returns the operator's output schema (a shared handle, not a copy).
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }
}