1use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::{from_bytes, to_stdvec};
8use reifydb_core::{
9 encoded::shape::RowShape,
10 interface::{
11 catalog::flow::FlowNodeId,
12 change::{Change, Diff},
13 },
14 internal,
15 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
16};
17use reifydb_engine::{
18 expression::{
19 compile::{CompiledExpr, compile_expression},
20 context::{CompileContext, EvalContext},
21 },
22 vm::stack::SymbolTable,
23};
24use reifydb_routine::routine::registry::Routines;
25use reifydb_rql::expression::Expression;
26use reifydb_runtime::{
27 context::RuntimeContext,
28 hash::{Hash128, xxh3_128},
29};
30use reifydb_type::{
31 Result,
32 error::Error,
33 fragment::Fragment,
34 params::Params,
35 value::{Value, blob::Blob, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40 operator::{
41 Operator, Operators,
42 stateful::{raw::RawStatefulOperator, single::SingleStateful, utils},
43 },
44 transaction::{FlowTransaction, slot::PersistFn},
45};
46
47static EMPTY_PARAMS: Params = Params::None;
48static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
49
/// Column names and types of the rows seen by the operator.
///
/// Stored alongside the serialized representatives so that a
/// `SerializedRow` can be decoded back into `Columns`. Field order is part of
/// the persisted (postcard) format and must not change.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctLayout {
	/// Column names, in column order.
	names: Vec<String>,
	/// Column types, parallel to `names`. A non-option type may later be
	/// widened to an option type when a batch carries one.
	types: Vec<Type>,
}
56
/// A single input row captured as the representative of a distinct key.
///
/// The column values are kept as a postcard-encoded `Vec<Value>`. The names
/// and types needed to rebuild `Columns` come from a `DistinctLayout`. Field
/// order is part of the persisted format and must not change.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializedRow {
	/// Row number of the captured source row.
	number: RowNumber,
	/// Creation timestamp, or `DateTime::default()` if the source batch had none.
	created_at: DateTime,
	/// Update timestamp, or `DateTime::default()` if the source batch had none.
	updated_at: DateTime,
	/// Postcard-encoded `Vec<Value>`, one value per column.
	#[serde(with = "serde_bytes")]
	values_bytes: Vec<u8>,
}
67
68impl SerializedRow {
69 fn from_columns_at_index(columns: &Columns, row_idx: usize) -> Self {
71 let number = columns.row_numbers[row_idx];
72 let created_at = if columns.created_at.is_empty() {
73 DateTime::default()
74 } else {
75 columns.created_at[row_idx]
76 };
77 let updated_at = if columns.updated_at.is_empty() {
78 DateTime::default()
79 } else {
80 columns.updated_at[row_idx]
81 };
82
83 let values: Vec<Value> = columns.iter().map(|c| c.data().get_value(row_idx)).collect();
84
85 let values_bytes = to_stdvec(&values).expect("Failed to serialize column values");
87
88 Self {
89 number,
90 created_at,
91 updated_at,
92 values_bytes,
93 }
94 }
95
96 fn to_columns(&self, layout: &DistinctLayout) -> Columns {
98 let values: Vec<Value> = from_bytes(&self.values_bytes).expect("Failed to deserialize column values");
100
101 let mut columns_vec = Vec::with_capacity(layout.names.len());
102 for (i, (name, typ)) in layout.names.iter().zip(layout.types.iter()).enumerate() {
103 let value = values.get(i).cloned().unwrap_or(Value::none());
104 let mut col_data = ColumnBuffer::with_capacity(typ.clone(), 1);
105 col_data.push_value(value);
106 columns_vec.push(ColumnWithName::new(Fragment::internal(name), col_data));
107 }
108
109 Columns::with_system_columns(
110 columns_vec,
111 vec![self.number],
112 vec![self.created_at],
113 vec![self.updated_at],
114 )
115 }
116}
117
118impl DistinctLayout {
119 fn new() -> Self {
120 Self {
121 names: Vec::new(),
122 types: Vec::new(),
123 }
124 }
125
126 fn update_from_columns(&mut self, columns: &Columns) {
128 if columns.is_empty() {
129 return;
130 }
131
132 let names: Vec<String> = columns.iter().map(|c| c.name().text().to_string()).collect();
133 let types: Vec<Type> = columns.iter().map(|c| c.data().get_type()).collect();
134
135 if self.names.is_empty() {
136 self.names = names;
137 self.types = types;
138 return;
139 }
140
141 for (i, new_type) in types.iter().enumerate() {
142 if i < self.types.len() {
143 if !self.types[i].is_option() && new_type.is_option() {
144 self.types[i] = new_type.clone();
145 }
146 } else {
147 self.types.push(new_type.clone());
148 if i < names.len() {
149 self.names.push(names[i].clone());
150 }
151 }
152 }
153 }
154}
155
/// Bookkeeping for one distinct key (persisted; field order is wire format).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctEntry {
	/// Number of live input rows currently mapped to this key.
	count: usize,
	/// Representative row that was emitted downstream for this key. It is
	/// refreshed when that same row (by row number) is updated without
	/// changing its key.
	first_row: SerializedRow,
}
164
/// Persisted state of a distinct operator (postcard-encoded into a blob).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DistinctState {
	/// One entry per distinct-key hash, kept in insertion order (removals use
	/// `shift_remove`, which preserves the order of the remaining entries).
	entries: IndexMap<Hash128, DistinctEntry>,
	/// Names and types used to decode the stored representative rows.
	layout: DistinctLayout,
}
174
175impl Default for DistinctState {
176 fn default() -> Self {
177 Self {
178 entries: IndexMap::new(),
179 layout: DistinctLayout::new(),
180 }
181 }
182}
183
/// Stateful flow operator implementing DISTINCT.
///
/// The first row seen for a key is emitted downstream and stored as that
/// key's representative. Later rows with the same key only bump a count. The
/// representative is retracted once the count drops to zero.
pub struct DistinctOperator {
	/// Upstream operator; `pull` requests are delegated to it.
	parent: Arc<Operators>,
	/// Identifier of this node in the flow graph.
	node: FlowNodeId,
	/// Compiled key expressions; when empty, the whole row is the key.
	compiled_expressions: Vec<CompiledExpr>,
	/// Shape of the operator's state row: a single blob column.
	shape: RowShape,
	/// Routines passed to the evaluation context for key expressions.
	routines: Routines,
	/// Runtime context passed to the evaluation context for key expressions.
	runtime_context: RuntimeContext,
}
192
193impl DistinctOperator {
194 pub fn new(
195 parent: Arc<Operators>,
196 node: FlowNodeId,
197 expressions: Vec<Expression>,
198 routines: Routines,
199 runtime_context: RuntimeContext,
200 ) -> Self {
201 let symbols = SymbolTable::new();
202 let compile_ctx = CompileContext {
203 symbols: &symbols,
204 };
205 let compiled_expressions: Vec<CompiledExpr> = expressions
206 .iter()
207 .map(|e| compile_expression(&compile_ctx, e))
208 .collect::<Result<Vec<_>>>()
209 .expect("Failed to compile expressions");
210
211 Self {
212 parent,
213 node,
214 compiled_expressions,
215 shape: RowShape::testing(&[Type::Blob]),
216 routines,
217 runtime_context,
218 }
219 }
220
221 fn compute_hashes(&self, columns: &Columns) -> Result<Vec<Hash128>> {
223 let row_count = columns.row_count();
224 if row_count == 0 {
225 return Ok(Vec::new());
226 }
227
228 if self.compiled_expressions.is_empty() {
229 let mut hashes = Vec::with_capacity(row_count);
231 for row_idx in 0..row_count {
232 let mut data = Vec::new();
233 for col in columns.iter() {
234 let value = col.data().get_value(row_idx);
235 let value_str = value.to_string();
236 data.extend_from_slice(value_str.as_bytes());
237 }
238 hashes.push(xxh3_128(&data));
239 }
240 Ok(hashes)
241 } else {
242 let session = EvalContext {
243 params: &EMPTY_PARAMS,
244 symbols: &EMPTY_SYMBOL_TABLE,
245 routines: &self.routines,
246 runtime_context: &self.runtime_context,
247 arena: None,
248 identity: IdentityId::root(),
249 is_aggregate_context: false,
250 columns: Columns::empty(),
251 row_count: 1,
252 target: None,
253 take: None,
254 };
255 let exec_ctx = session.with_eval(columns.clone(), row_count);
256 let mut expr_columns = Vec::new();
257 for compiled_expr in &self.compiled_expressions {
258 let col = compiled_expr.execute(&exec_ctx)?;
259 expr_columns.push(col);
260 }
261
262 let mut hashes = Vec::with_capacity(row_count);
263 for row_idx in 0..row_count {
264 let mut data = Vec::new();
265 for col in &expr_columns {
266 let value = col.data().get_value(row_idx);
267 let value_str = value.to_string();
268 data.extend_from_slice(value_str.as_bytes());
269 }
270 hashes.push(xxh3_128(&data));
271 }
272 Ok(hashes)
273 }
274 }
275
276 fn load_distinct_state(&self, txn: &mut FlowTransaction) -> Result<DistinctState> {
277 let state_row = self.load_state(txn)?;
278
279 if state_row.is_empty() || !state_row.is_defined(0) {
280 return Ok(DistinctState::default());
281 }
282
283 let blob = self.shape.get_blob(&state_row, 0);
284 if blob.is_empty() {
285 return Ok(DistinctState::default());
286 }
287
288 from_bytes(blob.as_ref())
289 .map_err(|e| Error(Box::new(internal!("Failed to deserialize DistinctState: {}", e))))
290 }
291
292 fn save_distinct_state(&self, txn: &mut FlowTransaction, state: &DistinctState) -> Result<()> {
293 let serialized = to_stdvec(state)
294 .map_err(|e| Error(Box::new(internal!("Failed to serialize DistinctState: {}", e))))?;
295 let blob = Blob::from(serialized);
296
297 self.update_state(txn, |shape, row| {
298 shape.set_blob(row, 0, &blob);
299 Ok(())
300 })?;
301 Ok(())
302 }
303
304 fn process_insert(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
306 let mut result = Vec::new();
307 let row_count = columns.row_count();
308 if row_count == 0 {
309 return Ok(result);
310 }
311
312 state.layout.update_from_columns(columns);
313 let hashes = self.compute_hashes(columns)?;
314
315 let mut new_distinct_indices: Vec<usize> = Vec::new();
316
317 for (row_idx, &hash) in hashes.iter().enumerate() {
318 match state.entries.get_mut(&hash) {
319 Some(entry) => {
320 entry.count += 1;
321 }
323 None => {
324 state.entries.insert(
325 hash,
326 DistinctEntry {
327 count: 1,
328 first_row: SerializedRow::from_columns_at_index(
329 columns, row_idx,
330 ),
331 },
332 );
333 new_distinct_indices.push(row_idx);
334 }
335 }
336 }
337
338 if !new_distinct_indices.is_empty() {
339 let output = columns.extract_by_indices(&new_distinct_indices);
340 result.push(Diff::insert(output));
341 }
342
343 Ok(result)
344 }
345
346 fn process_update(
347 &self,
348 state: &mut DistinctState,
349 pre_columns: &Columns,
350 post_columns: &Columns,
351 ) -> Result<Vec<Diff>> {
352 let row_count = post_columns.row_count();
353 if row_count == 0 {
354 return Ok(Vec::new());
355 }
356
357 state.layout.update_from_columns(post_columns);
358 let pre_hashes = self.compute_hashes(pre_columns)?;
359 let post_hashes = self.compute_hashes(post_columns)?;
360
361 let mut same_key_update_indices: Vec<usize> = Vec::new();
362 let mut removed_indices: Vec<usize> = Vec::new();
363 let mut inserted_indices: Vec<usize> = Vec::new();
364
365 for row_idx in 0..row_count {
366 let pre_hash = pre_hashes[row_idx];
367 let post_hash = post_hashes[row_idx];
368
369 if pre_hash == post_hash {
370 update_same_distinct_key(
371 state,
372 pre_hash,
373 post_columns,
374 row_idx,
375 &mut same_key_update_indices,
376 );
377 } else {
378 if drop_pre_distinct_key(state, pre_hash) {
379 removed_indices.push(row_idx);
380 }
381 if add_post_distinct_key(state, post_hash, post_columns, row_idx) {
382 inserted_indices.push(row_idx);
383 }
384 }
385 }
386
387 let mut result = Vec::new();
388 if !same_key_update_indices.is_empty() {
389 let pre_output = pre_columns.extract_by_indices(&same_key_update_indices);
390 let post_output = post_columns.extract_by_indices(&same_key_update_indices);
391 result.push(Diff::update(pre_output, post_output));
392 }
393 if !removed_indices.is_empty() {
394 result.push(Diff::remove(pre_columns.extract_by_indices(&removed_indices)));
395 }
396 if !inserted_indices.is_empty() {
397 result.push(Diff::insert(post_columns.extract_by_indices(&inserted_indices)));
398 }
399 Ok(result)
400 }
401
402 fn process_remove(&self, state: &mut DistinctState, columns: &Columns) -> Result<Vec<Diff>> {
404 let mut result = Vec::new();
405 let row_count = columns.row_count();
406 if row_count == 0 {
407 return Ok(result);
408 }
409
410 let hashes = self.compute_hashes(columns)?;
411
412 let mut removed_hashes: Vec<Hash128> = Vec::new();
413
414 for &hash in &hashes {
415 if let Some(entry) = state.entries.get_mut(&hash) {
416 if entry.count > 1 {
417 entry.count -= 1;
418 } else {
419 removed_hashes.push(hash);
420 }
421 }
422 }
423
424 for hash in removed_hashes {
425 if let Some(entry) = state.entries.shift_remove(&hash) {
426 let stored_columns = entry.first_row.to_columns(&state.layout);
427 result.push(Diff::remove(stored_columns));
428 }
429 }
430
431 Ok(result)
432 }
433}
434
435#[inline]
436fn update_same_distinct_key(
437 state: &mut DistinctState,
438 hash: Hash128,
439 post_columns: &Columns,
440 row_idx: usize,
441 indices: &mut Vec<usize>,
442) {
443 if let Some(entry) = state.entries.get_mut(&hash) {
444 if entry.first_row.number == post_columns.row_numbers[row_idx] {
445 entry.first_row = SerializedRow::from_columns_at_index(post_columns, row_idx);
446 }
447 indices.push(row_idx);
448 }
449}
450
451#[inline]
452fn drop_pre_distinct_key(state: &mut DistinctState, hash: Hash128) -> bool {
453 let Some(entry) = state.entries.get_mut(&hash) else {
454 return false;
455 };
456 if entry.count > 1 {
457 entry.count -= 1;
458 false
459 } else {
460 state.entries.shift_remove(&hash);
461 true
462 }
463}
464
465#[inline]
466fn add_post_distinct_key(state: &mut DistinctState, hash: Hash128, post_columns: &Columns, row_idx: usize) -> bool {
467 match state.entries.get_mut(&hash) {
468 Some(entry) => {
469 entry.count += 1;
470 false
471 }
472 None => {
473 state.entries.insert(
474 hash,
475 DistinctEntry {
476 count: 1,
477 first_row: SerializedRow::from_columns_at_index(post_columns, row_idx),
478 },
479 );
480 true
481 }
482 }
483}
484
485impl RawStatefulOperator for DistinctOperator {}
486
487impl SingleStateful for DistinctOperator {
488 fn layout(&self) -> RowShape {
489 self.shape.clone()
490 }
491}
492
493impl Operator for DistinctOperator {
494 fn id(&self) -> FlowNodeId {
495 self.node
496 }
497
498 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
499 let node_id = self.node;
500 let shape = self.shape.clone();
501
502 let state: &mut DistinctState = txn.operator_state(node_id, |txn| {
506 let s = self.load_distinct_state(txn)?;
507 let persist: PersistFn = Box::new(move |txn, value| {
508 let state = value.downcast::<DistinctState>().expect("DistinctState slot type");
509 let serialized = to_stdvec(&*state).map_err(|e| {
510 Error(Box::new(internal!("Failed to serialize DistinctState: {}", e)))
511 })?;
512 let blob = Blob::from(serialized);
513 let key = utils::empty_key();
514 let mut row = utils::load_or_create_row(node_id, txn, &key, &shape)?;
515 shape.set_blob(&mut row, 0, &blob);
516 utils::save_row(node_id, txn, &key, row)?;
517 Ok(())
518 });
519 Ok((s, persist))
520 })?;
521
522 let mut result = Vec::new();
523 for diff in change.diffs {
524 match diff {
525 Diff::Insert {
526 post,
527 } => {
528 let insert_result = self.process_insert(state, &post)?;
529 result.extend(insert_result);
530 }
531 Diff::Update {
532 pre,
533 post,
534 } => {
535 let update_result = self.process_update(state, &pre, &post)?;
536 result.extend(update_result);
537 }
538 Diff::Remove {
539 pre,
540 } => {
541 let remove_result = self.process_remove(state, &pre)?;
542 result.extend(remove_result);
543 }
544 }
545 }
546
547 txn.mark_state_dirty(node_id);
548
549 Ok(Change::from_flow(self.node, change.version, result, change.changed_at))
550 }
551
552 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
553 self.parent.pull(txn, rows)
554 }
555}