Skip to main content

reifydb_sub_flow/operator/stateful/
row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use std::iter::once;
4
5use reifydb_core::{
6	encoded::{
7		key::{EncodedKey, EncodedKeyRange},
8		row::EncodedRow,
9	},
10	interface::catalog::flow::FlowNodeId,
11	key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
12	util::encoding::keycode::serializer::KeySerializer,
13};
14use reifydb_type::{Result, util::cowvec::CowVec, value::row_number::RowNumber};
15
16use crate::{
17	operator::stateful::{
18		counter::{Counter, CounterDirection},
19		utils::{internal_state_get, internal_state_set},
20	},
21	transaction::FlowTransaction,
22};
23
24/// Provides stable encoded numbers for keys with automatic Insert/Update detection
25///
26/// This component maintains:
27/// - A sequential counter for generating new encoded numbers
28/// - A mapping from keys to their assigned encoded numbers
29///
30/// When a key is seen for the first time, it gets a new encoded number and returns
31/// true. When a key is seen again, it returns the existing encoded number and
32/// false.
33pub struct RowNumberProvider {
34	node: FlowNodeId,
35	counter: Counter,
36}
37
38impl RowNumberProvider {
39	/// Create a new RowNumberProvider for the given operator
40	pub fn new(node: FlowNodeId) -> Self {
41		Self {
42			node,
43			counter: Counter::with_prefix(node, b'C', CounterDirection::Ascending),
44		}
45	}
46
47	/// Get or create RowNumbers for multiple keys
48	/// Returns Vec<(RowNumber, is_new)> in the same order as input keys
49	/// where is_new indicates if the row number was newly created
50	pub fn get_or_create_row_numbers<'a, I>(
51		&self,
52		txn: &mut FlowTransaction,
53		keys: I,
54	) -> Result<Vec<(RowNumber, bool)>>
55	where
56		I: IntoIterator<Item = &'a EncodedKey>,
57	{
58		let mut results = Vec::new();
59
60		for key in keys {
61			let map_key = self.make_map_key(key);
62
63			if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
64				let bytes = existing_row.as_slice();
65				if bytes.len() >= 8 {
66					let row_num = u64::from_be_bytes([
67						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
68						bytes[7],
69					]);
70					results.push((RowNumber(row_num), false));
71					continue;
72				}
73			}
74
75			let new_row_number = self.counter.next(txn)?;
76
77			// Save the mapping from key to encoded number
78			let row_num_bytes = new_row_number.0.to_be_bytes().to_vec();
79			internal_state_set(self.node, txn, &map_key, EncodedRow(CowVec::new(row_num_bytes)))?;
80
81			// Save the reverse mapping from row_number to key
82			let reverse_key = self.make_reverse_map_key(new_row_number);
83			internal_state_set(
84				self.node,
85				txn,
86				&reverse_key,
87				EncodedRow(CowVec::new(key.as_ref().to_vec())),
88			)?;
89
90			results.push((new_row_number, true));
91		}
92
93		Ok(results)
94	}
95
96	/// Get or create a RowNumber for a given key
97	/// Returns (RowNumber, is_new) where is_new indicates if it was newly
98	/// created
99	pub fn get_or_create_row_number(
100		&self,
101		txn: &mut FlowTransaction,
102		key: &EncodedKey,
103	) -> Result<(RowNumber, bool)> {
104		Ok(self.get_or_create_row_numbers(txn, once(key))?.into_iter().next().unwrap())
105	}
106
107	/// Get the original key for a given row number (reverse lookup)
108	pub fn get_key_for_row_number(
109		&self,
110		txn: &mut FlowTransaction,
111		row_number: RowNumber,
112	) -> Result<Option<EncodedKey>> {
113		let reverse_key = self.make_reverse_map_key(row_number);
114		if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
115			Ok(Some(EncodedKey::new(key_bytes.to_vec())))
116		} else {
117			Ok(None)
118		}
119	}
120
121	/// Create a mapping key for a given encoded key (node_id added by FlowNodeInternalStateKey wrapper)
122	fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
123		let mut serializer = KeySerializer::new();
124		serializer.extend_u8(b'M'); // 'M' for mapping
125		serializer.extend_bytes(key.as_ref());
126		EncodedKey::new(serializer.finish())
127	}
128
129	/// Create a reverse mapping key for a given row number (node_id added by FlowNodeInternalStateKey wrapper)
130	fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
131		let mut serializer = KeySerializer::new();
132		serializer.extend_u8(b'R'); // 'R' for reverse mapping
133		serializer.extend_u64(row_number.0);
134		EncodedKey::new(serializer.finish())
135	}
136
137	/// Remove all encoded number mappings with the given prefix
138	/// This is useful for cleaning up all join results from a specific left encoded
139	pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> Result<()> {
140		// Create the prefix for scanning
141		let mut prefix = Vec::new();
142		let mut serializer = KeySerializer::new();
143		serializer.extend_u8(b'M'); // 'M' for mapping
144		prefix.extend_from_slice(&serializer.finish());
145		prefix.extend_from_slice(key_prefix);
146
147		let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
148		let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
149
150		let keys_to_remove = {
151			let stream = txn.range(full_range, 1024);
152			let mut keys = Vec::new();
153			for result in stream {
154				let multi = result?;
155				keys.push(multi.key);
156			}
157			keys
158		};
159
160		for key in keys_to_remove {
161			txn.remove(&key)?;
162		}
163
164		Ok(())
165	}
166}
167
#[cfg(test)]
pub mod tests {
	//! Behavioral tests for `RowNumberProvider`: allocation order, reuse of
	//! existing mappings, per-node isolation, and batch lookups.

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::common::CommitVersion;
	use reifydb_runtime::context::clock::{Clock, MockClock};
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::operator::stateful::test_utils::test::*;

