reifydb_sub_flow/operator/stateful/
row.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3use std::iter::once;
4
5use reifydb_core::{
6	EncodedKey,
7	interface::FlowNodeId,
8	key::{EncodableKey, FlowNodeInternalStateKey},
9	util::{CowVec, encoding::keycode::KeySerializer},
10	value::encoded::{EncodedKeyRange, EncodedValues},
11};
12use reifydb_type::RowNumber;
13
14use crate::{
15	operator::stateful::utils::{internal_state_get, internal_state_set},
16	transaction::FlowTransaction,
17};
18
/// Provides stable row numbers for encoded keys with automatic Insert/Update detection
///
/// All state is read and written through `internal_state_get` /
/// `internal_state_set` for the flow node this provider was created for:
/// - A sequential counter for generating new row numbers (the first row number handed out is 1)
/// - A forward mapping from each key to its assigned row number
/// - A reverse mapping from each row number back to the key it was assigned to
///
/// When a key is seen for the first time, it gets a new row number and the
/// lookup reports `true` (newly created). When a key is seen again, the
/// existing row number is returned and the lookup reports `false`.
pub struct RowNumberProvider {
	/// Flow node under which the counter and both mappings are stored
	node: FlowNodeId,
}
31
32impl RowNumberProvider {
33	/// Create a new RowNumberProvider for the given operator
34	pub fn new(node: FlowNodeId) -> Self {
35		Self {
36			node,
37		}
38	}
39
40	/// Get or create RowNumbers for a batch of keys
41	/// Returns Vec<(RowNumber, is_new)> in the same order as input keys
42	/// where is_new indicates if the row number was newly created
43	pub fn get_or_create_row_numbers_batch<'a, I>(
44		&self,
45		txn: &mut FlowTransaction,
46		keys: I,
47	) -> crate::Result<Vec<(RowNumber, bool)>>
48	where
49		I: IntoIterator<Item = &'a EncodedKey>,
50	{
51		let mut results = Vec::new();
52		let mut counter = self.load_counter(txn)?;
53		let initial_counter = counter;
54
55		for key in keys {
56			let map_key = self.make_map_key(key);
57
58			if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
59				let bytes = existing_row.as_ref();
60				if bytes.len() >= 8 {
61					let row_num = u64::from_be_bytes([
62						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
63						bytes[7],
64					]);
65					results.push((RowNumber(row_num), false));
66					continue;
67				}
68			}
69
70			let new_row_number = RowNumber(counter);
71
72			// Save the mapping from key to encoded number
73			let row_num_bytes = counter.to_be_bytes().to_vec();
74			internal_state_set(self.node, txn, &map_key, EncodedValues(CowVec::new(row_num_bytes)))?;
75
76			// Save the reverse mapping from row_number to key
77			let reverse_key = self.make_reverse_map_key(new_row_number);
78			internal_state_set(
79				self.node,
80				txn,
81				&reverse_key,
82				EncodedValues(CowVec::new(key.as_ref().to_vec())),
83			)?;
84
85			results.push((new_row_number, true));
86			counter += 1;
87		}
88
89		// Save the updated counter if we allocated any new row numbers
90		if counter != initial_counter {
91			self.save_counter(txn, counter)?;
92		}
93
94		Ok(results)
95	}
96
97	/// Get or create a RowNumber for a given key
98	/// Returns (RowNumber, is_new) where is_new indicates if it was newly
99	/// created
100	pub fn get_or_create_row_number(
101		&self,
102		txn: &mut FlowTransaction,
103		key: &EncodedKey,
104	) -> crate::Result<(RowNumber, bool)> {
105		Ok(self.get_or_create_row_numbers_batch(txn, once(key))?.into_iter().next().unwrap())
106	}
107
108	/// Get the original key for a given row number (reverse lookup)
109	pub fn get_key_for_row_number(
110		&self,
111		txn: &mut FlowTransaction,
112		row_number: RowNumber,
113	) -> crate::Result<Option<EncodedKey>> {
114		let reverse_key = self.make_reverse_map_key(row_number);
115		if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
116			Ok(Some(EncodedKey::new(key_bytes.as_ref().to_vec())))
117		} else {
118			Ok(None)
119		}
120	}
121
122	/// Load the current counter value
123	fn load_counter(&self, txn: &mut FlowTransaction) -> crate::Result<u64> {
124		let key = self.make_counter_key();
125		match internal_state_get(self.node, txn, &key)? {
126			None => Ok(1), // First time, start at 1
127			Some(state_row) => {
128				// Parse the stored counter
129				let bytes = state_row.as_ref();
130				if bytes.len() >= 8 {
131					Ok(u64::from_be_bytes([
132						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
133						bytes[7],
134					]))
135				} else {
136					Ok(1)
137				}
138			}
139		}
140	}
141
142	/// Save the counter value
143	fn save_counter(&self, txn: &mut FlowTransaction, counter: u64) -> crate::Result<()> {
144		let key = self.make_counter_key();
145		let value = EncodedValues(CowVec::new(counter.to_be_bytes().to_vec()));
146		internal_state_set(self.node, txn, &key, value)?;
147		Ok(())
148	}
149
150	/// Create a key for the counter (node_id added by FlowNodeInternalStateKey wrapper)
151	fn make_counter_key(&self) -> EncodedKey {
152		let mut serializer = KeySerializer::new();
153		serializer.extend_u8(b'C'); // 'C' for counter
154		EncodedKey::new(serializer.finish())
155	}
156
157	/// Create a mapping key for a given encoded key (node_id added by FlowNodeInternalStateKey wrapper)
158	fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
159		let mut serializer = KeySerializer::new();
160		serializer.extend_u8(b'M'); // 'M' for mapping
161		serializer.extend_bytes(key.as_ref());
162		EncodedKey::new(serializer.finish())
163	}
164
165	/// Create a reverse mapping key for a given row number (node_id added by FlowNodeInternalStateKey wrapper)
166	fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
167		let mut serializer = KeySerializer::new();
168		serializer.extend_u8(b'R'); // 'R' for reverse mapping
169		serializer.extend_u64(row_number.0);
170		EncodedKey::new(serializer.finish())
171	}
172
173	/// Remove all encoded number mappings with the given prefix
174	/// This is useful for cleaning up all join results from a specific left encoded
175	pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> crate::Result<()> {
176		// Create the prefix for scanning
177		let mut prefix = Vec::new();
178		let mut serializer = KeySerializer::new();
179		serializer.extend_u8(b'M'); // 'M' for mapping
180		prefix.extend_from_slice(&serializer.finish());
181		prefix.extend_from_slice(key_prefix);
182
183		let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
184		let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
185
186		let keys_to_remove: Vec<_> = txn.range(full_range)?.map(|multi| multi.key).collect();
187
188		for key in keys_to_remove {
189			txn.remove(&key)?;
190		}
191
192		Ok(())
193	}
194}
195
#[cfg(test)]
mod tests {
	//! Tests for `RowNumberProvider`: allocation order, reuse of existing
	//! row numbers, isolation between flow nodes, and batch behaviour.

