use std::collections::HashSet;
use grafeo_common::types::{LogicalType, Value};
use super::{Operator, OperatorResult};
use crate::execution::DataChunk;
use crate::execution::chunk::DataChunkBuilder;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RowKey(Vec<KeyPart>);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum KeyPart {
Null,
Bool(bool),
Int64(i64),
String(String),
}
impl RowKey {
fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
let parts: Vec<KeyPart> = columns
.iter()
.map(|&col_idx| {
chunk
.column(col_idx)
.and_then(|col| col.get_value(row))
.map_or(KeyPart::Null, |v| match v {
Value::Null => KeyPart::Null,
Value::Bool(b) => KeyPart::Bool(b),
Value::Int64(i) => KeyPart::Int64(i),
Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
Value::String(s) => KeyPart::String(s.to_string()),
_ => KeyPart::String(format!("{v:?}")),
})
})
.collect();
RowKey(parts)
}
fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
let columns: Vec<usize> = (0..chunk.column_count()).collect();
Self::from_row(chunk, row, &columns)
}
}
pub struct DistinctOperator {
child: Box<dyn Operator>,
distinct_columns: Option<Vec<usize>>,
output_schema: Vec<LogicalType>,
seen: HashSet<RowKey>,
}
impl DistinctOperator {
pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
Self {
child,
distinct_columns: None,
output_schema,
seen: HashSet::new(),
}
}
pub fn on_columns(
child: Box<dyn Operator>,
columns: Vec<usize>,
output_schema: Vec<LogicalType>,
) -> Self {
Self {
child,
distinct_columns: Some(columns),
output_schema,
seen: HashSet::new(),
}
}
}
impl Operator for DistinctOperator {
fn next(&mut self) -> OperatorResult {
loop {
let Some(chunk) = self.child.next()? else {
return Ok(None);
};
let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
for row in chunk.selected_indices() {
let key = match &self.distinct_columns {
Some(cols) => RowKey::from_row(&chunk, row, cols),
None => RowKey::from_all_columns(&chunk, row),
};
if self.seen.insert(key) {
for col_idx in 0..chunk.column_count() {
if let (Some(src_col), Some(dst_col)) =
(chunk.column(col_idx), builder.column_mut(col_idx))
{
if let Some(value) = src_col.get_value(row) {
dst_col.push_value(value);
} else {
dst_col.push_value(Value::Null);
}
}
}
builder.advance_row();
if builder.is_full() {
return Ok(Some(builder.finish()));
}
}
}
if builder.row_count() > 0 {
return Ok(Some(builder.finish()));
}
}
}
fn reset(&mut self) {
self.child.reset();
self.seen.clear();
}
fn name(&self) -> &'static str {
"Distinct"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::execution::chunk::DataChunkBuilder;
struct MockOperator {
chunks: Vec<DataChunk>,
position: usize,
}
impl MockOperator {
fn new(chunks: Vec<DataChunk>) -> Self {
Self {
chunks,
position: 0,
}
}
}
impl Operator for MockOperator {
fn next(&mut self) -> OperatorResult {
if self.position < self.chunks.len() {
let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
self.position += 1;
Ok(Some(chunk))
} else {
Ok(None)
}
}
fn reset(&mut self) {
self.position = 0;
}
fn name(&self) -> &'static str {
"Mock"
}
}
fn create_chunk_with_duplicates() -> DataChunk {
let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
let data = [
(1i64, "a"),
(2, "b"),
(1, "a"), (3, "c"),
(2, "b"), (1, "a"), ];
for (num, text) in data {
builder.column_mut(0).unwrap().push_int64(num);
builder.column_mut(1).unwrap().push_string(text);
builder.advance_row();
}
builder.finish()
}
#[test]
fn test_distinct_all_columns() {
let mock = MockOperator::new(vec![create_chunk_with_duplicates()]);
let mut distinct = DistinctOperator::new(
Box::new(mock),
vec![LogicalType::Int64, LogicalType::String],
);
let mut results = Vec::new();
while let Some(chunk) = distinct.next().unwrap() {
for row in chunk.selected_indices() {
let num = chunk.column(0).unwrap().get_int64(row).unwrap();
let text = chunk
.column(1)
.unwrap()
.get_string(row)
.unwrap()
.to_string();
results.push((num, text));
}
}
assert_eq!(results.len(), 3);
results.sort();
assert_eq!(
results,
vec![
(1, "a".to_string()),
(2, "b".to_string()),
(3, "c".to_string()),
]
);
}
#[test]
fn test_distinct_single_column() {
let mock = MockOperator::new(vec![create_chunk_with_duplicates()]);
let mut distinct = DistinctOperator::on_columns(
Box::new(mock),
vec![0], vec![LogicalType::Int64, LogicalType::String],
);
let mut results = Vec::new();
while let Some(chunk) = distinct.next().unwrap() {
for row in chunk.selected_indices() {
let num = chunk.column(0).unwrap().get_int64(row).unwrap();
results.push(num);
}
}
results.sort_unstable();
assert_eq!(results, vec![1, 2, 3]);
}
#[test]
fn test_distinct_across_chunks() {
let mut builder1 = DataChunkBuilder::new(&[LogicalType::Int64]);
for i in [1, 2, 3] {
builder1.column_mut(0).unwrap().push_int64(i);
builder1.advance_row();
}
let mut builder2 = DataChunkBuilder::new(&[LogicalType::Int64]);
for i in [2, 3, 4] {
builder2.column_mut(0).unwrap().push_int64(i);
builder2.advance_row();
}
let mock = MockOperator::new(vec![builder1.finish(), builder2.finish()]);
let mut distinct = DistinctOperator::new(Box::new(mock), vec![LogicalType::Int64]);
let mut results = Vec::new();
while let Some(chunk) = distinct.next().unwrap() {
for row in chunk.selected_indices() {
let num = chunk.column(0).unwrap().get_int64(row).unwrap();
results.push(num);
}
}
results.sort_unstable();
assert_eq!(results, vec![1, 2, 3, 4]);
}
}