Skip to main content

reifydb_engine/vm/volcano/
distinct.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::HashSet;
5
6use reifydb_core::{
7	interface::resolved::ResolvedColumn,
8	value::column::{columns::Columns, headers::ColumnHeaders},
9};
10use reifydb_runtime::hash::{Hash128, xxh3_128};
11use reifydb_transaction::transaction::Transaction;
12use tracing::instrument;
13
14use crate::{
15	Result,
16	vm::volcano::query::{QueryContext, QueryNode},
17};
18
19pub(crate) struct DistinctNode {
20	input: Box<dyn QueryNode>,
21	columns: Vec<ResolvedColumn>,
22	headers: Option<ColumnHeaders>,
23}
24
25impl DistinctNode {
26	pub fn new(input: Box<dyn QueryNode>, columns: Vec<ResolvedColumn>) -> Self {
27		Self {
28			input,
29			columns,
30			headers: None,
31		}
32	}
33}
34
35impl QueryNode for DistinctNode {
36	#[instrument(level = "trace", skip_all, name = "volcano::distinct::initialize")]
37	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
38		self.input.initialize(rx, ctx)?;
39		Ok(())
40	}
41
42	#[instrument(level = "trace", skip_all, name = "volcano::distinct::next")]
43	fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
44		if self.headers.is_some() {
45			return Ok(None);
46		}
47
48		let mut all_columns: Option<Columns> = None;
49		while let Some(cols) = self.input.next(rx, ctx)? {
50			if cols.row_count() == 0 {
51				continue;
52			}
53			match &mut all_columns {
54				None => all_columns = Some(cols),
55				Some(existing) => existing.append_columns(cols)?,
56			}
57		}
58
59		let all_columns = match all_columns {
60			Some(cols) => cols,
61			None => {
62				self.headers = Some(ColumnHeaders::empty());
63				return Ok(None);
64			}
65		};
66
67		let row_count = all_columns.row_count();
68		let mut seen = HashSet::<Hash128>::new();
69		let mut kept_indices = Vec::new();
70
71		if self.columns.is_empty() {
72			for row_idx in 0..row_count {
73				let mut data = Vec::new();
74				for col in all_columns.iter() {
75					let value = col.data().get_value(row_idx);
76					let value_str = value.to_string();
77					data.extend_from_slice(value_str.as_bytes());
78				}
79				let hash = xxh3_128(&data);
80				if seen.insert(hash) {
81					kept_indices.push(row_idx);
82				}
83			}
84		} else {
85			let distinct_col_names: Vec<&str> = self.columns.iter().map(|c| c.name()).collect();
86			for row_idx in 0..row_count {
87				let mut data = Vec::new();
88				for col_name in &distinct_col_names {
89					if let Some(col) = all_columns.column(col_name) {
90						let value = col.data().get_value(row_idx);
91						let value_str = value.to_string();
92						data.extend_from_slice(value_str.as_bytes());
93					}
94				}
95				let hash = xxh3_128(&data);
96				if seen.insert(hash) {
97					kept_indices.push(row_idx);
98				}
99			}
100		}
101
102		let result = all_columns.extract_by_indices(&kept_indices);
103		self.headers = Some(ColumnHeaders::from_columns(&result));
104
105		Ok(Some(result))
106	}
107
108	fn headers(&self) -> Option<ColumnHeaders> {
109		self.headers.clone().or(self.input.headers())
110	}
111}