Skip to main content

reifydb_sub_flow/operator/stateful/
row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use std::iter::once;
4
5use reifydb_core::{
6	encoded::{
7		key::{EncodedKey, EncodedKeyRange},
8		row::EncodedRow,
9	},
10	interface::catalog::flow::FlowNodeId,
11	key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
12	util::encoding::keycode::serializer::KeySerializer,
13};
14use reifydb_type::{Result, util::cowvec::CowVec, value::row_number::RowNumber};
15
16use crate::{
17	operator::stateful::{
18		counter::{Counter, CounterDirection},
19		utils::{internal_state_get, internal_state_set},
20	},
21	transaction::FlowTransaction,
22};
23
24/// Provides stable encoded numbers for keys with automatic Insert/Update detection
25///
26/// This component maintains:
27/// - A sequential counter for generating new encoded numbers
28/// - A mapping from keys to their assigned encoded numbers
29///
30/// When a key is seen for the first time, it gets a new encoded number and returns
31/// true. When a key is seen again, it returns the existing encoded number and
32/// false.
33pub struct RowNumberProvider {
34	node: FlowNodeId,
35	counter: Counter,
36}
37
38impl RowNumberProvider {
39	/// Create a new RowNumberProvider for the given operator
40	pub fn new(node: FlowNodeId) -> Self {
41		Self {
42			node,
43			counter: Counter::with_prefix(node, b'C', CounterDirection::Ascending),
44		}
45	}
46
47	/// Get or create RowNumbers for multiple keys
48	/// Returns Vec<(RowNumber, is_new)> in the same order as input keys
49	/// where is_new indicates if the row number was newly created
50	pub fn get_or_create_row_numbers<'a, I>(
51		&self,
52		txn: &mut FlowTransaction,
53		keys: I,
54	) -> Result<Vec<(RowNumber, bool)>>
55	where
56		I: IntoIterator<Item = &'a EncodedKey>,
57	{
58		let mut results = Vec::new();
59
60		for key in keys {
61			let map_key = self.make_map_key(key);
62
63			if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
64				let bytes = existing_row.as_slice();
65				if bytes.len() >= 8 {
66					let row_num = u64::from_be_bytes([
67						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
68						bytes[7],
69					]);
70					results.push((RowNumber(row_num), false));
71					continue;
72				}
73			}
74
75			let new_row_number = self.counter.next(txn)?;
76
77			// Save the mapping from key to encoded number
78			let row_num_bytes = new_row_number.0.to_be_bytes().to_vec();
79			internal_state_set(self.node, txn, &map_key, EncodedRow(CowVec::new(row_num_bytes)))?;
80
81			// Save the reverse mapping from row_number to key
82			let reverse_key = self.make_reverse_map_key(new_row_number);
83			internal_state_set(
84				self.node,
85				txn,
86				&reverse_key,
87				EncodedRow(CowVec::new(key.as_ref().to_vec())),
88			)?;
89
90			results.push((new_row_number, true));
91		}
92
93		Ok(results)
94	}
95
96	/// Get or create a RowNumber for a given key
97	/// Returns (RowNumber, is_new) where is_new indicates if it was newly
98	/// created
99	pub fn get_or_create_row_number(
100		&self,
101		txn: &mut FlowTransaction,
102		key: &EncodedKey,
103	) -> Result<(RowNumber, bool)> {
104		Ok(self.get_or_create_row_numbers(txn, once(key))?.into_iter().next().unwrap())
105	}
106
107	/// Get the original key for a given row number (reverse lookup)
108	pub fn get_key_for_row_number(
109		&self,
110		txn: &mut FlowTransaction,
111		row_number: RowNumber,
112	) -> Result<Option<EncodedKey>> {
113		let reverse_key = self.make_reverse_map_key(row_number);
114		if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
115			Ok(Some(EncodedKey::new(key_bytes.to_vec())))
116		} else {
117			Ok(None)
118		}
119	}
120
121	/// Create a mapping key for a given encoded key (node_id added by FlowNodeInternalStateKey wrapper)
122	fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
123		let mut serializer = KeySerializer::new();
124		serializer.extend_u8(b'M'); // 'M' for mapping
125		serializer.extend_bytes(key.as_ref());
126		EncodedKey::new(serializer.finish())
127	}
128
129	/// Create a reverse mapping key for a given row number (node_id added by FlowNodeInternalStateKey wrapper)
130	fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
131		let mut serializer = KeySerializer::new();
132		serializer.extend_u8(b'R'); // 'R' for reverse mapping
133		serializer.extend_u64(row_number.0);
134		EncodedKey::new(serializer.finish())
135	}
136
137	/// Remove all encoded number mappings with the given prefix
138	/// This is useful for cleaning up all join results from a specific left encoded
139	pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> Result<()> {
140		// Create the prefix for scanning
141		let mut prefix = Vec::new();
142		let mut serializer = KeySerializer::new();
143		serializer.extend_u8(b'M'); // 'M' for mapping
144		prefix.extend_from_slice(&serializer.finish());
145		prefix.extend_from_slice(key_prefix);
146
147		let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
148		let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
149
150		let keys_to_remove = {
151			let mut stream = txn.range(full_range, 1024);
152			let mut keys = Vec::new();
153			while let Some(result) = stream.next() {
154				let multi = result?;
155				keys.push(multi.key);
156			}
157			keys
158		};
159
160		for key in keys_to_remove {
161			txn.remove(&key)?;
162		}
163
164		Ok(())
165	}
166}
167
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::common::CommitVersion;
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::operator::stateful::test_utils::test::*;

	#[test]
	fn test_first_row_number() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("first");
		let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	#[test]
	fn test_duplicate_key_same_row_number() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("duplicate");

		// First call - should create new
		let (row_num1, is_new1) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num1.0, 1);
		assert!(is_new1);

		// Second call with same key - should return existing
		let (row_num2, is_new2) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num2.0, 1);
		assert!(!is_new2);

		// Row numbers should be the same
		assert_eq!(row_num1, row_num2);
	}

	#[test]
	fn test_sequential_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create multiple unique keys
		for i in 1..=5 {
			let key = test_key(&format!("key_{}", i));
			let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

			assert_eq!(row_num.0, i as u64);
			assert!(is_new);
		}
	}

	#[test]
	fn test_mixed_new_and_existing() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create some keys
		let key1 = test_key("mixed_1");
		let key2 = test_key("mixed_2");
		let key3 = test_key("mixed_3");

		// First round - all new
		let (rn1, new1) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		let (rn2, new2) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn3, new3) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();

		assert_eq!(rn1.0, 1);
		assert!(new1);
		assert_eq!(rn2.0, 2);
		assert!(new2);
		assert_eq!(rn3.0, 3);
		assert!(new3);

		// Second round - mixed
		let key4 = test_key("mixed_4");
		let (rn2_again, new2_again) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn4, new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		let (rn1_again, new1_again) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();

		assert_eq!(rn2_again.0, 2);
		assert!(!new2_again);
		assert_eq!(rn4.0, 4); // Next sequential number
		assert!(new4);
		assert_eq!(rn1_again.0, 1);
		assert!(!new1_again);
	}

	#[test]
	fn test_multiple_providers_isolated() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let key = test_key("shared_key");

		// Each provider has its own counter and mappings, so the same key
		// starts at row number 1 in both
		let (rn1, _) = provider1.get_or_create_row_number(&mut txn, &key).unwrap();
		let (rn2, _) = provider2.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(rn1.0, 1);
		assert_eq!(rn2.0, 1);

		// Add more keys to provider1
		let key2 = test_key("key2");
		let (rn1_2, _) = provider1.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn1_2.0, 2);

		// Provider2's counter advances on its own: after "shared_key" took 1,
		// its next new key gets 2
		let (rn2_2, _) = provider2.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2_2.0, 2);
	}

	#[test]
	fn test_counter_persistence() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create some row numbers
		for i in 1..=3 {
			let key = test_key(&format!("persist_{}", i));
			let (rn, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i as u64);
		}

		// Allocate again; the counter is advanced through the transaction
		let new_key = test_key("persist_new");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &new_key).unwrap();

		// Should continue from where we left off
		assert_eq!(rn.0, 4);
		assert!(is_new);
	}

	#[test]
	fn test_large_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create many row numbers
		for i in 1..=1000 {
			let key = test_key(&format!("large_{}", i));
			let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i as u64);
			assert!(is_new);
		}

		// Verify we can still retrieve early ones
		let key = test_key("large_1");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(!is_new);

		// And continue adding new ones
		let key = test_key("large_1001");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1001);
		assert!(is_new);
	}

	#[test]
	fn test_mixed_existing_and_new_keys() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create 3 initial keys to establish existing row numbers
		let key1 = test_key("key_1");
		let key2 = test_key("key_2");
		let key3 = test_key("key_3");

		let (rn1, _) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let (rn2, _) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let (rn3, _) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		// Now test batch with mix of existing and new keys
		let key4 = test_key("key_4");
		let key5 = test_key("key_5");

		// Batch: [existing key2, new key4, existing key1, new key5, existing key3]
		let keys = vec![&key2, &key4, &key1, &key5, &key3];

		let results = provider.get_or_create_row_numbers(&mut txn, keys.into_iter()).unwrap();

		// Verify results are in correct order and have correct values
		assert_eq!(results.len(), 5);

		// key2 (existing) -> row number 2, not new
		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		// key4 (new) -> row number 4, is new
		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		// key1 (existing) -> row number 1, not new
		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		// key5 (new) -> row number 5, is new
		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		// key3 (existing) -> row number 3, not new
		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		// Verify that counter was only incremented by 2 (for key4 and key5)
		// by checking that the next new key gets row number 6
		let key6 = test_key("key_6");
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut txn, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		// Verify all mappings are still correct by retrieving them individually
		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut txn, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);

		// Verify reverse mappings exist for the new keys created in batch
		let reverse_key4 = provider.get_key_for_row_number(&mut txn, RowNumber(4)).unwrap();
		assert_eq!(reverse_key4, Some(key4));

		let reverse_key5 = provider.get_key_for_row_number(&mut txn, RowNumber(5)).unwrap();
		assert_eq!(reverse_key5, Some(key5));

		// Verify reverse mappings also exist for keys created before batch
		let reverse_key1 = provider.get_key_for_row_number(&mut txn, RowNumber(1)).unwrap();
		assert_eq!(reverse_key1, Some(key1));

		let reverse_key2 = provider.get_key_for_row_number(&mut txn, RowNumber(2)).unwrap();
		assert_eq!(reverse_key2, Some(key2));
	}

	#[test]
	fn test_duplicate_key_within_batch() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("batch_dup");
		let results = provider.get_or_create_row_numbers(&mut txn, vec![&key, &key]).unwrap();

		// First occurrence allocates; the second sees the mapping just written
		assert_eq!(results.len(), 2);
		assert_eq!(results[0].0.0, 1);
		assert!(results[0].1);
		assert_eq!(results[1].0.0, 1);
		assert!(!results[1].1);
	}

	#[test]
	fn test_empty_batch() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let results = provider.get_or_create_row_numbers(&mut txn, Vec::<&EncodedKey>::new()).unwrap();
		assert!(results.is_empty());

		// An empty batch must not consume a row number
		let key = test_key("after_empty");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(is_new);
	}

	#[test]
	fn test_reverse_lookup_unknown_row_number() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let result = provider.get_key_for_row_number(&mut txn, RowNumber(42)).unwrap();
		assert_eq!(result, None);
	}
}