Skip to main content

reifydb_sub_flow/operator/
distinct.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::{from_bytes, to_stdvec};
8use reifydb_core::{
9	encoded::schema::RowSchema,
10	interface::{
11		catalog::flow::FlowNodeId,
12		change::{Change, Diff},
13	},
14	internal,
15	value::column::{Column, columns::Columns, data::ColumnData},
16};
17use reifydb_engine::{
18	expression::{
19		compile::{CompiledExpr, compile_expression},
20		context::{CompileContext, EvalSession},
21	},
22	vm::stack::SymbolTable,
23};
24use reifydb_routine::function::registry::Functions;
25use reifydb_rql::expression::Expression;
26use reifydb_runtime::{
27	context::RuntimeContext,
28	hash::{Hash128, xxh3_128},
29};
30use reifydb_type::{
31	Result,
32	error::Error,
33	fragment::Fragment,
34	params::Params,
35	util::cowvec::CowVec,
36	value::{Value, blob::Blob, identity::IdentityId, row_number::RowNumber, r#type::Type},
37};
38use serde::{Deserialize, Serialize};
39
40use crate::{
41	operator::{
42		Operator, Operators,
43		stateful::{raw::RawStatefulOperator, single::SingleStateful},
44	},
45	transaction::FlowTransaction,
46};
47
48static EMPTY_PARAMS: Params = Params::None;
49static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
50
51/// Layout information shared across all rows
52#[derive(Debug, Clone, Serialize, Deserialize)]
53struct DistinctLayout {
54	names: Vec<String>,
55	types: Vec<Type>,
56}
57
58/// Serialized row data - stores column values directly without Row conversion
59#[derive(Debug, Clone, Serialize, Deserialize)]
60struct SerializedRow {
61	number: RowNumber,
62	/// Column values serialized with postcard
63	#[serde(with = "serde_bytes")]
64	values_bytes: Vec<u8>,
65}
66
67impl SerializedRow {
68	/// Create from Columns at a specific row index - no Row allocation
69	fn from_columns_at_index(columns: &Columns, row_idx: usize) -> Self {
70		let number = columns.row_numbers[row_idx];
71
72		let values: Vec<Value> = columns.iter().map(|c| c.data().get_value(row_idx)).collect();
73
74		// Serialize values directly with postcard
75		let values_bytes = to_stdvec(&values).expect("Failed to serialize column values");
76
77		Self {
78			number,
79			values_bytes,
80		}
81	}
82
83	/// Convert back to Columns - no Row allocation
84	fn to_columns(&self, layout: &DistinctLayout) -> Columns {
85		// Deserialize values
86		let values: Vec<Value> = from_bytes(&self.values_bytes).expect("Failed to deserialize column values");
87
88		let mut columns_vec = Vec::with_capacity(layout.names.len());
89		for (i, (name, typ)) in layout.names.iter().zip(layout.types.iter()).enumerate() {
90			let value = values.get(i).cloned().unwrap_or(Value::none());
91			let mut col_data = ColumnData::with_capacity(typ.clone(), 1);
92			col_data.push_value(value);
93			columns_vec.push(Column {
94				name: Fragment::internal(name),
95				data: col_data,
96			});
97		}
98
99		Columns {
100			row_numbers: CowVec::new(vec![self.number]),
101			columns: CowVec::new(columns_vec),
102		}
103	}
104}
105
106impl DistinctLayout {
107	fn new() -> Self {
108		Self {
109			names: Vec::new(),
110			types: Vec::new(),
111		}
112	}
113
114	/// Update the layout from Columns (uses first row if multiple)
115	fn update_from_columns(&mut self, columns: &Columns) {
116		if columns.is_empty() {
117			return;
118		}
119
120		let names: Vec<String> = columns.iter().map(|c| c.name().text().to_string()).collect();
121		let types: Vec<Type> = columns.iter().map(|c| c.data().get_type()).collect();
122
123		if self.names.is_empty() {
124			self.names = names;
125			self.types = types;
126			return;
127		}
128
129		for (i, new_type) in types.iter().enumerate() {
130			if i < self.types.len() {
131				if !self.types[i].is_option() && new_type.is_option() {
132					self.types[i] = new_type.clone();
133				}
134			} else {
135				self.types.push(new_type.clone());
136				if i < names.len() {
137					self.names.push(names[i].clone());
138				}
139			}
140		}
141	}
142}
143
144/// Entry for tracking distinct values
145#[derive(Debug, Clone, Serialize, Deserialize)]
146struct DistinctEntry {
147	/// Number of times this distinct value appears
148	count: usize,
149	/// The first row that had this distinct value
150	first_row: SerializedRow,
151}
152
153/// State for tracking distinct values
154#[derive(Debug, Clone, Serialize, Deserialize)]
155struct DistinctState {
156	/// Map from hash of distinct expressions to entry
157	/// Using IndexMap to preserve insertion order for "first occurrence" semantics
158	entries: IndexMap<Hash128, DistinctEntry>,
159	/// Shared layout information
160	layout: DistinctLayout,
161}
162
163impl Default for DistinctState {
164	fn default() -> Self {
165		Self {
166			entries: IndexMap::new(),
167			layout: DistinctLayout::new(),
168		}
169	}
170}
171
172pub struct DistinctOperator {
173	parent: Arc<Operators>,
174	node: FlowNodeId,
175	compiled_expressions: Vec<CompiledExpr>,
176	schema: RowSchema,
177	functions: Functions,
178	runtime_context: RuntimeContext,
179}
180
181impl DistinctOperator {
182	pub fn new(
183		parent: Arc<Operators>,
184		node: FlowNodeId,
185		expressions: Vec<Expression>,
186		functions: Functions,
187		runtime_context: RuntimeContext,
188	) -> Self {
189		let symbols = SymbolTable::new();
190		let compile_ctx = CompileContext {
191			functions: &functions,
192			symbols: &symbols,
193		};
194		let compiled_expressions: Vec<CompiledExpr> = expressions
195			.iter()
196			.map(|e| compile_expression(&compile_ctx, e))
197			.collect::<Result<Vec<_>>>()
198			.expect("Failed to compile expressions");
199
200		Self {
201			parent,
202			node,
203			compiled_expressions,
204			schema: RowSchema::testing(&[Type::Blob]),
205			functions,
206			runtime_context,
207		}
208	}
209
210	/// Compute hashes for all rows in Columns
211	fn compute_hashes(&self, columns: &Columns) -> Result<Vec<Hash128>> {
212		let row_count = columns.row_count();
213		if row_count == 0 {
214			return Ok(Vec::new());
215		}
216
217		if self.compiled_expressions.is_empty() {
218			// Hash the entire row data for each row
219			let mut hashes = Vec::with_capacity(row_count);
220			for row_idx in 0..row_count {
221				let mut data = Vec::new();
222				for col in columns.iter() {
223					let value = col.data().get_value(row_idx);
224					let value_str = value.to_string();
225					data.extend_from_slice(value_str.as_bytes());
226				}
227				hashes.push(xxh3_128(&data));
228			}
229			Ok(hashes)
230		} else {
231			let session = EvalSession {
232				params: &EMPTY_PARAMS,
233				symbols: &EMPTY_SYMBOL_TABLE,
234				functions: &self.functions,
235				runtime_context: &self.runtime_context,
236				arena: None,
237				identity: IdentityId::root(),
238				is_aggregate_context: false,
239			};
240			let exec_ctx = session.eval(columns.clone(), row_count);
241			let mut expr_columns = Vec::new();
242			for compiled_expr in &self.compiled_expressions {
243				let col = compiled_expr.execute(&exec_ctx)?;
244				expr_columns.push(col);
245			}
246
247			let mut hashes = Vec::with_capacity(row_count);
248			for row_idx in 0..row_count {
249				let mut data = Vec::new();
250				for col in &expr_columns {
251					let value = col.data().get_value(row_idx);
252					let value_str = value.to_string();
253					data.extend_from_slice(value_str.as_bytes());
254				}
255				hashes.push(xxh3_128(&data));
256			}
257			Ok(hashes)
258		}
259	}
260
261	fn load_distinct_state(&self, txn: &mut FlowTransaction) -> Result<DistinctState> {
262		let state_row = self.load_state(txn)?;
263
264		if state_row.is_empty() || !state_row.is_defined(0) {
265			return Ok(DistinctState::default());
266		}
267
268		let blob = self.schema.get_blob(&state_row, 0);
269		if blob.is_empty() {
270			return Ok(DistinctState::default());
271		}
272
273		from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize DistinctState: {}", e)))
274	}
275
276	fn save_distinct_state(&self, txn: &mut FlowTransaction, state: &DistinctState) -> Result<()> {
277		let serialized =
278			to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize DistinctState: {}", e)))?;
279		let blob = Blob::from(serialized);
280
281		self.update_state(txn, |schema, row| {
282			schema.set_blob(row, 0, &blob);
283			Ok(())
284		})?;
285		Ok(())
286	}
287
288	/// Process inserts - operates directly on Columns without Row conversion
289	fn process_insert(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
290		let mut result = Vec::new();
291		let row_count = columns.row_count();
292		if row_count == 0 {
293			return Ok(result);
294		}
295
296		state.layout.update_from_columns(columns);
297		let hashes = self.compute_hashes(columns)?;
298
299		let mut new_distinct_indices: Vec<usize> = Vec::new();
300
301		for row_idx in 0..row_count {
302			let hash = hashes[row_idx];
303
304			match state.entries.get_mut(&hash) {
305				Some(entry) => {
306					entry.count += 1;
307					// Already seen this distinct value - just increment count
308				}
309				None => {
310					state.entries.insert(
311						hash,
312						DistinctEntry {
313							count: 1,
314							first_row: SerializedRow::from_columns_at_index(
315								columns, row_idx,
316							),
317						},
318					);
319					new_distinct_indices.push(row_idx);
320				}
321			}
322		}
323
324		if !new_distinct_indices.is_empty() {
325			let output = columns.extract_by_indices(&new_distinct_indices);
326			result.push(Diff::Insert {
327				post: output,
328			});
329		}
330
331		Ok(result)
332	}
333
334	/// Process updates - operates directly on Columns without Row conversion
335	fn process_update(
336		&self,
337		state: &mut DistinctState,
338		pre_columns: &Columns,
339		post_columns: &Columns,
340	) -> Result<Vec<Diff>> {
341		let mut result = Vec::new();
342		let row_count = post_columns.row_count();
343		if row_count == 0 {
344			return Ok(result);
345		}
346
347		state.layout.update_from_columns(post_columns);
348		let pre_hashes = self.compute_hashes(pre_columns)?;
349		let post_hashes = self.compute_hashes(post_columns)?;
350
351		// Group updates by type for batch output
352		let mut same_key_update_indices: Vec<usize> = Vec::new();
353		let mut removed_indices: Vec<usize> = Vec::new();
354		let mut inserted_indices: Vec<usize> = Vec::new();
355
356		for row_idx in 0..row_count {
357			let pre_hash = pre_hashes[row_idx];
358			let post_hash = post_hashes[row_idx];
359
360			if pre_hash == post_hash {
361				// Distinct key didn't change - update the stored row
362				if let Some(entry) = state.entries.get_mut(&pre_hash) {
363					if entry.first_row.number == post_columns.row_numbers[row_idx] {
364						entry.first_row =
365							SerializedRow::from_columns_at_index(post_columns, row_idx);
366					}
367					same_key_update_indices.push(row_idx);
368				}
369			} else {
370				// Key changed - remove from old, add to new
371				if let Some(entry) = state.entries.get_mut(&pre_hash) {
372					if entry.count > 1 {
373						entry.count -= 1;
374					} else {
375						state.entries.shift_remove(&pre_hash);
376						removed_indices.push(row_idx);
377					}
378				}
379
380				match state.entries.get_mut(&post_hash) {
381					Some(entry) => {
382						entry.count += 1;
383					}
384					None => {
385						state.entries.insert(
386							post_hash,
387							DistinctEntry {
388								count: 1,
389								first_row: SerializedRow::from_columns_at_index(
390									post_columns,
391									row_idx,
392								),
393							},
394						);
395						inserted_indices.push(row_idx);
396					}
397				}
398			}
399		}
400
401		if !same_key_update_indices.is_empty() {
402			let pre_output = pre_columns.extract_by_indices(&same_key_update_indices);
403			let post_output = post_columns.extract_by_indices(&same_key_update_indices);
404			result.push(Diff::Update {
405				pre: pre_output,
406				post: post_output,
407			});
408		}
409
410		if !removed_indices.is_empty() {
411			let output = pre_columns.extract_by_indices(&removed_indices);
412			result.push(Diff::Remove {
413				pre: output,
414			});
415		}
416
417		if !inserted_indices.is_empty() {
418			let output = post_columns.extract_by_indices(&inserted_indices);
419			result.push(Diff::Insert {
420				post: output,
421			});
422		}
423
424		Ok(result)
425	}
426
427	/// Process removes - operates directly on Columns without Row conversion
428	fn process_remove(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
429		let mut result = Vec::new();
430		let row_count = columns.row_count();
431		if row_count == 0 {
432			return Ok(result);
433		}
434
435		let hashes = self.compute_hashes(columns)?;
436
437		let mut removed_hashes: Vec<Hash128> = Vec::new();
438
439		for row_idx in 0..row_count {
440			let hash = hashes[row_idx];
441
442			if let Some(entry) = state.entries.get_mut(&hash) {
443				if entry.count > 1 {
444					entry.count -= 1;
445				} else {
446					removed_hashes.push(hash);
447				}
448			}
449		}
450
451		for hash in removed_hashes {
452			if let Some(entry) = state.entries.shift_remove(&hash) {
453				let stored_columns = entry.first_row.to_columns(&state.layout);
454				result.push(Diff::Remove {
455					pre: stored_columns,
456				});
457			}
458		}
459
460		Ok(result)
461	}
462}
463
464impl RawStatefulOperator for DistinctOperator {}
465
466impl SingleStateful for DistinctOperator {
467	fn layout(&self) -> RowSchema {
468		self.schema.clone()
469	}
470}
471
472impl Operator for DistinctOperator {
473	fn id(&self) -> FlowNodeId {
474		self.node
475	}
476
477	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
478		let mut state = self.load_distinct_state(txn)?;
479		let mut result = Vec::new();
480
481		for diff in change.diffs {
482			match diff {
483				Diff::Insert {
484					post,
485				} => {
486					let insert_result = self.process_insert(&mut state, &post)?;
487					result.extend(insert_result);
488				}
489				Diff::Update {
490					pre,
491					post,
492				} => {
493					let update_result = self.process_update(&mut state, &pre, &post)?;
494					result.extend(update_result);
495				}
496				Diff::Remove {
497					pre,
498				} => {
499					let remove_result = self.process_remove(&mut state, &pre)?;
500					result.extend(remove_result);
501				}
502			}
503		}
504
505		self.save_distinct_state(txn, &state)?;
506
507		Ok(Change::from_flow(self.node, change.version, result))
508	}
509
510	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
511		self.parent.pull(txn, rows)
512	}
513}