Skip to main content

reifydb_sub_flow/operator/
distinct.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::{from_bytes, to_stdvec};
8use reifydb_core::{
9	encoded::shape::RowShape,
10	interface::{
11		catalog::flow::FlowNodeId,
12		change::{Change, Diff},
13	},
14	internal,
15	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
16};
17use reifydb_engine::{
18	expression::{
19		compile::{CompiledExpr, compile_expression},
20		context::{CompileContext, EvalContext},
21	},
22	vm::stack::SymbolTable,
23};
24use reifydb_routine::routine::registry::Routines;
25use reifydb_rql::expression::Expression;
26use reifydb_runtime::{
27	context::RuntimeContext,
28	hash::{Hash128, xxh3_128},
29};
30use reifydb_type::{
31	Result,
32	error::Error,
33	fragment::Fragment,
34	params::Params,
35	value::{Value, blob::Blob, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40	operator::{
41		Operator, Operators,
42		stateful::{raw::RawStatefulOperator, single::SingleStateful, utils},
43	},
44	transaction::{FlowTransaction, slot::PersistFn},
45};
46
/// Parameter set passed to expression evaluation in `compute_hashes`: no parameters.
static EMPTY_PARAMS: Params = Params::None;
/// Symbol table passed to expression evaluation in `compute_hashes`; built lazily on first use
/// via `SymbolTable::new`.
static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
49
/// Layout information shared across all rows
///
/// Records column names and column types as seen by `update_from_columns`.
/// `SerializedRow::to_columns` pairs the two vectors positionally to rebuild a stored row.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctLayout {
	/// Column names, in column order.
	names: Vec<String>,
	/// Column types, zipped position by position with `names`.
	types: Vec<Type>,
}
56
/// Serialized row data - stores column values directly without Row conversion
///
/// One instance captures a single row: its row number, its two timestamps and
/// the postcard encoding of its column values.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializedRow {
	/// Row number copied from the source `Columns`.
	number: RowNumber,
	/// Creation timestamp of the row; `DateTime::default()` when the source had none.
	created_at: DateTime,
	/// Update timestamp of the row; `DateTime::default()` when the source had none.
	updated_at: DateTime,
	/// Column values serialized with postcard
	#[serde(with = "serde_bytes")]
	values_bytes: Vec<u8>,
}
67
68impl SerializedRow {
69	/// Create from Columns at a specific row index - no Row allocation
70	fn from_columns_at_index(columns: &Columns, row_idx: usize) -> Self {
71		let number = columns.row_numbers[row_idx];
72		let created_at = if columns.created_at.is_empty() {
73			DateTime::default()
74		} else {
75			columns.created_at[row_idx]
76		};
77		let updated_at = if columns.updated_at.is_empty() {
78			DateTime::default()
79		} else {
80			columns.updated_at[row_idx]
81		};
82
83		let values: Vec<Value> = columns.iter().map(|c| c.data().get_value(row_idx)).collect();
84
85		// Serialize values directly with postcard
86		let values_bytes = to_stdvec(&values).expect("Failed to serialize column values");
87
88		Self {
89			number,
90			created_at,
91			updated_at,
92			values_bytes,
93		}
94	}
95
96	/// Convert back to Columns - no Row allocation
97	fn to_columns(&self, layout: &DistinctLayout) -> Columns {
98		// Deserialize values
99		let values: Vec<Value> = from_bytes(&self.values_bytes).expect("Failed to deserialize column values");
100
101		let mut columns_vec = Vec::with_capacity(layout.names.len());
102		for (i, (name, typ)) in layout.names.iter().zip(layout.types.iter()).enumerate() {
103			let value = values.get(i).cloned().unwrap_or(Value::none());
104			let mut col_data = ColumnBuffer::with_capacity(typ.clone(), 1);
105			col_data.push_value(value);
106			columns_vec.push(ColumnWithName::new(Fragment::internal(name), col_data));
107		}
108
109		Columns::with_system_columns(
110			columns_vec,
111			vec![self.number],
112			vec![self.created_at],
113			vec![self.updated_at],
114		)
115	}
116}
117
118impl DistinctLayout {
119	fn new() -> Self {
120		Self {
121			names: Vec::new(),
122			types: Vec::new(),
123		}
124	}
125
126	/// Update the layout from Columns (uses first row if multiple)
127	fn update_from_columns(&mut self, columns: &Columns) {
128		if columns.is_empty() {
129			return;
130		}
131
132		let names: Vec<String> = columns.iter().map(|c| c.name().text().to_string()).collect();
133		let types: Vec<Type> = columns.iter().map(|c| c.data().get_type()).collect();
134
135		if self.names.is_empty() {
136			self.names = names;
137			self.types = types;
138			return;
139		}
140
141		for (i, new_type) in types.iter().enumerate() {
142			if i < self.types.len() {
143				if !self.types[i].is_option() && new_type.is_option() {
144					self.types[i] = new_type.clone();
145				}
146			} else {
147				self.types.push(new_type.clone());
148				if i < names.len() {
149					self.names.push(names[i].clone());
150				}
151			}
152		}
153	}
154}
155
/// Entry for tracking distinct values
///
/// One entry exists per distinct key hash held in `DistinctState::entries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctEntry {
	/// Number of times this distinct value appears
	/// (incremented on insert, decremented on remove; the entry is dropped instead of reaching zero)
	count: usize,
	/// The first row that had this distinct value
	first_row: SerializedRow,
}
164
/// State for tracking distinct values
///
/// Serialized as a whole with postcard and stored in the operator's state blob.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctState {
	/// Map from hash of distinct expressions to entry
	/// Using IndexMap to preserve insertion order for "first occurrence" semantics
	entries: IndexMap<Hash128, DistinctEntry>,
	/// Shared layout information
	/// (used to turn a stored `SerializedRow` back into `Columns`)
	layout: DistinctLayout,
}
174
175impl Default for DistinctState {
176	fn default() -> Self {
177		Self {
178			entries: IndexMap::new(),
179			layout: DistinctLayout::new(),
180		}
181	}
182}
183
/// Flow operator that tracks distinct keys and forwards only the changes that
/// affect the set of distinct values.
pub struct DistinctOperator {
	/// Upstream operators; `pull` is delegated to them.
	parent: Arc<Operators>,
	/// Flow node this operator is registered under.
	node: FlowNodeId,
	/// Compiled distinct-key expressions; when empty, every column of a row forms the key.
	compiled_expressions: Vec<CompiledExpr>,
	/// Shape of the persisted state row (a single blob column holding the encoded state).
	shape: RowShape,
	/// Routines handed to the expression evaluation context.
	routines: Routines,
	/// Runtime context handed to the expression evaluation context.
	runtime_context: RuntimeContext,
}
192
193impl DistinctOperator {
194	pub fn new(
195		parent: Arc<Operators>,
196		node: FlowNodeId,
197		expressions: Vec<Expression>,
198		routines: Routines,
199		runtime_context: RuntimeContext,
200	) -> Self {
201		let symbols = SymbolTable::new();
202		let compile_ctx = CompileContext {
203			symbols: &symbols,
204		};
205		let compiled_expressions: Vec<CompiledExpr> = expressions
206			.iter()
207			.map(|e| compile_expression(&compile_ctx, e))
208			.collect::<Result<Vec<_>>>()
209			.expect("Failed to compile expressions");
210
211		Self {
212			parent,
213			node,
214			compiled_expressions,
215			shape: RowShape::testing(&[Type::Blob]),
216			routines,
217			runtime_context,
218		}
219	}
220
221	/// Compute hashes for all rows in Columns
222	fn compute_hashes(&self, columns: &Columns) -> Result<Vec<Hash128>> {
223		let row_count = columns.row_count();
224		if row_count == 0 {
225			return Ok(Vec::new());
226		}
227
228		if self.compiled_expressions.is_empty() {
229			// Hash the entire row data for each row
230			let mut hashes = Vec::with_capacity(row_count);
231			for row_idx in 0..row_count {
232				let mut data = Vec::new();
233				for col in columns.iter() {
234					let value = col.data().get_value(row_idx);
235					let value_str = value.to_string();
236					data.extend_from_slice(value_str.as_bytes());
237				}
238				hashes.push(xxh3_128(&data));
239			}
240			Ok(hashes)
241		} else {
242			let session = EvalContext {
243				params: &EMPTY_PARAMS,
244				symbols: &EMPTY_SYMBOL_TABLE,
245				routines: &self.routines,
246				runtime_context: &self.runtime_context,
247				arena: None,
248				identity: IdentityId::root(),
249				is_aggregate_context: false,
250				columns: Columns::empty(),
251				row_count: 1,
252				target: None,
253				take: None,
254			};
255			let exec_ctx = session.with_eval(columns.clone(), row_count);
256			let mut expr_columns = Vec::new();
257			for compiled_expr in &self.compiled_expressions {
258				let col = compiled_expr.execute(&exec_ctx)?;
259				expr_columns.push(col);
260			}
261
262			let mut hashes = Vec::with_capacity(row_count);
263			for row_idx in 0..row_count {
264				let mut data = Vec::new();
265				for col in &expr_columns {
266					let value = col.data().get_value(row_idx);
267					let value_str = value.to_string();
268					data.extend_from_slice(value_str.as_bytes());
269				}
270				hashes.push(xxh3_128(&data));
271			}
272			Ok(hashes)
273		}
274	}
275
276	fn load_distinct_state(&self, txn: &mut FlowTransaction) -> Result<DistinctState> {
277		let state_row = self.load_state(txn)?;
278
279		if state_row.is_empty() || !state_row.is_defined(0) {
280			return Ok(DistinctState::default());
281		}
282
283		let blob = self.shape.get_blob(&state_row, 0);
284		if blob.is_empty() {
285			return Ok(DistinctState::default());
286		}
287
288		from_bytes(blob.as_ref())
289			.map_err(|e| Error(Box::new(internal!("Failed to deserialize DistinctState: {}", e))))
290	}
291
292	fn save_distinct_state(&self, txn: &mut FlowTransaction, state: &DistinctState) -> Result<()> {
293		let serialized = to_stdvec(state)
294			.map_err(|e| Error(Box::new(internal!("Failed to serialize DistinctState: {}", e))))?;
295		let blob = Blob::from(serialized);
296
297		self.update_state(txn, |shape, row| {
298			shape.set_blob(row, 0, &blob);
299			Ok(())
300		})?;
301		Ok(())
302	}
303
304	/// Process inserts - operates directly on Columns without Row conversion
305	fn process_insert(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
306		let mut result = Vec::new();
307		let row_count = columns.row_count();
308		if row_count == 0 {
309			return Ok(result);
310		}
311
312		state.layout.update_from_columns(columns);
313		let hashes = self.compute_hashes(columns)?;
314
315		let mut new_distinct_indices: Vec<usize> = Vec::new();
316
317		for (row_idx, &hash) in hashes.iter().enumerate() {
318			match state.entries.get_mut(&hash) {
319				Some(entry) => {
320					entry.count += 1;
321					// Already seen this distinct value - just increment count
322				}
323				None => {
324					state.entries.insert(
325						hash,
326						DistinctEntry {
327							count: 1,
328							first_row: SerializedRow::from_columns_at_index(
329								columns, row_idx,
330							),
331						},
332					);
333					new_distinct_indices.push(row_idx);
334				}
335			}
336		}
337
338		if !new_distinct_indices.is_empty() {
339			let output = columns.extract_by_indices(&new_distinct_indices);
340			result.push(Diff::insert(output));
341		}
342
343		Ok(result)
344	}
345
346	fn process_update(
347		&self,
348		state: &mut DistinctState,
349		pre_columns: &Columns,
350		post_columns: &Columns,
351	) -> Result<Vec<Diff>> {
352		let row_count = post_columns.row_count();
353		if row_count == 0 {
354			return Ok(Vec::new());
355		}
356
357		state.layout.update_from_columns(post_columns);
358		let pre_hashes = self.compute_hashes(pre_columns)?;
359		let post_hashes = self.compute_hashes(post_columns)?;
360
361		let mut same_key_update_indices: Vec<usize> = Vec::new();
362		let mut removed_indices: Vec<usize> = Vec::new();
363		let mut inserted_indices: Vec<usize> = Vec::new();
364
365		for row_idx in 0..row_count {
366			let pre_hash = pre_hashes[row_idx];
367			let post_hash = post_hashes[row_idx];
368
369			if pre_hash == post_hash {
370				update_same_distinct_key(
371					state,
372					pre_hash,
373					post_columns,
374					row_idx,
375					&mut same_key_update_indices,
376				);
377			} else {
378				if drop_pre_distinct_key(state, pre_hash) {
379					removed_indices.push(row_idx);
380				}
381				if add_post_distinct_key(state, post_hash, post_columns, row_idx) {
382					inserted_indices.push(row_idx);
383				}
384			}
385		}
386
387		let mut result = Vec::new();
388		if !same_key_update_indices.is_empty() {
389			let pre_output = pre_columns.extract_by_indices(&same_key_update_indices);
390			let post_output = post_columns.extract_by_indices(&same_key_update_indices);
391			result.push(Diff::update(pre_output, post_output));
392		}
393		if !removed_indices.is_empty() {
394			result.push(Diff::remove(pre_columns.extract_by_indices(&removed_indices)));
395		}
396		if !inserted_indices.is_empty() {
397			result.push(Diff::insert(post_columns.extract_by_indices(&inserted_indices)));
398		}
399		Ok(result)
400	}
401
402	/// Process removes - operates directly on Columns without Row conversion
403	fn process_remove(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
404		let mut result = Vec::new();
405		let row_count = columns.row_count();
406		if row_count == 0 {
407			return Ok(result);
408		}
409
410		let hashes = self.compute_hashes(columns)?;
411
412		let mut removed_hashes: Vec<Hash128> = Vec::new();
413
414		for &hash in &hashes {
415			if let Some(entry) = state.entries.get_mut(&hash) {
416				if entry.count > 1 {
417					entry.count -= 1;
418				} else {
419					removed_hashes.push(hash);
420				}
421			}
422		}
423
424		for hash in removed_hashes {
425			if let Some(entry) = state.entries.shift_remove(&hash) {
426				let stored_columns = entry.first_row.to_columns(&state.layout);
427				result.push(Diff::remove(stored_columns));
428			}
429		}
430
431		Ok(result)
432	}
433}
434
435#[inline]
436fn update_same_distinct_key(
437	state: &mut DistinctState,
438	hash: Hash128,
439	post_columns: &Columns,
440	row_idx: usize,
441	indices: &mut Vec<usize>,
442) {
443	if let Some(entry) = state.entries.get_mut(&hash) {
444		if entry.first_row.number == post_columns.row_numbers[row_idx] {
445			entry.first_row = SerializedRow::from_columns_at_index(post_columns, row_idx);
446		}
447		indices.push(row_idx);
448	}
449}
450
451#[inline]
452fn drop_pre_distinct_key(state: &mut DistinctState, hash: Hash128) -> bool {
453	let Some(entry) = state.entries.get_mut(&hash) else {
454		return false;
455	};
456	if entry.count > 1 {
457		entry.count -= 1;
458		false
459	} else {
460		state.entries.shift_remove(&hash);
461		true
462	}
463}
464
465#[inline]
466fn add_post_distinct_key(state: &mut DistinctState, hash: Hash128, post_columns: &Columns, row_idx: usize) -> bool {
467	match state.entries.get_mut(&hash) {
468		Some(entry) => {
469			entry.count += 1;
470			false
471		}
472		None => {
473			state.entries.insert(
474				hash,
475				DistinctEntry {
476					count: 1,
477					first_row: SerializedRow::from_columns_at_index(post_columns, row_idx),
478				},
479			);
480			true
481		}
482	}
483}
484
// Empty impl: no trait items are overridden for `DistinctOperator`.
impl RawStatefulOperator for DistinctOperator {}
486
impl SingleStateful for DistinctOperator {
	/// Shape of the operator's state row; returns a clone of `self.shape`.
	fn layout(&self) -> RowShape {
		self.shape.clone()
	}
}
492
493impl Operator for DistinctOperator {
494	fn id(&self) -> FlowNodeId {
495		self.node
496	}
497
498	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
499		let node_id = self.node;
500		let shape = self.shape.clone();
501
502		// Load (or fetch from cache) the cached DistinctState for this txn.
503		// On first access we register the persist closure; subsequent batches
504		// reuse the in-memory state without re-decoding the postcard blob.
505		let state: &mut DistinctState = txn.operator_state(node_id, |txn| {
506			let s = self.load_distinct_state(txn)?;
507			let persist: PersistFn = Box::new(move |txn, value| {
508				let state = value.downcast::<DistinctState>().expect("DistinctState slot type");
509				let serialized = to_stdvec(&*state).map_err(|e| {
510					Error(Box::new(internal!("Failed to serialize DistinctState: {}", e)))
511				})?;
512				let blob = Blob::from(serialized);
513				let key = utils::empty_key();
514				let mut row = utils::load_or_create_row(node_id, txn, &key, &shape)?;
515				shape.set_blob(&mut row, 0, &blob);
516				utils::save_row(node_id, txn, &key, row)?;
517				Ok(())
518			});
519			Ok((s, persist))
520		})?;
521
522		let mut result = Vec::new();
523		for diff in change.diffs {
524			match diff {
525				Diff::Insert {
526					post,
527				} => {
528					let insert_result = self.process_insert(state, &post)?;
529					result.extend(insert_result);
530				}
531				Diff::Update {
532					pre,
533					post,
534				} => {
535					let update_result = self.process_update(state, &pre, &post)?;
536					result.extend(update_result);
537				}
538				Diff::Remove {
539					pre,
540				} => {
541					let remove_result = self.process_remove(state, &pre)?;
542					result.extend(remove_result);
543				}
544			}
545		}
546
547		txn.mark_state_dirty(node_id);
548
549		Ok(Change::from_flow(self.node, change.version, result, change.changed_at))
550	}
551
552	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
553		self.parent.pull(txn, rows)
554	}
555}