Skip to main content

reifydb_sub_flow/operator/stateful/
row.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3use std::iter::once;
4
5use reifydb_core::{
6	encoded::{
7		encoded::EncodedValues,
8		key::{EncodedKey, EncodedKeyRange},
9	},
10	interface::catalog::flow::FlowNodeId,
11	key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
12	util::encoding::keycode::serializer::KeySerializer,
13};
14use reifydb_type::{util::cowvec::CowVec, value::row_number::RowNumber};
15
16use crate::{
17	operator::stateful::{
18		counter::{Counter, CounterDirection},
19		utils::{internal_state_get, internal_state_set},
20	},
21	transaction::FlowTransaction,
22};
23
24/// Provides stable encoded numbers for keys with automatic Insert/Update detection
25///
26/// This component maintains:
27/// - A sequential counter for generating new encoded numbers
28/// - A mapping from keys to their assigned encoded numbers
29///
30/// When a key is seen for the first time, it gets a new encoded number and returns
31/// true. When a key is seen again, it returns the existing encoded number and
32/// false.
33pub struct RowNumberProvider {
34	node: FlowNodeId,
35	counter: Counter,
36}
37
38impl RowNumberProvider {
39	/// Create a new RowNumberProvider for the given operator
40	pub fn new(node: FlowNodeId) -> Self {
41		Self {
42			node,
43			counter: Counter::with_prefix(node, b'C', CounterDirection::Ascending),
44		}
45	}
46
47	/// Get or create RowNumbers for multiple keys
48	/// Returns Vec<(RowNumber, is_new)> in the same order as input keys
49	/// where is_new indicates if the row number was newly created
50	pub fn get_or_create_row_numbers<'a, I>(
51		&self,
52		txn: &mut FlowTransaction,
53		keys: I,
54	) -> reifydb_type::Result<Vec<(RowNumber, bool)>>
55	where
56		I: IntoIterator<Item = &'a EncodedKey>,
57	{
58		let mut results = Vec::new();
59
60		for key in keys {
61			let map_key = self.make_map_key(key);
62
63			if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
64				let bytes = existing_row.as_ref();
65				if bytes.len() >= 8 {
66					let row_num = u64::from_be_bytes([
67						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
68						bytes[7],
69					]);
70					results.push((RowNumber(row_num), false));
71					continue;
72				}
73			}
74
75			let new_row_number = self.counter.next(txn)?;
76
77			// Save the mapping from key to encoded number
78			let row_num_bytes = new_row_number.0.to_be_bytes().to_vec();
79			internal_state_set(self.node, txn, &map_key, EncodedValues(CowVec::new(row_num_bytes)))?;
80
81			// Save the reverse mapping from row_number to key
82			let reverse_key = self.make_reverse_map_key(new_row_number);
83			internal_state_set(
84				self.node,
85				txn,
86				&reverse_key,
87				EncodedValues(CowVec::new(key.as_ref().to_vec())),
88			)?;
89
90			results.push((new_row_number, true));
91		}
92
93		Ok(results)
94	}
95
96	/// Get or create a RowNumber for a given key
97	/// Returns (RowNumber, is_new) where is_new indicates if it was newly
98	/// created
99	pub fn get_or_create_row_number(
100		&self,
101		txn: &mut FlowTransaction,
102		key: &EncodedKey,
103	) -> reifydb_type::Result<(RowNumber, bool)> {
104		Ok(self.get_or_create_row_numbers(txn, once(key))?.into_iter().next().unwrap())
105	}
106
107	/// Get the original key for a given row number (reverse lookup)
108	pub fn get_key_for_row_number(
109		&self,
110		txn: &mut FlowTransaction,
111		row_number: RowNumber,
112	) -> reifydb_type::Result<Option<EncodedKey>> {
113		let reverse_key = self.make_reverse_map_key(row_number);
114		if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
115			Ok(Some(EncodedKey::new(key_bytes.as_ref().to_vec())))
116		} else {
117			Ok(None)
118		}
119	}
120
121	/// Create a mapping key for a given encoded key (node_id added by FlowNodeInternalStateKey wrapper)
122	fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
123		let mut serializer = KeySerializer::new();
124		serializer.extend_u8(b'M'); // 'M' for mapping
125		serializer.extend_bytes(key.as_ref());
126		EncodedKey::new(serializer.finish())
127	}
128
129	/// Create a reverse mapping key for a given row number (node_id added by FlowNodeInternalStateKey wrapper)
130	fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
131		let mut serializer = KeySerializer::new();
132		serializer.extend_u8(b'R'); // 'R' for reverse mapping
133		serializer.extend_u64(row_number.0);
134		EncodedKey::new(serializer.finish())
135	}
136
137	/// Remove all encoded number mappings with the given prefix
138	/// This is useful for cleaning up all join results from a specific left encoded
139	pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> reifydb_type::Result<()> {
140		// Create the prefix for scanning
141		let mut prefix = Vec::new();
142		let mut serializer = KeySerializer::new();
143		serializer.extend_u8(b'M'); // 'M' for mapping
144		prefix.extend_from_slice(&serializer.finish());
145		prefix.extend_from_slice(key_prefix);
146
147		let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
148		let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
149
150		let keys_to_remove = {
151			let mut stream = txn.range(full_range, 1024);
152			let mut keys = Vec::new();
153			while let Some(result) = stream.next() {
154				let multi = result?;
155				keys.push(multi.key);
156			}
157			keys
158		};
159
160		for key in keys_to_remove {
161			txn.remove(&key)?;
162		}
163
164		Ok(())
165	}
166}
167
#[cfg(test)]
pub mod tests {
	//! Tests for `RowNumberProvider`: allocation order, reuse of existing
	//! mappings, isolation between flow nodes, and batch lookups.

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::common::CommitVersion;
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::operator::stateful::test_utils::test::*;

