reifydb_engine/vm/volcano/
distinct.rs1use std::collections::HashSet;
5
6use reifydb_core::{
7 interface::resolved::ResolvedColumn,
8 value::column::{columns::Columns, headers::ColumnHeaders},
9};
10use reifydb_runtime::hash::{Hash128, xxh3_128};
11use reifydb_transaction::transaction::Transaction;
12use tracing::instrument;
13
14use crate::{
15 Result,
16 vm::volcano::query::{QueryContext, QueryNode},
17};
18
19pub(crate) struct DistinctNode {
20 input: Box<dyn QueryNode>,
21 columns: Vec<ResolvedColumn>,
22 headers: Option<ColumnHeaders>,
23}
24
25impl DistinctNode {
26 pub fn new(input: Box<dyn QueryNode>, columns: Vec<ResolvedColumn>) -> Self {
27 Self {
28 input,
29 columns,
30 headers: None,
31 }
32 }
33}
34
35impl QueryNode for DistinctNode {
36 #[instrument(level = "trace", skip_all, name = "volcano::distinct::initialize")]
37 fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
38 self.input.initialize(rx, ctx)?;
39 Ok(())
40 }
41
42 #[instrument(level = "trace", skip_all, name = "volcano::distinct::next")]
43 fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
44 if self.headers.is_some() {
45 return Ok(None);
46 }
47
48 let mut all_columns: Option<Columns> = None;
49 while let Some(cols) = self.input.next(rx, ctx)? {
50 if cols.row_count() == 0 {
51 continue;
52 }
53 match &mut all_columns {
54 None => all_columns = Some(cols),
55 Some(existing) => existing.append_columns(cols)?,
56 }
57 }
58
59 let all_columns = match all_columns {
60 Some(cols) => cols,
61 None => {
62 self.headers = Some(ColumnHeaders::empty());
63 return Ok(None);
64 }
65 };
66
67 let row_count = all_columns.row_count();
68 let mut seen = HashSet::<Hash128>::new();
69 let mut kept_indices = Vec::new();
70
71 if self.columns.is_empty() {
72 for row_idx in 0..row_count {
73 let mut data = Vec::new();
74 for col in all_columns.iter() {
75 let value = col.data().get_value(row_idx);
76 let value_str = value.to_string();
77 data.extend_from_slice(value_str.as_bytes());
78 }
79 let hash = xxh3_128(&data);
80 if seen.insert(hash) {
81 kept_indices.push(row_idx);
82 }
83 }
84 } else {
85 let distinct_col_names: Vec<&str> = self.columns.iter().map(|c| c.name()).collect();
86 for row_idx in 0..row_count {
87 let mut data = Vec::new();
88 for col_name in &distinct_col_names {
89 if let Some(col) = all_columns.column(col_name) {
90 let value = col.data().get_value(row_idx);
91 let value_str = value.to_string();
92 data.extend_from_slice(value_str.as_bytes());
93 }
94 }
95 let hash = xxh3_128(&data);
96 if seen.insert(hash) {
97 kept_indices.push(row_idx);
98 }
99 }
100 }
101
102 let result = all_columns.extract_by_indices(&kept_indices);
103 self.headers = Some(ColumnHeaders::from_columns(&result));
104
105 Ok(Some(result))
106 }
107
108 fn headers(&self) -> Option<ColumnHeaders> {
109 self.headers.clone().or(self.input.headers())
110 }
111}