use reifydb_core::interface::{
catalog::flow::FlowNodeId,
identifier::{ColumnIdentifier, ColumnShape},
resolved::{ResolvedColumn, ResolvedShape},
};
use reifydb_rql::{
expression::{ColumnExpression, Expression},
flow::node::FlowNodeType::Distinct,
nodes::DistinctNode,
query::QueryPlan,
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{Result, fragment::Fragment};
use crate::flow::compiler::{CompileOperator, FlowCompiler};
/// Flow-compiler operator for a distinct step.
///
/// Built from a `DistinctNode` via `From`, and compiled through
/// `CompileOperator::compile` into a `Distinct` flow node.
pub(crate) struct DistinctCompiler {
/// Upstream query plan; it is compiled first and connected by an edge to the distinct node.
pub input: Box<QueryPlan>,
/// Resolved columns that become the column expressions of the `Distinct` flow node.
pub columns: Vec<ResolvedColumn>,
}
impl From<DistinctNode> for DistinctCompiler {
fn from(node: DistinctNode) -> Self {
Self {
input: node.input,
columns: node.columns.into_iter().collect(),
}
}
}
fn resolved_to_column_identifier(resolved: ResolvedColumn) -> ColumnIdentifier {
let shape = match resolved.shape() {
ResolvedShape::Table(t) => ColumnShape::Qualified {
namespace: Fragment::internal(t.namespace().name()),
name: Fragment::internal(t.name()),
},
ResolvedShape::View(v) => ColumnShape::Qualified {
namespace: Fragment::internal(v.namespace().name()),
name: Fragment::internal(v.name()),
},
ResolvedShape::RingBuffer(r) => ColumnShape::Qualified {
namespace: Fragment::internal(r.namespace().name()),
name: Fragment::internal(r.name()),
},
_ => ColumnShape::Alias(Fragment::internal("_unknown")),
};
ColumnIdentifier {
shape,
name: Fragment::internal(resolved.name()),
}
}
impl CompileOperator for DistinctCompiler {
fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
let input_node = compiler.compile_plan(txn, *self.input)?;
let expressions: Vec<Expression> = self
.columns
.into_iter()
.map(|col| Expression::Column(ColumnExpression(resolved_to_column_identifier(col))))
.collect();
let node_id = compiler.add_node(
txn,
Distinct {
expressions,
},
)?;
compiler.add_edge(txn, &input_node, &node_id)?;
Ok(node_id)
}
}