reifydb_engine/flow/compiler/operator/
distinct.rs1use reifydb_core::interface::{
5 catalog::flow::FlowNodeId,
6 identifier::{ColumnIdentifier, ColumnPrimitive},
7 resolved::{ResolvedColumn, ResolvedPrimitive},
8};
9use reifydb_rql::{
10 expression::{ColumnExpression, Expression},
11 flow::node::FlowNodeType::Distinct,
12 nodes::DistinctNode,
13 query::QueryPlan,
14};
15use reifydb_transaction::transaction::admin::AdminTransaction;
16use reifydb_type::{Result, fragment::Fragment};
17
18use crate::flow::compiler::{CompileOperator, FlowCompiler};
19
20pub(crate) struct DistinctCompiler {
21 pub input: Box<QueryPlan>,
22 pub columns: Vec<ResolvedColumn>,
23}
24
25impl From<DistinctNode> for DistinctCompiler {
26 fn from(node: DistinctNode) -> Self {
27 Self {
28 input: node.input,
29 columns: node.columns.into_iter().collect(),
30 }
31 }
32}
33
34fn resolved_to_column_identifier(resolved: ResolvedColumn) -> ColumnIdentifier {
36 let primitive = match resolved.primitive() {
37 ResolvedPrimitive::Table(t) => ColumnPrimitive::Primitive {
38 namespace: Fragment::internal(t.namespace().name()),
39 primitive: Fragment::internal(t.name()),
40 },
41 ResolvedPrimitive::View(v) => ColumnPrimitive::Primitive {
42 namespace: Fragment::internal(v.namespace().name()),
43 primitive: Fragment::internal(v.name()),
44 },
45 ResolvedPrimitive::RingBuffer(r) => ColumnPrimitive::Primitive {
46 namespace: Fragment::internal(r.namespace().name()),
47 primitive: Fragment::internal(r.name()),
48 },
49 _ => ColumnPrimitive::Alias(Fragment::internal("_unknown")),
50 };
51
52 ColumnIdentifier {
53 primitive,
54 name: Fragment::internal(resolved.name()),
55 }
56}
57
58impl CompileOperator for DistinctCompiler {
59 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
60 let input_node = compiler.compile_plan(txn, *self.input)?;
61
62 let expressions: Vec<Expression> = self
64 .columns
65 .into_iter()
66 .map(|col| Expression::Column(ColumnExpression(resolved_to_column_identifier(col))))
67 .collect();
68
69 let node_id = compiler.add_node(
70 txn,
71 Distinct {
72 expressions,
73 },
74 )?;
75
76 compiler.add_edge(txn, &input_node, &node_id)?;
77 Ok(node_id)
78 }
79}