1use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::to_stdvec;
8use reifydb_core::{
9 common::{CommitVersion, JoinType},
10 encoded::{key::EncodedKey, shape::RowShape},
11 interface::{
12 catalog::flow::FlowNodeId,
13 change::{Change, ChangeOrigin, Diff},
14 },
15 internal,
16 util::encoding::keycode::serializer::KeySerializer,
17 value::column::{ColumnWithName, columns::Columns},
18};
19use reifydb_engine::{
20 expression::{
21 compile::{CompiledExpr, compile_expression},
22 context::{CompileContext, EvalContext},
23 },
24 vm::{executor::Executor, stack::SymbolTable},
25};
26use reifydb_routine::routine::registry::Routines;
27use reifydb_rql::expression::Expression;
28use reifydb_runtime::{
29 context::RuntimeContext,
30 hash::{Hash128, xxh3_128},
31};
32use reifydb_type::{
33 Result,
34 error::Error,
35 fragment::Fragment,
36 params::Params,
37 util::cowvec::CowVec,
38 value::{Value, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
39};
40
41use super::{
42 column::JoinedColumnsBuilder,
43 state::{JoinSide, JoinState},
44 strategy::{JoinContext, JoinStrategy, UpdateKeys},
45};
46use crate::{
47 operator::{
48 Operator, Operators,
49 stateful::{raw::RawStatefulOperator, row::RowNumberProvider, single::SingleStateful},
50 },
51 transaction::FlowTransaction,
52};
53
54static EMPTY_PARAMS: Params = Params::None;
55static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
56
57pub struct JoinSideConfig {
59 pub parent: Arc<Operators>,
60 pub node: FlowNodeId,
61 pub exprs: Vec<Expression>,
62}
63
64pub struct JoinOperator {
65 pub(crate) left_parent: Arc<Operators>,
66 pub(crate) right_parent: Arc<Operators>,
67 node: FlowNodeId,
68 strategy: JoinStrategy,
69 left_node: FlowNodeId,
70 right_node: FlowNodeId,
71 left_exprs: Vec<Expression>,
72 pub(crate) right_exprs: Vec<Expression>,
73 compiled_left_exprs: Vec<CompiledExpr>,
74 compiled_right_exprs: Vec<CompiledExpr>,
75 alias: Option<String>,
76 shape: RowShape,
77 row_number_provider: RowNumberProvider,
78 executor: Executor,
79 routines: Routines,
80 runtime_context: RuntimeContext,
81}
82
83impl JoinOperator {
84 pub fn new(
85 left: JoinSideConfig,
86 right: JoinSideConfig,
87 node: FlowNodeId,
88 join_type: JoinType,
89 alias: Option<String>,
90 executor: Executor,
91 ) -> Self {
92 let left_parent = left.parent;
93 let right_parent = right.parent;
94 let left_node = left.node;
95 let right_node = right.node;
96 let left_exprs = left.exprs;
97 let right_exprs = right.exprs;
98 let strategy = JoinStrategy::from(join_type);
99 let shape = Self::state_shape();
100 let row_number_provider = RowNumberProvider::new(node);
101
102 let compile_ctx = CompileContext {
104 symbols: &EMPTY_SYMBOL_TABLE,
105 };
106
107 let compiled_left_exprs: Vec<CompiledExpr> = left_exprs
109 .iter()
110 .map(|e| compile_expression(&compile_ctx, e))
111 .collect::<Result<Vec<_>>>()
112 .expect("Failed to compile left expressions");
113
114 let compiled_right_exprs: Vec<CompiledExpr> = right_exprs
115 .iter()
116 .map(|e| compile_expression(&compile_ctx, e))
117 .collect::<Result<Vec<_>>>()
118 .expect("Failed to compile right expressions");
119
120 let routines = executor.routines.clone();
122 let runtime_context = executor.runtime_context.clone();
123
124 Self {
125 left_parent,
126 right_parent,
127 node,
128 strategy,
129 left_node,
130 right_node,
131 left_exprs,
132 right_exprs,
133 compiled_left_exprs,
134 compiled_right_exprs,
135 alias,
136 shape,
137 row_number_provider,
138 executor,
139 routines,
140 runtime_context,
141 }
142 }
143
144 fn state_shape() -> RowShape {
145 RowShape::testing(&[Type::Blob])
146 }
147
148 pub(crate) fn compute_join_keys(
151 &self,
152 columns: &Columns,
153 compiled_exprs: &[CompiledExpr],
154 ) -> Result<Vec<Option<Hash128>>> {
155 let row_count = columns.row_count();
156 if row_count == 0 {
157 return Ok(Vec::new());
158 }
159
160 let session = EvalContext {
161 params: &EMPTY_PARAMS,
162 symbols: &EMPTY_SYMBOL_TABLE,
163 routines: &self.routines,
164 runtime_context: &self.runtime_context,
165 arena: None,
166 identity: IdentityId::root(),
167 is_aggregate_context: false,
168 columns: Columns::empty(),
169 row_count: 1,
170 target: None,
171 take: None,
172 };
173 let exec_ctx = session.with_eval(columns.clone(), row_count);
174
175 let mut expr_columns = Vec::with_capacity(compiled_exprs.len());
177 for compiled_expr in compiled_exprs.iter() {
178 let col: ColumnWithName = if let Some(col_name) = compiled_expr.access_column_name() {
179 columns.column(col_name)
180 .map(|c| ColumnWithName::new(c.name().clone(), c.data().clone()))
181 .unwrap_or_else(|| {
182 ColumnWithName::undefined_typed(col_name, Type::Boolean, row_count)
183 })
184 } else {
185 compiled_expr.execute(&exec_ctx)?
186 };
187 expr_columns.push(col);
188 }
189
190 let mut hashes = Vec::with_capacity(row_count);
192 for row_idx in 0..row_count {
193 let mut hasher = Vec::with_capacity(256);
194 let mut has_undefined = false;
195
196 for col in &expr_columns {
197 let value = col.data().get_value(row_idx);
198
199 if matches!(value, Value::None { .. }) {
201 has_undefined = true;
202 break;
203 }
204
205 let bytes = to_stdvec(&value).map_err(|e| {
206 Error(Box::new(internal!("Failed to encode value for hash: {}", e)))
207 })?;
208 hasher.extend_from_slice(&bytes);
209 }
210
211 if has_undefined {
212 hashes.push(None);
213 } else {
214 hashes.push(Some(xxh3_128(&hasher)));
215 }
216 }
217
218 Ok(hashes)
219 }
220
221 pub(crate) fn unmatched_left_columns(
224 &self,
225 txn: &mut FlowTransaction,
226 left: &Columns,
227 left_idx: usize,
228 ) -> Result<Columns> {
229 let left_row_number = left.row_numbers[left_idx];
230
231 let mut serializer = KeySerializer::new();
233 serializer.extend_u8(b'L');
234 serializer.extend_u64(left_row_number.0);
235 let composite_key = EncodedKey::new(serializer.finish());
236
237 let (result_row_number, _is_new) =
239 self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
240
241 let right_shape = self.right_parent.pull(txn, &[])?;
243
244 let builder = JoinedColumnsBuilder::new(left, &right_shape, &self.alias);
246 Ok(builder.unmatched_left(result_row_number, left, left_idx, &right_shape))
247 }
248
249 pub(crate) fn unmatched_left_columns_batch(
251 &self,
252 txn: &mut FlowTransaction,
253 left: &Columns,
254 left_indices: &[usize],
255 ) -> Result<Columns> {
256 if left_indices.is_empty() {
257 return Ok(Columns::empty());
258 }
259
260 let composite_keys: Vec<EncodedKey> = left_indices
262 .iter()
263 .map(|&idx| {
264 let left_row_number = left.row_numbers[idx];
265 let mut serializer = KeySerializer::new();
266 serializer.extend_u8(b'L');
267 serializer.extend_u64(left_row_number.0);
268 EncodedKey::new(serializer.finish())
269 })
270 .collect();
271
272 let row_numbers_with_flags =
274 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
275 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
276
277 let right_shape = self.right_parent.pull(txn, &[])?;
279
280 let builder = JoinedColumnsBuilder::new(left, &right_shape, &self.alias);
282 Ok(builder.unmatched_left_batch(&row_numbers, left, left_indices, &right_shape))
283 }
284
285 pub(crate) fn cleanup_left_row_joins(&self, txn: &mut FlowTransaction, left_number: u64) -> Result<()> {
288 let mut serializer = KeySerializer::new();
289 serializer.extend_u8(b'L');
290 serializer.extend_u64(left_number);
291 let prefix = serializer.finish();
292
293 self.row_number_provider.remove_by_prefix(txn, &prefix)
295 }
296
297 pub(crate) fn join_columns(
299 &self,
300 txn: &mut FlowTransaction,
301 left: &Columns,
302 left_idx: usize,
303 right: &Columns,
304 right_idx: usize,
305 ) -> Result<Columns> {
306 let left_row_number = left.row_numbers[left_idx];
307 let right_row_number = right.row_numbers[right_idx];
308
309 let composite_key = Self::make_composite_key(left_row_number, right_row_number);
310 let (result_row_number, _is_new) =
311 self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
312
313 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
315 Ok(builder.join_at_indices(result_row_number, left, left_idx, right, right_idx))
316 }
317
318 fn make_composite_key(left_num: RowNumber, right_num: RowNumber) -> EncodedKey {
320 let mut serializer = KeySerializer::new();
321 serializer.extend_u8(b'L');
322 serializer.extend_u64(left_num.0);
323 serializer.extend_u64(right_num.0);
324 EncodedKey::new(serializer.finish())
325 }
326
327 fn decode_row_number_from_keycode(bytes: &[u8]) -> u64 {
330 let arr: [u8; 8] =
331 [!bytes[0], !bytes[1], !bytes[2], !bytes[3], !bytes[4], !bytes[5], !bytes[6], !bytes[7]];
332 u64::from_be_bytes(arr)
333 }
334
335 fn parse_composite_key(key_bytes: &[u8]) -> Option<(RowNumber, Option<RowNumber>)> {
339 if key_bytes.len() < 9 || key_bytes[0] != !b'L' {
341 return None;
342 }
343
344 let left_num = Self::decode_row_number_from_keycode(&key_bytes[1..9]);
345 let right_num = if key_bytes.len() >= 17 {
346 Some(RowNumber(Self::decode_row_number_from_keycode(&key_bytes[9..17])))
347 } else {
348 None
349 };
350
351 Some((RowNumber(left_num), right_num))
352 }
353
354 pub(crate) fn join_columns_one_to_many(
357 &self,
358 txn: &mut FlowTransaction,
359 left: &Columns,
360 left_idx: usize,
361 right: &Columns,
362 ) -> Result<Columns> {
363 let right_count = right.row_count();
364 if right_count == 0 {
365 return Ok(Columns::empty());
366 }
367
368 let left_row_number = left.row_numbers[left_idx];
369
370 let composite_keys: Vec<EncodedKey> = (0..right_count)
372 .map(|right_idx| {
373 let right_row_number = right.row_numbers[right_idx];
374 Self::make_composite_key(left_row_number, right_row_number)
375 })
376 .collect();
377
378 let row_numbers_with_flags =
380 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
381 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
382
383 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
384 Ok(builder.join_one_to_many(&row_numbers, left, left_idx, right))
385 }
386
387 pub(crate) fn join_columns_many_to_one(
390 &self,
391 txn: &mut FlowTransaction,
392 left: &Columns,
393 right: &Columns,
394 right_idx: usize,
395 ) -> Result<Columns> {
396 let left_count = left.row_count();
397 if left_count == 0 {
398 return Ok(Columns::empty());
399 }
400
401 let right_row_number = right.row_numbers[right_idx];
402
403 let composite_keys: Vec<EncodedKey> = (0..left_count)
405 .map(|left_idx| {
406 let left_row_number = left.row_numbers[left_idx];
407 Self::make_composite_key(left_row_number, right_row_number)
408 })
409 .collect();
410
411 let row_numbers_with_flags =
413 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
414 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
415
416 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
417 Ok(builder.join_many_to_one(&row_numbers, left, right, right_idx))
418 }
419
420 pub(crate) fn join_columns_cartesian(
423 &self,
424 txn: &mut FlowTransaction,
425 left: &Columns,
426 left_indices: &[usize],
427 right: &Columns,
428 right_indices: &[usize],
429 ) -> Result<Columns> {
430 let left_count = left_indices.len();
431 let right_count = right_indices.len();
432 if left_count == 0 || right_count == 0 {
433 return Ok(Columns::empty());
434 }
435
436 let total_results = left_count * right_count;
438 let mut composite_keys = Vec::with_capacity(total_results);
439
440 for &left_idx in left_indices {
441 let left_row_number = left.row_numbers[left_idx];
442 for &right_idx in right_indices {
443 let right_row_number = right.row_numbers[right_idx];
444 composite_keys.push(Self::make_composite_key(left_row_number, right_row_number));
445 }
446 }
447
448 let row_numbers_with_flags =
450 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
451 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
452
453 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
454 Ok(builder.join_cartesian(&row_numbers, left, left_indices, right, right_indices))
455 }
456
457 fn determine_side(&self, change: &Change) -> Option<JoinSide> {
458 match &change.origin {
459 ChangeOrigin::Flow(from_node) => {
460 if *from_node == self.left_node {
461 Some(JoinSide::Left)
462 } else if *from_node == self.right_node {
463 Some(JoinSide::Right)
464 } else {
465 None
466 }
467 }
468 _ => None,
469 }
470 }
471}
472
473impl RawStatefulOperator for JoinOperator {}
474
475impl SingleStateful for JoinOperator {
476 fn layout(&self) -> RowShape {
477 self.shape.clone()
478 }
479}
480
481impl Operator for JoinOperator {
482 fn id(&self) -> FlowNodeId {
483 self.node
484 }
485
486 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
487 if let ChangeOrigin::Flow(from_node) = &change.origin
488 && *from_node == self.node
489 {
490 return Ok(Change::from_flow(self.node, change.version, Vec::new(), DateTime::default()));
491 }
492
493 let mut state = JoinState::new(self.node);
494 let mut result = Vec::with_capacity(change.diffs.len() * 2);
495
496 let side = self
497 .determine_side(&change)
498 .ok_or_else(|| Error(Box::new(internal!("Join operator received change from unknown node"))))?;
499
500 let compiled_exprs = match side {
501 JoinSide::Left => &self.compiled_left_exprs,
502 JoinSide::Right => &self.compiled_right_exprs,
503 };
504
505 let version = change.version;
506 for diff in change.diffs {
507 match diff {
508 Diff::Insert {
509 post,
510 } => self.apply_join_insert(
511 txn,
512 &post,
513 compiled_exprs,
514 side,
515 version,
516 &mut state,
517 &mut result,
518 )?,
519 Diff::Remove {
520 pre,
521 } => self.apply_join_remove(
522 txn,
523 &pre,
524 compiled_exprs,
525 side,
526 version,
527 &mut state,
528 &mut result,
529 )?,
530 Diff::Update {
531 pre,
532 post,
533 } => self.apply_join_update(
534 txn,
535 &pre,
536 &post,
537 compiled_exprs,
538 side,
539 version,
540 &mut state,
541 &mut result,
542 )?,
543 }
544 }
545
546 Ok(Change::from_flow(self.node, version, result, change.changed_at))
547 }
548
549 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
556 let mut found_columns: Vec<Columns> = Vec::new();
557
558 for &row_number in rows {
559 if let Some(joined) = self.pull_one_joined_row(txn, row_number)? {
560 found_columns.push(joined);
561 }
562 }
563
564 if found_columns.is_empty() {
565 return self.empty_joined_shape(txn);
566 }
567 if found_columns.len() == 1 {
568 return Ok(found_columns.remove(0));
569 }
570
571 let mut result = found_columns.remove(0);
572 for cols in found_columns {
573 result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
574 for (i, col) in cols.columns.into_iter().enumerate() {
575 result.columns.make_mut()[i].extend(col).expect("shape mismatch in join pull");
576 }
577 }
578 Ok(result)
579 }
580}
581
582impl JoinOperator {
583 #[inline]
584 fn pull_one_joined_row(&self, txn: &mut FlowTransaction, row_number: RowNumber) -> Result<Option<Columns>> {
585 let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
586 return Ok(None);
587 };
588 let Some((left_row_number, right_row_number)) = Self::parse_composite_key(key.as_ref()) else {
589 return Ok(None);
590 };
591
592 let left_cols = self.left_parent.pull(txn, &[left_row_number])?;
593 if left_cols.is_empty() {
594 return Ok(None);
595 }
596
597 if let Some(right_row_num) = right_row_number {
598 let right_cols = self.right_parent.pull(txn, &[right_row_num])?;
599 if right_cols.is_empty() {
600 return Ok(None);
601 }
602 let builder = JoinedColumnsBuilder::new(&left_cols, &right_cols, &self.alias);
603 let mut joined = builder.join_single(row_number, &left_cols, &right_cols);
604 joined.row_numbers = CowVec::new(vec![row_number]);
605 Ok(Some(joined))
606 } else {
607 let right_shape = self.right_parent.pull(txn, &[])?;
608 let builder = JoinedColumnsBuilder::new(&left_cols, &right_shape, &self.alias);
609 let mut unmatched = builder.unmatched_left(row_number, &left_cols, 0, &right_shape);
610 unmatched.row_numbers = CowVec::new(vec![row_number]);
611 Ok(Some(unmatched))
612 }
613 }
614
615 #[inline]
616 fn empty_joined_shape(&self, txn: &mut FlowTransaction) -> Result<Columns> {
617 let left_shape = self.left_parent.pull(txn, &[])?;
618 let right_shape = self.right_parent.pull(txn, &[])?;
619 let builder = JoinedColumnsBuilder::new(&left_shape, &right_shape, &self.alias);
620 let right_names = builder.right_column_names();
621
622 let mut all_columns: Vec<ColumnWithName> = left_shape
623 .names
624 .iter()
625 .zip(left_shape.columns.iter())
626 .map(|(name, data)| ColumnWithName::new(name.clone(), data.clone()))
627 .collect();
628
629 for (col, aliased_name) in right_shape.columns.into_iter().zip(right_names.iter()) {
630 all_columns.push(ColumnWithName::new(Fragment::internal(aliased_name), col));
631 }
632
633 Ok(Columns::new(all_columns))
634 }
635}
636
637impl JoinOperator {
638 #[inline]
639 #[allow(clippy::too_many_arguments)]
640 fn apply_join_insert(
641 &self,
642 txn: &mut FlowTransaction,
643 post: &Columns,
644 compiled_exprs: &[CompiledExpr],
645 side: JoinSide,
646 version: CommitVersion,
647 state: &mut JoinState,
648 result: &mut Vec<Diff>,
649 ) -> Result<()> {
650 let keys = self.compute_join_keys(post, compiled_exprs)?;
651 let mut inserts_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
652 let mut inserts_undefined: Vec<usize> = Vec::new();
653
654 for (row_idx, key) in keys.iter().enumerate() {
655 if let Some(key_hash) = key {
656 inserts_by_key.entry(*key_hash).or_default().push(row_idx);
657 } else {
658 inserts_undefined.push(row_idx);
659 }
660 }
661
662 for (key_hash, indices) in inserts_by_key {
663 let mut ctx = JoinContext {
664 side,
665 state,
666 operator: self,
667 version,
668 };
669 let diffs = self.strategy.handle_insert(txn, post, &indices, &key_hash, &mut ctx)?;
670 result.extend(diffs);
671 }
672
673 for idx in inserts_undefined {
674 let mut ctx = JoinContext {
675 side,
676 state,
677 operator: self,
678 version,
679 };
680 let diffs = self.strategy.handle_insert_undefined(txn, post, idx, &mut ctx)?;
681 result.extend(diffs);
682 }
683
684 Ok(())
685 }
686
687 #[inline]
688 #[allow(clippy::too_many_arguments)]
689 fn apply_join_remove(
690 &self,
691 txn: &mut FlowTransaction,
692 pre: &Columns,
693 compiled_exprs: &[CompiledExpr],
694 side: JoinSide,
695 version: CommitVersion,
696 state: &mut JoinState,
697 result: &mut Vec<Diff>,
698 ) -> Result<()> {
699 let keys = self.compute_join_keys(pre, compiled_exprs)?;
700 let mut removes_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
701 let mut removes_undefined: Vec<usize> = Vec::new();
702
703 for (row_idx, key) in keys.iter().enumerate() {
704 if let Some(key_hash) = key {
705 removes_by_key.entry(*key_hash).or_default().push(row_idx);
706 } else {
707 removes_undefined.push(row_idx);
708 }
709 }
710
711 for (key_hash, indices) in removes_by_key {
712 let mut ctx = JoinContext {
713 side,
714 state,
715 operator: self,
716 version,
717 };
718 let diffs = self.strategy.handle_remove(txn, pre, &indices, &key_hash, &mut ctx)?;
719 result.extend(diffs);
720 }
721
722 for idx in removes_undefined {
723 let mut ctx = JoinContext {
724 side,
725 state,
726 operator: self,
727 version,
728 };
729 let diffs = self.strategy.handle_remove_undefined(txn, pre, idx, &mut ctx)?;
730 result.extend(diffs);
731 }
732
733 Ok(())
734 }
735
736 #[inline]
737 #[allow(clippy::too_many_arguments)]
738 fn apply_join_update(
739 &self,
740 txn: &mut FlowTransaction,
741 pre: &Columns,
742 post: &Columns,
743 compiled_exprs: &[CompiledExpr],
744 side: JoinSide,
745 version: CommitVersion,
746 state: &mut JoinState,
747 result: &mut Vec<Diff>,
748 ) -> Result<()> {
749 let pre_keys = self.compute_join_keys(pre, compiled_exprs)?;
750 let post_keys = self.compute_join_keys(post, compiled_exprs)?;
751 let row_count = post.row_count();
752
753 let mut updates_by_key: IndexMap<(Hash128, Hash128), Vec<usize>> = IndexMap::new();
754 let mut updates_undefined: Vec<usize> = Vec::new();
755
756 for row_idx in 0..row_count {
757 match (pre_keys[row_idx], post_keys[row_idx]) {
758 (Some(pre_key), Some(post_key)) => {
759 updates_by_key.entry((pre_key, post_key)).or_default().push(row_idx);
760 }
761 _ => {
762 updates_undefined.push(row_idx);
763 }
764 }
765 }
766
767 for ((pre_key, post_key), indices) in updates_by_key {
768 let mut ctx = JoinContext {
769 side,
770 state,
771 operator: self,
772 version,
773 };
774 let keys = UpdateKeys {
775 pre: &pre_key,
776 post: &post_key,
777 };
778 let diffs = self.strategy.handle_update(txn, pre, post, &indices, keys, &mut ctx)?;
779 result.extend(diffs);
780 }
781
782 for row_idx in updates_undefined {
783 let mut ctx = JoinContext {
784 side,
785 state,
786 operator: self,
787 version,
788 };
789 let diffs = self.strategy.handle_update_undefined(txn, pre, post, row_idx, &mut ctx)?;
790 result.extend(diffs);
791 }
792
793 Ok(())
794 }
795}