reifydb_sub_flow/operator/
distinct.rs1use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::{from_bytes, to_stdvec};
8use reifydb_core::{
9 encoded::schema::Schema,
10 interface::{
11 catalog::flow::FlowNodeId,
12 change::{Change, Diff},
13 },
14 internal,
15 value::column::{Column, columns::Columns, data::ColumnData},
16};
17use reifydb_engine::{
18 expression::{
19 compile::{CompiledExpr, compile_expression},
20 context::{CompileContext, EvalContext},
21 },
22 vm::stack::SymbolTable,
23};
24use reifydb_function::registry::Functions;
25use reifydb_rql::expression::Expression;
26use reifydb_runtime::{
27 clock::Clock,
28 hash::{Hash128, xxh3_128},
29};
30use reifydb_type::{
31 Result,
32 error::Error,
33 fragment::Fragment,
34 params::Params,
35 util::cowvec::CowVec,
36 value::{Value, blob::Blob, identity::IdentityId, row_number::RowNumber, r#type::Type},
37};
38use serde::{Deserialize, Serialize};
39
40use crate::{
41 operator::{
42 Operator, Operators,
43 stateful::{raw::RawStatefulOperator, single::SingleStateful},
44 },
45 transaction::FlowTransaction,
46};
47
48static EMPTY_PARAMS: Params = Params::None;
49static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53struct DistinctLayout {
54 names: Vec<String>,
55 types: Vec<Type>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60struct SerializedRow {
61 number: RowNumber,
62 #[serde(with = "serde_bytes")]
64 values_bytes: Vec<u8>,
65}
66
67impl SerializedRow {
68 fn from_columns_at_index(columns: &Columns, row_idx: usize) -> Self {
70 let number = columns.row_numbers[row_idx];
71
72 let values: Vec<Value> = columns.iter().map(|c| c.data().get_value(row_idx)).collect();
73
74 let values_bytes = to_stdvec(&values).expect("Failed to serialize column values");
76
77 Self {
78 number,
79 values_bytes,
80 }
81 }
82
83 fn to_columns(&self, layout: &DistinctLayout) -> Columns {
85 let values: Vec<Value> = from_bytes(&self.values_bytes).expect("Failed to deserialize column values");
87
88 let mut columns_vec = Vec::with_capacity(layout.names.len());
89 for (i, (name, typ)) in layout.names.iter().zip(layout.types.iter()).enumerate() {
90 let value = values.get(i).cloned().unwrap_or(Value::none());
91 let mut col_data = ColumnData::with_capacity(typ.clone(), 1);
92 col_data.push_value(value);
93 columns_vec.push(Column {
94 name: Fragment::internal(name),
95 data: col_data,
96 });
97 }
98
99 Columns {
100 row_numbers: CowVec::new(vec![self.number]),
101 columns: CowVec::new(columns_vec),
102 }
103 }
104}
105
106impl DistinctLayout {
107 fn new() -> Self {
108 Self {
109 names: Vec::new(),
110 types: Vec::new(),
111 }
112 }
113
114 fn update_from_columns(&mut self, columns: &Columns) {
116 if columns.is_empty() {
117 return;
118 }
119
120 let names: Vec<String> = columns.iter().map(|c| c.name().text().to_string()).collect();
121 let types: Vec<Type> = columns.iter().map(|c| c.data().get_type()).collect();
122
123 if self.names.is_empty() {
124 self.names = names;
125 self.types = types;
126 return;
127 }
128
129 for (i, new_type) in types.iter().enumerate() {
130 if i < self.types.len() {
131 if !self.types[i].is_option() && new_type.is_option() {
132 self.types[i] = new_type.clone();
133 }
134 } else {
135 self.types.push(new_type.clone());
136 if i < names.len() {
137 self.names.push(names[i].clone());
138 }
139 }
140 }
141 }
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146struct DistinctEntry {
147 count: usize,
149 first_row: SerializedRow,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
155struct DistinctState {
156 entries: IndexMap<Hash128, DistinctEntry>,
159 layout: DistinctLayout,
161}
162
163impl Default for DistinctState {
164 fn default() -> Self {
165 Self {
166 entries: IndexMap::new(),
167 layout: DistinctLayout::new(),
168 }
169 }
170}
171
172pub struct DistinctOperator {
173 parent: Arc<Operators>,
174 node: FlowNodeId,
175 compiled_expressions: Vec<CompiledExpr>,
176 schema: Schema,
177 functions: Functions,
178 clock: Clock,
179}
180
181impl DistinctOperator {
182 pub fn new(
183 parent: Arc<Operators>,
184 node: FlowNodeId,
185 expressions: Vec<Expression>,
186 functions: Functions,
187 clock: Clock,
188 ) -> Self {
189 let symbol_table = SymbolTable::new();
190 let compile_ctx = CompileContext {
191 functions: &functions,
192 symbol_table: &symbol_table,
193 };
194 let compiled_expressions: Vec<CompiledExpr> = expressions
195 .iter()
196 .map(|e| compile_expression(&compile_ctx, e))
197 .collect::<Result<Vec<_>>>()
198 .expect("Failed to compile expressions");
199
200 Self {
201 parent,
202 node,
203 compiled_expressions,
204 schema: Schema::testing(&[Type::Blob]),
205 functions,
206 clock,
207 }
208 }
209
210 fn compute_hashes(&self, columns: &Columns) -> Result<Vec<Hash128>> {
212 let row_count = columns.row_count();
213 if row_count == 0 {
214 return Ok(Vec::new());
215 }
216
217 if self.compiled_expressions.is_empty() {
218 let mut hashes = Vec::with_capacity(row_count);
220 for row_idx in 0..row_count {
221 let mut data = Vec::new();
222 for col in columns.iter() {
223 let value = col.data().get_value(row_idx);
224 let value_str = value.to_string();
225 data.extend_from_slice(value_str.as_bytes());
226 }
227 hashes.push(xxh3_128(&data));
228 }
229 Ok(hashes)
230 } else {
231 let exec_ctx = EvalContext {
232 target: None,
233 columns: columns.clone(),
234 row_count,
235 take: None,
236 params: &EMPTY_PARAMS,
237 symbol_table: &EMPTY_SYMBOL_TABLE,
238 is_aggregate_context: false,
239 functions: &self.functions,
240 clock: &self.clock,
241 arena: None,
242 identity: IdentityId::root(),
243 };
244 let mut expr_columns = Vec::new();
245 for compiled_expr in &self.compiled_expressions {
246 let col = compiled_expr.execute(&exec_ctx)?;
247 expr_columns.push(col);
248 }
249
250 let mut hashes = Vec::with_capacity(row_count);
251 for row_idx in 0..row_count {
252 let mut data = Vec::new();
253 for col in &expr_columns {
254 let value = col.data().get_value(row_idx);
255 let value_str = value.to_string();
256 data.extend_from_slice(value_str.as_bytes());
257 }
258 hashes.push(xxh3_128(&data));
259 }
260 Ok(hashes)
261 }
262 }
263
264 fn load_distinct_state(&self, txn: &mut FlowTransaction) -> Result<DistinctState> {
265 let state_row = self.load_state(txn)?;
266
267 if state_row.is_empty() || !state_row.is_defined(0) {
268 return Ok(DistinctState::default());
269 }
270
271 let blob = self.schema.get_blob(&state_row, 0);
272 if blob.is_empty() {
273 return Ok(DistinctState::default());
274 }
275
276 from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize DistinctState: {}", e)))
277 }
278
279 fn save_distinct_state(&self, txn: &mut FlowTransaction, state: &DistinctState) -> Result<()> {
280 let serialized =
281 to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize DistinctState: {}", e)))?;
282
283 let mut state_row = self.schema.allocate();
284 let blob = Blob::from(serialized);
285 self.schema.set_blob(&mut state_row, 0, &blob);
286
287 self.save_state(txn, state_row)
288 }
289
290 fn process_insert(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
292 let mut result = Vec::new();
293 let row_count = columns.row_count();
294 if row_count == 0 {
295 return Ok(result);
296 }
297
298 state.layout.update_from_columns(columns);
299 let hashes = self.compute_hashes(columns)?;
300
301 let mut new_distinct_indices: Vec<usize> = Vec::new();
302
303 for row_idx in 0..row_count {
304 let hash = hashes[row_idx];
305
306 match state.entries.get_mut(&hash) {
307 Some(entry) => {
308 entry.count += 1;
309 }
311 None => {
312 state.entries.insert(
313 hash,
314 DistinctEntry {
315 count: 1,
316 first_row: SerializedRow::from_columns_at_index(
317 columns, row_idx,
318 ),
319 },
320 );
321 new_distinct_indices.push(row_idx);
322 }
323 }
324 }
325
326 if !new_distinct_indices.is_empty() {
327 let output = columns.extract_by_indices(&new_distinct_indices);
328 result.push(Diff::Insert {
329 post: output,
330 });
331 }
332
333 Ok(result)
334 }
335
336 fn process_update(
338 &self,
339 state: &mut DistinctState,
340 pre_columns: &Columns,
341 post_columns: &Columns,
342 ) -> Result<Vec<Diff>> {
343 let mut result = Vec::new();
344 let row_count = post_columns.row_count();
345 if row_count == 0 {
346 return Ok(result);
347 }
348
349 state.layout.update_from_columns(post_columns);
350 let pre_hashes = self.compute_hashes(pre_columns)?;
351 let post_hashes = self.compute_hashes(post_columns)?;
352
353 let mut same_key_update_indices: Vec<usize> = Vec::new();
355 let mut removed_indices: Vec<usize> = Vec::new();
356 let mut inserted_indices: Vec<usize> = Vec::new();
357
358 for row_idx in 0..row_count {
359 let pre_hash = pre_hashes[row_idx];
360 let post_hash = post_hashes[row_idx];
361
362 if pre_hash == post_hash {
363 if let Some(entry) = state.entries.get_mut(&pre_hash) {
365 if entry.first_row.number == post_columns.row_numbers[row_idx] {
366 entry.first_row =
367 SerializedRow::from_columns_at_index(post_columns, row_idx);
368 }
369 same_key_update_indices.push(row_idx);
370 }
371 } else {
372 if let Some(entry) = state.entries.get_mut(&pre_hash) {
374 if entry.count > 1 {
375 entry.count -= 1;
376 } else {
377 state.entries.shift_remove(&pre_hash);
378 removed_indices.push(row_idx);
379 }
380 }
381
382 match state.entries.get_mut(&post_hash) {
383 Some(entry) => {
384 entry.count += 1;
385 }
386 None => {
387 state.entries.insert(
388 post_hash,
389 DistinctEntry {
390 count: 1,
391 first_row: SerializedRow::from_columns_at_index(
392 post_columns,
393 row_idx,
394 ),
395 },
396 );
397 inserted_indices.push(row_idx);
398 }
399 }
400 }
401 }
402
403 if !same_key_update_indices.is_empty() {
404 let pre_output = pre_columns.extract_by_indices(&same_key_update_indices);
405 let post_output = post_columns.extract_by_indices(&same_key_update_indices);
406 result.push(Diff::Update {
407 pre: pre_output,
408 post: post_output,
409 });
410 }
411
412 if !removed_indices.is_empty() {
413 let output = pre_columns.extract_by_indices(&removed_indices);
414 result.push(Diff::Remove {
415 pre: output,
416 });
417 }
418
419 if !inserted_indices.is_empty() {
420 let output = post_columns.extract_by_indices(&inserted_indices);
421 result.push(Diff::Insert {
422 post: output,
423 });
424 }
425
426 Ok(result)
427 }
428
429 fn process_remove(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
431 let mut result = Vec::new();
432 let row_count = columns.row_count();
433 if row_count == 0 {
434 return Ok(result);
435 }
436
437 let hashes = self.compute_hashes(columns)?;
438
439 let mut removed_hashes: Vec<Hash128> = Vec::new();
440
441 for row_idx in 0..row_count {
442 let hash = hashes[row_idx];
443
444 if let Some(entry) = state.entries.get_mut(&hash) {
445 if entry.count > 1 {
446 entry.count -= 1;
447 } else {
448 removed_hashes.push(hash);
449 }
450 }
451 }
452
453 for hash in removed_hashes {
454 if let Some(entry) = state.entries.shift_remove(&hash) {
455 let stored_columns = entry.first_row.to_columns(&state.layout);
456 result.push(Diff::Remove {
457 pre: stored_columns,
458 });
459 }
460 }
461
462 Ok(result)
463 }
464}
465
466impl RawStatefulOperator for DistinctOperator {}
467
468impl SingleStateful for DistinctOperator {
469 fn layout(&self) -> Schema {
470 self.schema.clone()
471 }
472}
473
474impl Operator for DistinctOperator {
475 fn id(&self) -> FlowNodeId {
476 self.node
477 }
478
479 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
480 let mut state = self.load_distinct_state(txn)?;
481 let mut result = Vec::new();
482
483 for diff in change.diffs {
484 match diff {
485 Diff::Insert {
486 post,
487 } => {
488 let insert_result = self.process_insert(&mut state, &post)?;
489 result.extend(insert_result);
490 }
491 Diff::Update {
492 pre,
493 post,
494 } => {
495 let update_result = self.process_update(&mut state, &pre, &post)?;
496 result.extend(update_result);
497 }
498 Diff::Remove {
499 pre,
500 } => {
501 let remove_result = self.process_remove(&mut state, &pre)?;
502 result.extend(remove_result);
503 }
504 }
505 }
506
507 self.save_distinct_state(txn, &state)?;
508
509 Ok(Change::from_flow(self.node, change.version, result))
510 }
511
512 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
513 self.parent.pull(txn, rows)
514 }
515}