	/// The very first key of a node gets row number 1 and is reported as new.
	#[test]
	fn test_first_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("first");
		let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	/// Asking twice for the same key yields the same row number; only the first
	/// call reports it as new.
	#[test]
	fn test_duplicate_key_same_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("duplicate");

		// First call allocates.
		let (row_num1, is_new1) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num1.0, 1);
		assert!(is_new1);

		// Second call with the same key reads the stored mapping back.
		let (row_num2, is_new2) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num2.0, 1);
		assert!(!is_new2);

		assert_eq!(row_num1, row_num2);
	}

	/// Distinct keys receive consecutive row numbers starting at 1.
	#[test]
	fn test_sequential_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=5u64 {
			let key = test_key(&format!("key_{}", i));
			let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

			assert_eq!(row_num.0, i);
			assert!(is_new);
		}
	}

	/// Interleaving known and unknown keys neither disturbs existing mappings
	/// nor skips a number for new ones.
	#[test]
	fn test_mixed_new_and_existing() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key1 = test_key("mixed_1");
		let key2 = test_key("mixed_2");
		let key3 = test_key("mixed_3");

		// First round - all new.
		let (rn1, new1) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		let (rn2, new2) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn3, new3) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();

		assert_eq!(rn1.0, 1);
		assert!(new1);
		assert_eq!(rn2.0, 2);
		assert!(new2);
		assert_eq!(rn3.0, 3);
		assert!(new3);

		// Second round - mixed.
		let key4 = test_key("mixed_4");
		let (rn2_again, new2_again) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn4, new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		let (rn1_again, new1_again) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();

		assert_eq!(rn2_again.0, 2);
		assert!(!new2_again);
		assert_eq!(rn4.0, 4); // next sequential number
		assert!(new4);
		assert_eq!(rn1_again.0, 1);
		assert!(!new1_again);
	}

	/// Providers bound to different flow nodes number their keys independently.
	#[test]
	fn test_multiple_providers_isolated() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let key = test_key("shared_key");

		// The same key is numbered per node, so each provider starts at 1.
		let (rn1, _) = provider1.get_or_create_row_number(&mut txn, &key).unwrap();
		let (rn2, _) = provider2.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(rn1.0, 1);
		assert_eq!(rn2.0, 1);

		// A second key on provider1 advances only provider1's counter.
		let key2 = test_key("key2");
		let (rn1_2, _) = provider1.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn1_2.0, 2);

		// Provider2 had handed out only 1 so far; its next new key gets 2.
		let (rn2_2, _) = provider2.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2_2.0, 2);
	}

	/// The counter keeps advancing across successive calls on the same provider.
	#[test]
	fn test_counter_persistence() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=3u64 {
			let key = test_key(&format!("persist_{}", i));
			let (rn, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i);
		}

		// A further new key continues from where the counter left off.
		let new_key = test_key("persist_new");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &new_key).unwrap();

		assert_eq!(rn.0, 4);
		assert!(is_new);
	}

	/// After 1000 allocations early mappings are still readable and allocation
	/// continues at 1001.
	#[test]
	fn test_large_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=1000u64 {
			let key = test_key(&format!("large_{}", i));
			let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i);
			assert!(is_new);
		}

		// Early mappings can still be retrieved.
		let key = test_key("large_1");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(!is_new);

		// New keys keep getting the next number.
		let key = test_key("large_1001");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1001);
		assert!(is_new);
	}

	/// A batch mixing existing and new keys preserves input order, allocates
	/// only for the new keys, and writes reverse mappings for them.
	#[test]
	fn test_mixed_existing_and_new_keys() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Three keys allocated one by one establish row numbers 1..=3.
		let key1 = test_key("key_1");
		let key2 = test_key("key_2");
		let key3 = test_key("key_3");

		let (rn1, _) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let (rn2, _) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let (rn3, _) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		let key4 = test_key("key_4");
		let key5 = test_key("key_5");

		// Batch: [existing key2, new key4, existing key1, new key5, existing key3]
		let batch = [&key2, &key4, &key1, &key5, &key3];

		let results = provider.get_or_create_row_numbers(&mut txn, batch).unwrap();

		// Results come back in input order.
		assert_eq!(results.len(), 5);

		// key2 (existing) -> row number 2, not new
		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		// key4 (new) -> row number 4, is new
		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		// key1 (existing) -> row number 1, not new
		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		// key5 (new) -> row number 5, is new
		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		// key3 (existing) -> row number 3, not new
		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		// The batch advanced the counter by exactly two (key4 and key5), so
		// the next new key gets row number 6.
		let key6 = test_key("key_6");
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut txn, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		// The batch-created mappings read back individually.
		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut txn, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);

		// Reverse mappings exist for the keys created in the batch.
		let reverse_key4 = provider.get_key_for_row_number(&mut txn, RowNumber(4)).unwrap();
		assert_eq!(reverse_key4, Some(key4));

		let reverse_key5 = provider.get_key_for_row_number(&mut txn, RowNumber(5)).unwrap();
		assert_eq!(reverse_key5, Some(key5));

		// Reverse mappings also exist for keys created before the batch.
		let reverse_key1 = provider.get_key_for_row_number(&mut txn, RowNumber(1)).unwrap();
		assert_eq!(reverse_key1, Some(key1));

		let reverse_key2 = provider.get_key_for_row_number(&mut txn, RowNumber(2)).unwrap();
		assert_eq!(reverse_key2, Some(key2));
	}
}