	#[test]
	fn test_first_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("first");
		let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	#[test]
	fn test_duplicate_key_same_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("duplicate");

		// First call allocates a new row number.
		let (row_num1, is_new1) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num1.0, 1);
		assert!(is_new1);

		// A second call with the same key reuses the existing mapping.
		let (row_num2, is_new2) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num2.0, 1);
		assert!(!is_new2);

		// Both calls return the same row number.
		assert_eq!(row_num1, row_num2);
	}

	#[test]
	fn test_sequential_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Distinct keys receive consecutive row numbers.
		for i in 1..=5 {
			let key = test_key(&format!("key_{}", i));
			let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

			assert_eq!(row_num.0, i as u64);
			assert!(is_new);
		}
	}

	#[test]
	fn test_mixed_new_and_existing() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key1 = test_key("mixed_1");
		let key2 = test_key("mixed_2");
		let key3 = test_key("mixed_3");

		// First round: every key is new.
		let (rn1, new1) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		let (rn2, new2) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn3, new3) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();

		assert_eq!(rn1.0, 1);
		assert!(new1);
		assert_eq!(rn2.0, 2);
		assert!(new2);
		assert_eq!(rn3.0, 3);
		assert!(new3);

		// Second round: existing keys interleaved with one new key.
		let key4 = test_key("mixed_4");
		let (rn2_again, new2_again) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn4, new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		let (rn1_again, new1_again) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();

		assert_eq!(rn2_again.0, 2);
		assert!(!new2_again);
		assert_eq!(rn4.0, 4); // Next sequential number
		assert!(new4);
		assert_eq!(rn1_again.0, 1);
		assert!(!new1_again);
	}

	#[test]
	fn test_multiple_providers_isolated() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let key = test_key("shared_key");

		// Each provider has its own counter and mappings, so the same key
		// independently receives row number 1 from both.
		let (rn1, _) = provider1.get_or_create_row_number(&mut txn, &key).unwrap();
		let (rn2, _) = provider2.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(rn1.0, 1);
		assert_eq!(rn2.0, 1);

		// Advance provider1 with another key.
		let key2 = test_key("key2");
		let (rn1_2, _) = provider1.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn1_2.0, 2);

		// Provider2's counter is unaffected by provider1: its next new key gets 2.
		let (rn2_2, _) = provider2.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2_2.0, 2);
	}

	#[test]
	fn test_counter_persistence() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Allocate a few row numbers.
		for i in 1..=3 {
			let key = test_key(&format!("persist_{}", i));
			let (rn, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i as u64);
		}

		// Allocation goes through the counter stored in transaction state, so a
		// new key continues the sequence.
		let new_key = test_key("persist_new");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &new_key).unwrap();

		assert_eq!(rn.0, 4);
		assert!(is_new);
	}

	#[test]
	fn test_large_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Allocate many row numbers.
		for i in 1..=1000 {
			let key = test_key(&format!("large_{}", i));
			let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i as u64);
			assert!(is_new);
		}

		// Early mappings are still retrievable.
		let key = test_key("large_1");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(!is_new);

		// New keys continue the sequence.
		let key = test_key("large_1001");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1001);
		assert!(is_new);
	}

	#[test]
	fn test_mixed_existing_and_new_keys() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create 3 initial keys to establish existing row numbers.
		let key1 = test_key("key_1");
		let key2 = test_key("key_2");
		let key3 = test_key("key_3");

		let (rn1, _) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let (rn2, _) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let (rn3, _) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		// Batch containing a mix of existing and new keys.
		let key4 = test_key("key_4");
		let key5 = test_key("key_5");

		// Batch: [existing key2, new key4, existing key1, new key5, existing key3]
		let keys = vec![&key2, &key4, &key1, &key5, &key3];

		let results = provider.get_or_create_row_numbers(&mut txn, keys.into_iter()).unwrap();

		// Results are in input order with the expected values.
		assert_eq!(results.len(), 5);

		// key2 (existing) -> row number 2, not new
		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		// key4 (new) -> row number 4, is new
		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		// key1 (existing) -> row number 1, not new
		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		// key5 (new) -> row number 5, is new
		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		// key3 (existing) -> row number 3, not new
		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		// The counter advanced only for key4 and key5, so the next new key gets 6.
		let key6 = test_key("key_6");
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut txn, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		// Forward mappings written by the batch are visible to single lookups.
		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut txn, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);

		// Reverse mappings exist for the keys created in the batch.
		let reverse_key4 = provider.get_key_for_row_number(&mut txn, RowNumber(4)).unwrap();
		assert_eq!(reverse_key4, Some(key4));

		let reverse_key5 = provider.get_key_for_row_number(&mut txn, RowNumber(5)).unwrap();
		assert_eq!(reverse_key5, Some(key5));

		// Reverse mappings also exist for keys created before the batch.
		let reverse_key1 = provider.get_key_for_row_number(&mut txn, RowNumber(1)).unwrap();
		assert_eq!(reverse_key1, Some(key1));

		let reverse_key2 = provider.get_key_for_row_number(&mut txn, RowNumber(2)).unwrap();
		assert_eq!(reverse_key2, Some(key2));
	}
}