Skip to main content

reifydb_sub_flow/operator/
distinct.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::{from_bytes, to_stdvec};
8use reifydb_core::{
9	encoded::schema::Schema,
10	interface::{
11		catalog::flow::FlowNodeId,
12		change::{Change, Diff},
13	},
14	internal,
15	value::column::{Column, columns::Columns, data::ColumnData},
16};
17use reifydb_engine::{
18	expression::{
19		compile::{CompiledExpr, compile_expression},
20		context::{CompileContext, EvalContext},
21	},
22	vm::stack::SymbolTable,
23};
24use reifydb_function::registry::Functions;
25use reifydb_rql::expression::Expression;
26use reifydb_runtime::{
27	clock::Clock,
28	hash::{Hash128, xxh3_128},
29};
30use reifydb_type::{
31	Result,
32	error::Error,
33	fragment::Fragment,
34	params::Params,
35	util::cowvec::CowVec,
36	value::{Value, blob::Blob, identity::IdentityId, row_number::RowNumber, r#type::Type},
37};
38use serde::{Deserialize, Serialize};
39
40use crate::{
41	operator::{
42		Operator, Operators,
43		stateful::{raw::RawStatefulOperator, single::SingleStateful},
44	},
45	transaction::FlowTransaction,
46};
47
/// Parameter set handed to expression evaluation in this module; it is always `Params::None`.
static EMPTY_PARAMS: Params = Params::None;
/// Symbol table handed to expression evaluation in this module, constructed on first use.
static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
50
/// Layout information shared across all rows
///
/// Stored once on the distinct state instead of per row; it is what lets a
/// serialized row be turned back into named, typed columns.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctLayout {
	/// Column names, in column order.
	names: Vec<String>,
	/// Column types, positionally matching `names`.
	types: Vec<Type>,
}
57
/// Serialized row data - stores column values directly without Row conversion
///
/// Column names and types are not stored here; they come from the shared
/// `DistinctLayout` when the row is rebuilt.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializedRow {
	/// Row number of the captured row.
	number: RowNumber,
	/// Column values serialized with postcard
	// `serde_bytes` makes serde treat this field as one byte string rather than a sequence of u8.
	#[serde(with = "serde_bytes")]
	values_bytes: Vec<u8>,
}
66
67impl SerializedRow {
68	/// Create from Columns at a specific row index - no Row allocation
69	fn from_columns_at_index(columns: &Columns, row_idx: usize) -> Self {
70		let number = columns.row_numbers[row_idx];
71
72		let values: Vec<Value> = columns.iter().map(|c| c.data().get_value(row_idx)).collect();
73
74		// Serialize values directly with postcard
75		let values_bytes = to_stdvec(&values).expect("Failed to serialize column values");
76
77		Self {
78			number,
79			values_bytes,
80		}
81	}
82
83	/// Convert back to Columns - no Row allocation
84	fn to_columns(&self, layout: &DistinctLayout) -> Columns {
85		// Deserialize values
86		let values: Vec<Value> = from_bytes(&self.values_bytes).expect("Failed to deserialize column values");
87
88		let mut columns_vec = Vec::with_capacity(layout.names.len());
89		for (i, (name, typ)) in layout.names.iter().zip(layout.types.iter()).enumerate() {
90			let value = values.get(i).cloned().unwrap_or(Value::none());
91			let mut col_data = ColumnData::with_capacity(typ.clone(), 1);
92			col_data.push_value(value);
93			columns_vec.push(Column {
94				name: Fragment::internal(name),
95				data: col_data,
96			});
97		}
98
99		Columns {
100			row_numbers: CowVec::new(vec![self.number]),
101			columns: CowVec::new(columns_vec),
102		}
103	}
104}
105
106impl DistinctLayout {
107	fn new() -> Self {
108		Self {
109			names: Vec::new(),
110			types: Vec::new(),
111		}
112	}
113
114	/// Update the layout from Columns (uses first row if multiple)
115	fn update_from_columns(&mut self, columns: &Columns) {
116		if columns.is_empty() {
117			return;
118		}
119
120		let names: Vec<String> = columns.iter().map(|c| c.name().text().to_string()).collect();
121		let types: Vec<Type> = columns.iter().map(|c| c.data().get_type()).collect();
122
123		if self.names.is_empty() {
124			self.names = names;
125			self.types = types;
126			return;
127		}
128
129		for (i, new_type) in types.iter().enumerate() {
130			if i < self.types.len() {
131				if !self.types[i].is_option() && new_type.is_option() {
132					self.types[i] = new_type.clone();
133				}
134			} else {
135				self.types.push(new_type.clone());
136				if i < names.len() {
137					self.names.push(names[i].clone());
138				}
139			}
140		}
141	}
142}
143
/// Entry for tracking distinct values
///
/// One entry exists per distinct key currently present in the state.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctEntry {
	/// Number of times this distinct value appears
	count: usize,
	/// The first row that had this distinct value
	// The update path may overwrite this with the new version of the same row
	// (matched by row number) when that row changes without changing its key.
	first_row: SerializedRow,
}
152
/// State for tracking distinct values
///
/// The whole struct is serialized with postcard and stored as a single blob
/// by the operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctState {
	/// Map from hash of distinct expressions to entry
	/// Using IndexMap to preserve insertion order for "first occurrence" semantics
	entries: IndexMap<Hash128, DistinctEntry>,
	/// Shared layout information
	// Used to rebuild `Columns` from the `SerializedRow` held by each entry.
	layout: DistinctLayout,
}
162
163impl Default for DistinctState {
164	fn default() -> Self {
165		Self {
166			entries: IndexMap::new(),
167			layout: DistinctLayout::new(),
168		}
169	}
170}
171
/// Flow operator that tracks distinct keys and forwards a row only when its key
/// is not already present in the operator's state.
pub struct DistinctOperator {
	/// Upstream operators; `pull` is delegated to them.
	parent: Arc<Operators>,
	/// Identifier of this flow node; returned by `id` and attached to emitted changes.
	node: FlowNodeId,
	/// Compiled distinct-key expressions. When empty, the key is built from
	/// every column of the row.
	compiled_expressions: Vec<CompiledExpr>,
	/// Schema of the persisted state row (a single blob column).
	schema: Schema,
	/// Function registry used when compiling and evaluating the expressions.
	functions: Functions,
	/// Clock passed to the expression evaluation context.
	clock: Clock,
}
180
181impl DistinctOperator {
182	pub fn new(
183		parent: Arc<Operators>,
184		node: FlowNodeId,
185		expressions: Vec<Expression>,
186		functions: Functions,
187		clock: Clock,
188	) -> Self {
189		let symbol_table = SymbolTable::new();
190		let compile_ctx = CompileContext {
191			functions: &functions,
192			symbol_table: &symbol_table,
193		};
194		let compiled_expressions: Vec<CompiledExpr> = expressions
195			.iter()
196			.map(|e| compile_expression(&compile_ctx, e))
197			.collect::<Result<Vec<_>>>()
198			.expect("Failed to compile expressions");
199
200		Self {
201			parent,
202			node,
203			compiled_expressions,
204			schema: Schema::testing(&[Type::Blob]),
205			functions,
206			clock,
207		}
208	}
209
210	/// Compute hashes for all rows in Columns
211	fn compute_hashes(&self, columns: &Columns) -> Result<Vec<Hash128>> {
212		let row_count = columns.row_count();
213		if row_count == 0 {
214			return Ok(Vec::new());
215		}
216
217		if self.compiled_expressions.is_empty() {
218			// Hash the entire row data for each row
219			let mut hashes = Vec::with_capacity(row_count);
220			for row_idx in 0..row_count {
221				let mut data = Vec::new();
222				for col in columns.iter() {
223					let value = col.data().get_value(row_idx);
224					let value_str = value.to_string();
225					data.extend_from_slice(value_str.as_bytes());
226				}
227				hashes.push(xxh3_128(&data));
228			}
229			Ok(hashes)
230		} else {
231			let exec_ctx = EvalContext {
232				target: None,
233				columns: columns.clone(),
234				row_count,
235				take: None,
236				params: &EMPTY_PARAMS,
237				symbol_table: &EMPTY_SYMBOL_TABLE,
238				is_aggregate_context: false,
239				functions: &self.functions,
240				clock: &self.clock,
241				arena: None,
242				identity: IdentityId::root(),
243			};
244			let mut expr_columns = Vec::new();
245			for compiled_expr in &self.compiled_expressions {
246				let col = compiled_expr.execute(&exec_ctx)?;
247				expr_columns.push(col);
248			}
249
250			let mut hashes = Vec::with_capacity(row_count);
251			for row_idx in 0..row_count {
252				let mut data = Vec::new();
253				for col in &expr_columns {
254					let value = col.data().get_value(row_idx);
255					let value_str = value.to_string();
256					data.extend_from_slice(value_str.as_bytes());
257				}
258				hashes.push(xxh3_128(&data));
259			}
260			Ok(hashes)
261		}
262	}
263
264	fn load_distinct_state(&self, txn: &mut FlowTransaction) -> Result<DistinctState> {
265		let state_row = self.load_state(txn)?;
266
267		if state_row.is_empty() || !state_row.is_defined(0) {
268			return Ok(DistinctState::default());
269		}
270
271		let blob = self.schema.get_blob(&state_row, 0);
272		if blob.is_empty() {
273			return Ok(DistinctState::default());
274		}
275
276		from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize DistinctState: {}", e)))
277	}
278
279	fn save_distinct_state(&self, txn: &mut FlowTransaction, state: &DistinctState) -> Result<()> {
280		let serialized =
281			to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize DistinctState: {}", e)))?;
282
283		let mut state_row = self.schema.allocate();
284		let blob = Blob::from(serialized);
285		self.schema.set_blob(&mut state_row, 0, &blob);
286
287		self.save_state(txn, state_row)
288	}
289
290	/// Process inserts - operates directly on Columns without Row conversion
291	fn process_insert(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
292		let mut result = Vec::new();
293		let row_count = columns.row_count();
294		if row_count == 0 {
295			return Ok(result);
296		}
297
298		state.layout.update_from_columns(columns);
299		let hashes = self.compute_hashes(columns)?;
300
301		let mut new_distinct_indices: Vec<usize> = Vec::new();
302
303		for row_idx in 0..row_count {
304			let hash = hashes[row_idx];
305
306			match state.entries.get_mut(&hash) {
307				Some(entry) => {
308					entry.count += 1;
309					// Already seen this distinct value - just increment count
310				}
311				None => {
312					state.entries.insert(
313						hash,
314						DistinctEntry {
315							count: 1,
316							first_row: SerializedRow::from_columns_at_index(
317								columns, row_idx,
318							),
319						},
320					);
321					new_distinct_indices.push(row_idx);
322				}
323			}
324		}
325
326		if !new_distinct_indices.is_empty() {
327			let output = columns.extract_by_indices(&new_distinct_indices);
328			result.push(Diff::Insert {
329				post: output,
330			});
331		}
332
333		Ok(result)
334	}
335
336	/// Process updates - operates directly on Columns without Row conversion
337	fn process_update(
338		&self,
339		state: &mut DistinctState,
340		pre_columns: &Columns,
341		post_columns: &Columns,
342	) -> Result<Vec<Diff>> {
343		let mut result = Vec::new();
344		let row_count = post_columns.row_count();
345		if row_count == 0 {
346			return Ok(result);
347		}
348
349		state.layout.update_from_columns(post_columns);
350		let pre_hashes = self.compute_hashes(pre_columns)?;
351		let post_hashes = self.compute_hashes(post_columns)?;
352
353		// Group updates by type for batch output
354		let mut same_key_update_indices: Vec<usize> = Vec::new();
355		let mut removed_indices: Vec<usize> = Vec::new();
356		let mut inserted_indices: Vec<usize> = Vec::new();
357
358		for row_idx in 0..row_count {
359			let pre_hash = pre_hashes[row_idx];
360			let post_hash = post_hashes[row_idx];
361
362			if pre_hash == post_hash {
363				// Distinct key didn't change - update the stored row
364				if let Some(entry) = state.entries.get_mut(&pre_hash) {
365					if entry.first_row.number == post_columns.row_numbers[row_idx] {
366						entry.first_row =
367							SerializedRow::from_columns_at_index(post_columns, row_idx);
368					}
369					same_key_update_indices.push(row_idx);
370				}
371			} else {
372				// Key changed - remove from old, add to new
373				if let Some(entry) = state.entries.get_mut(&pre_hash) {
374					if entry.count > 1 {
375						entry.count -= 1;
376					} else {
377						state.entries.shift_remove(&pre_hash);
378						removed_indices.push(row_idx);
379					}
380				}
381
382				match state.entries.get_mut(&post_hash) {
383					Some(entry) => {
384						entry.count += 1;
385					}
386					None => {
387						state.entries.insert(
388							post_hash,
389							DistinctEntry {
390								count: 1,
391								first_row: SerializedRow::from_columns_at_index(
392									post_columns,
393									row_idx,
394								),
395							},
396						);
397						inserted_indices.push(row_idx);
398					}
399				}
400			}
401		}
402
403		if !same_key_update_indices.is_empty() {
404			let pre_output = pre_columns.extract_by_indices(&same_key_update_indices);
405			let post_output = post_columns.extract_by_indices(&same_key_update_indices);
406			result.push(Diff::Update {
407				pre: pre_output,
408				post: post_output,
409			});
410		}
411
412		if !removed_indices.is_empty() {
413			let output = pre_columns.extract_by_indices(&removed_indices);
414			result.push(Diff::Remove {
415				pre: output,
416			});
417		}
418
419		if !inserted_indices.is_empty() {
420			let output = post_columns.extract_by_indices(&inserted_indices);
421			result.push(Diff::Insert {
422				post: output,
423			});
424		}
425
426		Ok(result)
427	}
428
429	/// Process removes - operates directly on Columns without Row conversion
430	fn process_remove(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
431		let mut result = Vec::new();
432		let row_count = columns.row_count();
433		if row_count == 0 {
434			return Ok(result);
435		}
436
437		let hashes = self.compute_hashes(columns)?;
438
439		let mut removed_hashes: Vec<Hash128> = Vec::new();
440
441		for row_idx in 0..row_count {
442			let hash = hashes[row_idx];
443
444			if let Some(entry) = state.entries.get_mut(&hash) {
445				if entry.count > 1 {
446					entry.count -= 1;
447				} else {
448					removed_hashes.push(hash);
449				}
450			}
451		}
452
453		for hash in removed_hashes {
454			if let Some(entry) = state.entries.shift_remove(&hash) {
455				let stored_columns = entry.first_row.to_columns(&state.layout);
456				result.push(Diff::Remove {
457					pre: stored_columns,
458				});
459			}
460		}
461
462		Ok(result)
463	}
464}
465
// Nothing is overridden here: the operator relies entirely on whatever
// `RawStatefulOperator` provides by default.
impl RawStatefulOperator for DistinctOperator {}
467
impl SingleStateful for DistinctOperator {
	/// Returns a copy of the schema this operator uses for its persisted state row.
	fn layout(&self) -> Schema {
		self.schema.clone()
	}
}
473
474impl Operator for DistinctOperator {
475	fn id(&self) -> FlowNodeId {
476		self.node
477	}
478
479	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
480		let mut state = self.load_distinct_state(txn)?;
481		let mut result = Vec::new();
482
483		for diff in change.diffs {
484			match diff {
485				Diff::Insert {
486					post,
487				} => {
488					let insert_result = self.process_insert(&mut state, &post)?;
489					result.extend(insert_result);
490				}
491				Diff::Update {
492					pre,
493					post,
494				} => {
495					let update_result = self.process_update(&mut state, &pre, &post)?;
496					result.extend(update_result);
497				}
498				Diff::Remove {
499					pre,
500				} => {
501					let remove_result = self.process_remove(&mut state, &pre)?;
502					result.extend(remove_result);
503				}
504			}
505		}
506
507		self.save_distinct_state(txn, &state)?;
508
509		Ok(Change::from_flow(self.node, change.version, result))
510	}
511
512	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
513		self.parent.pull(txn, rows)
514	}
515}