Skip to main content

reifydb_engine/flow/compiler/operator/
distinct.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::{
5	catalog::flow::FlowNodeId,
6	identifier::{ColumnIdentifier, ColumnPrimitive},
7	resolved::{ResolvedColumn, ResolvedPrimitive},
8};
9use reifydb_rql::{
10	expression::{ColumnExpression, Expression},
11	flow::node::FlowNodeType::Distinct,
12	nodes::DistinctNode,
13	query::QueryPlan,
14};
15use reifydb_transaction::transaction::admin::AdminTransaction;
16use reifydb_type::{Result, fragment::Fragment};
17
18use crate::flow::compiler::{CompileOperator, FlowCompiler};
19
20pub(crate) struct DistinctCompiler {
21	pub input: Box<QueryPlan>,
22	pub columns: Vec<ResolvedColumn>,
23}
24
25impl From<DistinctNode> for DistinctCompiler {
26	fn from(node: DistinctNode) -> Self {
27		Self {
28			input: node.input,
29			columns: node.columns.into_iter().collect(),
30		}
31	}
32}
33
34// Helper function to convert ResolvedColumn to ColumnIdentifier for expression system
35fn resolved_to_column_identifier(resolved: ResolvedColumn) -> ColumnIdentifier {
36	let primitive = match resolved.primitive() {
37		ResolvedPrimitive::Table(t) => ColumnPrimitive::Primitive {
38			namespace: Fragment::internal(t.namespace().name()),
39			primitive: Fragment::internal(t.name()),
40		},
41		ResolvedPrimitive::View(v) => ColumnPrimitive::Primitive {
42			namespace: Fragment::internal(v.namespace().name()),
43			primitive: Fragment::internal(v.name()),
44		},
45		ResolvedPrimitive::RingBuffer(r) => ColumnPrimitive::Primitive {
46			namespace: Fragment::internal(r.namespace().name()),
47			primitive: Fragment::internal(r.name()),
48		},
49		_ => ColumnPrimitive::Alias(Fragment::internal("_unknown")),
50	};
51
52	ColumnIdentifier {
53		primitive,
54		name: Fragment::internal(resolved.name()),
55	}
56}
57
58impl CompileOperator for DistinctCompiler {
59	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
60		let input_node = compiler.compile_plan(txn, *self.input)?;
61
62		// Convert resolved columns to column expressions via ColumnIdentifier
63		let expressions: Vec<Expression> = self
64			.columns
65			.into_iter()
66			.map(|col| Expression::Column(ColumnExpression(resolved_to_column_identifier(col))))
67			.collect();
68
69		let node_id = compiler.add_node(
70			txn,
71			Distinct {
72				expressions,
73			},
74		)?;
75
76		compiler.add_edge(txn, &input_node, &node_id)?;
77		Ok(node_id)
78	}
79}