	use reifydb_core::CommitVersion;

	use super::*;
	use crate::operator::stateful::test_utils::test::*;

	/// The very first key seen by a provider gets row number 1 and is new.
	#[test]
	fn test_first_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("first");
		let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	/// Looking up the same key twice returns the same row number; only the
	/// first lookup is reported as new.
	#[test]
	fn test_duplicate_key_same_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("duplicate");

		// First call - should create new
		let (row_num1, is_new1) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num1.0, 1);
		assert!(is_new1);

		// Second call with same key - should return existing
		let (row_num2, is_new2) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num2.0, 1);
		assert!(!is_new2);

		// Row numbers should be the same
		assert_eq!(row_num1, row_num2);
	}

	/// Distinct keys receive consecutive row numbers starting at 1.
	#[test]
	fn test_sequential_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create multiple unique keys
		for i in 1..=5u64 {
			let key = test_key(&format!("key_{}", i));
			let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

			assert_eq!(row_num.0, i);
			assert!(is_new);
		}
	}

	/// Interleaving known and unknown keys reuses existing row numbers and
	/// only advances the counter for the unknown ones.
	#[test]
	fn test_mixed_new_and_existing() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create some keys
		let key1 = test_key("mixed_1");
		let key2 = test_key("mixed_2");
		let key3 = test_key("mixed_3");

		// First round - all new
		let (rn1, new1) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		let (rn2, new2) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn3, new3) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();

		assert_eq!(rn1.0, 1);
		assert!(new1);
		assert_eq!(rn2.0, 2);
		assert!(new2);
		assert_eq!(rn3.0, 3);
		assert!(new3);

		// Second round - mixed
		let key4 = test_key("mixed_4");
		let (rn2_again, new2_again) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn4, new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		let (rn1_again, new1_again) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();

		assert_eq!(rn2_again.0, 2);
		assert!(!new2_again);
		assert_eq!(rn4.0, 4); // Next sequential number
		assert!(new4);
		assert_eq!(rn1_again.0, 1);
		assert!(!new1_again);
	}

	/// Providers for different flow nodes keep independent counters and
	/// mappings, even when they share a transaction and see the same keys.
	#[test]
	fn test_multiple_providers_isolated() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let key = test_key("shared_key");

		// Each provider numbers the shared key independently, so both start at 1
		let (rn1, _) = provider1.get_or_create_row_number(&mut txn, &key).unwrap();
		let (rn2, _) = provider2.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(rn1.0, 1);
		assert_eq!(rn2.0, 1);

		// Add more keys to provider1
		let key2 = test_key("key2");
		let (rn1_2, _) = provider1.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn1_2.0, 2);

		// Provider2's counter was not advanced by provider1: its second key gets 2
		let (rn2_2, _) = provider2.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2_2.0, 2);
	}

	/// The counter written by one call is picked up by the next call.
	#[test]
	fn test_counter_persistence() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create some row numbers
		for i in 1..=3u64 {
			let key = test_key(&format!("persist_{}", i));
			let (rn, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i);
		}

		// Every call reloads the counter from state before allocating
		let new_key = test_key("persist_new");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &new_key).unwrap();

		// Should continue from where we left off
		assert_eq!(rn.0, 4);
		assert!(is_new);
	}

	/// Allocation stays sequential and lookups stay correct across many keys.
	#[test]
	fn test_large_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create many row numbers
		for i in 1..=1000u64 {
			let key = test_key(&format!("large_{}", i));
			let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i);
			assert!(is_new);
		}

		// Verify we can still retrieve early ones
		let key = test_key("large_1");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(!is_new);

		// And continue adding new ones
		let key = test_key("large_1001");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1001);
		assert!(is_new);
	}

	/// A batch mixing known and unknown keys returns results in input order,
	/// allocates only for the unknown keys, and writes reverse mappings.
	#[test]
	fn test_batch_mixed_existing_and_new_keys() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create 3 initial keys to establish existing row numbers
		let key1 = test_key("batch_key_1");
		let key2 = test_key("batch_key_2");
		let key3 = test_key("batch_key_3");

		let (rn1, _) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let (rn2, _) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let (rn3, _) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		// Now test batch with mix of existing and new keys
		let key4 = test_key("batch_key_4");
		let key5 = test_key("batch_key_5");

		// Batch: [existing key2, new key4, existing key1, new key5, existing key3]
		let batch_keys = vec![&key2, &key4, &key1, &key5, &key3];

		let results = provider.get_or_create_row_numbers_batch(&mut txn, batch_keys.into_iter()).unwrap();

		// Verify results are in correct order and have correct values
		assert_eq!(results.len(), 5);

		// key2 (existing) -> row number 2, not new
		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		// key4 (new) -> row number 4, is new
		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		// key1 (existing) -> row number 1, not new
		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		// key5 (new) -> row number 5, is new
		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		// key3 (existing) -> row number 3, not new
		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		// Verify that counter was only incremented by 2 (for key4 and key5)
		// by checking that the next new key gets row number 6
		let key6 = test_key("batch_key_6");
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut txn, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		// Verify all mappings are still correct by retrieving them individually
		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut txn, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);

		// Verify reverse mappings exist for the new keys created in batch
		let reverse_key4 = provider.get_key_for_row_number(&mut txn, RowNumber(4)).unwrap();
		assert_eq!(reverse_key4, Some(key4));

		let reverse_key5 = provider.get_key_for_row_number(&mut txn, RowNumber(5)).unwrap();
		assert_eq!(reverse_key5, Some(key5));

		// Verify reverse mappings also exist for keys created before batch
		let reverse_key1 = provider.get_key_for_row_number(&mut txn, RowNumber(1)).unwrap();
		assert_eq!(reverse_key1, Some(key1));

		let reverse_key2 = provider.get_key_for_row_number(&mut txn, RowNumber(2)).unwrap();
		assert_eq!(reverse_key2, Some(key2));

		let reverse_key3 = provider.get_key_for_row_number(&mut txn, RowNumber(3)).unwrap();
		assert_eq!(reverse_key3, Some(key3));
	}

	/// A key repeated inside one batch is allocated once: the first
	/// occurrence is new, the second reuses the number written by the first.
	#[test]
	fn test_batch_duplicate_key_within_batch() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("batch_dup");
		let batch_keys = vec![&key, &key];

		let results = provider.get_or_create_row_numbers_batch(&mut txn, batch_keys.into_iter()).unwrap();

		assert_eq!(results.len(), 2);
		assert_eq!(results[0].0.0, 1);
		assert!(results[0].1);
		assert_eq!(results[1].0.0, 1);
		assert!(!results[1].1);

		// Only one row number was consumed, so the next new key gets 2
		let other = test_key("batch_dup_other");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &other).unwrap();
		assert_eq!(rn.0, 2);
		assert!(is_new);
	}
}