reifydb_sub_flow/operator/stateful/
row_number.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3use reifydb_core::{
4	EncodedKey,
5	interface::FlowNodeId,
6	key::{EncodableKey, FlowNodeStateKey},
7	util::{CowVec, encoding::keycode::KeySerializer},
8	value::encoded::{EncodedKeyRange, EncodedValues},
9};
10use reifydb_type::RowNumber;
11
12use crate::{operator::stateful::RawStatefulOperator, transaction::FlowTransaction};
13
14/// Provides stable encoded numbers for keys with automatic Insert/Update detection
15///
16/// This component maintains:
17/// - A sequential counter for generating new encoded numbers
18/// - A mapping from keys to their assigned encoded numbers
19///
20/// When a key is seen for the first time, it gets a new encoded number and returns
21/// true. When a key is seen again, it returns the existing encoded number and
22/// false.
23pub struct RowNumberProvider {
24	node: FlowNodeId,
25}
26
27impl RowNumberProvider {
28	/// Create a new RowNumberProvider for the given operator
29	pub fn new(node: FlowNodeId) -> Self {
30		Self {
31			node,
32		}
33	}
34
35	/// Get or create a RowNumber for a given key
36	/// Returns (RowNumber, is_new) where is_new indicates if it was newly
37	/// created
38	pub fn get_or_create_row_number<O: RawStatefulOperator>(
39		&self,
40		txn: &mut FlowTransaction,
41		operator: &O,
42		key: &EncodedKey,
43	) -> crate::Result<(RowNumber, bool)> {
44		// Check if we already have a encoded number for this key
45		let map_key = self.make_map_key(key);
46		let encoded_map_key = EncodedKey::new(map_key.clone());
47
48		if let Some(existing_row) = operator.state_get(txn, &encoded_map_key)? {
49			// Key exists, return existing encoded number
50			let bytes = existing_row.as_ref();
51			if bytes.len() >= 8 {
52				let row_num = u64::from_be_bytes([
53					bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
54				]);
55				return Ok((RowNumber(row_num), false));
56			}
57		}
58
59		// Key doesn't exist, generate a new encoded number
60		let counter = self.load_counter::<O>(txn, operator)?;
61		let new_row_number = RowNumber(counter);
62
63		// Save the new counter value
64		self.save_counter::<O>(txn, operator, counter + 1)?;
65
66		// Save the mapping from key to encoded number
67		let row_num_bytes = counter.to_be_bytes().to_vec();
68		operator.state_set(txn, &encoded_map_key, EncodedValues(CowVec::new(row_num_bytes)))?;
69
70		Ok((new_row_number, true))
71	}
72
73	/// Load the current counter value
74	fn load_counter<O: RawStatefulOperator>(&self, txn: &mut FlowTransaction, operator: &O) -> crate::Result<u64> {
75		let key = self.make_counter_key();
76		let encoded_key = EncodedKey::new(key);
77		match operator.state_get(txn, &encoded_key)? {
78			None => Ok(1), // First time, start at 1
79			Some(state_row) => {
80				// Parse the stored counter
81				let bytes = state_row.as_ref();
82				if bytes.len() >= 8 {
83					Ok(u64::from_be_bytes([
84						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
85						bytes[7],
86					]))
87				} else {
88					Ok(1)
89				}
90			}
91		}
92	}
93
94	/// Save the counter value
95	fn save_counter<O: RawStatefulOperator>(
96		&self,
97		txn: &mut FlowTransaction,
98		operator: &O,
99		counter: u64,
100	) -> crate::Result<()> {
101		let key = self.make_counter_key();
102		let encoded_key = EncodedKey::new(key);
103		let value = EncodedValues(CowVec::new(counter.to_be_bytes().to_vec()));
104		operator.state_set(txn, &encoded_key, value)?;
105		Ok(())
106	}
107
108	/// Create a key for the counter, including node_id
109	fn make_counter_key(&self) -> Vec<u8> {
110		let mut serializer = KeySerializer::new();
111		serializer.extend_u64(self.node.0);
112		serializer.extend_u8(b'C'); // 'C' for counter
113		serializer.finish()
114	}
115
116	/// Create a mapping key for a given encoded key, including node_id
117	fn make_map_key(&self, key: &EncodedKey) -> Vec<u8> {
118		let mut serializer = KeySerializer::new();
119		serializer.extend_u64(self.node.0);
120		serializer.extend_u8(b'M'); // 'M' for mapping
121		serializer.extend_bytes(key.as_ref());
122		serializer.finish()
123	}
124
125	/// Remove all encoded number mappings with the given prefix
126	/// This is useful for cleaning up all join results from a specific left encoded
127	pub fn remove_by_prefix<O: RawStatefulOperator>(
128		&self,
129		txn: &mut FlowTransaction,
130		operator: &O,
131		key_prefix: &[u8],
132	) -> crate::Result<()> {
133		// Create the prefix for scanning
134		let mut prefix = Vec::new();
135		let mut serializer = KeySerializer::new();
136		serializer.extend_u64(self.node.0);
137		serializer.extend_u8(b'M'); // 'M' for mapping
138		prefix.extend_from_slice(&serializer.finish());
139		prefix.extend_from_slice(key_prefix);
140
141		// Create range for prefix scan with the operator state prefix
142		let state_prefix = FlowNodeStateKey::new(operator.id(), prefix.clone());
143		let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
144
145		// Collect keys to remove (similar pattern to state_clear in utils.rs)
146		let keys_to_remove: Vec<_> = txn.range(full_range)?.map(|multi| multi.key).collect();
147
148		for key in keys_to_remove {
149			txn.remove(&key)?;
150		}
151
152		Ok(())
153	}
154}
155
#[cfg(test)]
mod tests {
	use reifydb_core::CommitVersion;

	use super::*;
	use crate::operator::stateful::test_utils::test::*;

	// TestOperator already implements SimpleStatefulOperator

	#[test]
	fn test_first_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let operator = TestOperator::simple(FlowNodeId(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("first");
		let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	#[test]
	fn test_duplicate_key_same_row_number() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let operator = TestOperator::simple(FlowNodeId(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("duplicate");

		// First call allocates a new row number.
		let (row_num1, is_new1) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();
		assert_eq!(row_num1.0, 1);
		assert!(is_new1);

		// Second call with the same key returns the stored row number.
		let (row_num2, is_new2) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();
		assert_eq!(row_num2.0, 1);
		assert!(!is_new2);

		assert_eq!(row_num1, row_num2);
	}

	#[test]
	fn test_sequential_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let operator = TestOperator::simple(FlowNodeId(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Each distinct key gets the next row number in sequence.
		for i in 1..=5u64 {
			let key = test_key(&format!("key_{}", i));
			let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();

			assert_eq!(row_num.0, i);
			assert!(is_new);
		}
	}

	#[test]
	fn test_mixed_new_and_existing() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let operator = TestOperator::simple(FlowNodeId(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key1 = test_key("mixed_1");
		let key2 = test_key("mixed_2");
		let key3 = test_key("mixed_3");

		// First round: every key is new.
		let (rn1, new1) = provider.get_or_create_row_number(&mut txn, &operator, &key1).unwrap();
		let (rn2, new2) = provider.get_or_create_row_number(&mut txn, &operator, &key2).unwrap();
		let (rn3, new3) = provider.get_or_create_row_number(&mut txn, &operator, &key3).unwrap();

		assert_eq!(rn1.0, 1);
		assert!(new1);
		assert_eq!(rn2.0, 2);
		assert!(new2);
		assert_eq!(rn3.0, 3);
		assert!(new3);

		// Second round: existing keys interleaved with a new one.
		let key4 = test_key("mixed_4");
		let (rn2_again, new2_again) = provider.get_or_create_row_number(&mut txn, &operator, &key2).unwrap();
		let (rn4, new4) = provider.get_or_create_row_number(&mut txn, &operator, &key4).unwrap();
		let (rn1_again, new1_again) = provider.get_or_create_row_number(&mut txn, &operator, &key1).unwrap();

		assert_eq!(rn2_again.0, 2);
		assert!(!new2_again);
		assert_eq!(rn4.0, 4); // Next sequential number
		assert!(new4);
		assert_eq!(rn1_again.0, 1);
		assert!(!new1_again);
	}

	#[test]
	fn test_multiple_providers_isolated() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let operator1 = TestOperator::simple(FlowNodeId(1));
		let operator2 = TestOperator::simple(FlowNodeId(2));
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let key = test_key("shared_key");

		// Providers number keys independently, so the same key gets row
		// number 1 from both.
		let (rn1, _) = provider1.get_or_create_row_number(&mut txn, &operator1, &key).unwrap();
		let (rn2, _) = provider2.get_or_create_row_number(&mut txn, &operator2, &key).unwrap();

		assert_eq!(rn1.0, 1);
		assert_eq!(rn2.0, 1);

		// Advance provider1 with a second key.
		let key2 = test_key("key2");
		let (rn1_2, _) = provider1.get_or_create_row_number(&mut txn, &operator1, &key2).unwrap();
		assert_eq!(rn1_2.0, 2);

		// provider2 has its own counter. Its second key is also 2, because it
		// only counts provider2's own allocations, not provider1's.
		let (rn2_2, _) = provider2.get_or_create_row_number(&mut txn, &operator2, &key2).unwrap();
		assert_eq!(rn2_2.0, 2);
	}

	#[test]
	fn test_counter_persistence() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let operator = TestOperator::simple(FlowNodeId(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=3u64 {
			let key = test_key(&format!("persist_{}", i));
			let (rn, _) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();
			assert_eq!(rn.0, i);
		}

		// A fresh provider for the same node holds no in-memory state. It must
		// read the counter and mappings back from operator state.
		let reloaded = RowNumberProvider::new(FlowNodeId(1));

		let new_key = test_key("persist_new");
		let (rn, is_new) = reloaded.get_or_create_row_number(&mut txn, &operator, &new_key).unwrap();
		assert_eq!(rn.0, 4);
		assert!(is_new);

		// Mappings written by the first provider are visible to the new one.
		let old_key = test_key("persist_1");
		let (rn, is_new) = reloaded.get_or_create_row_number(&mut txn, &operator, &old_key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(!is_new);
	}

	#[test]
	fn test_large_row_numbers() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::new(&mut txn, CommitVersion(1));
		let operator = TestOperator::simple(FlowNodeId(1));
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=1000u64 {
			let key = test_key(&format!("large_{}", i));
			let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();
			assert_eq!(rn.0, i);
			assert!(is_new);
		}

		// Early keys are still resolvable.
		let key = test_key("large_1");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(!is_new);

		// New keys continue the sequence.
		let key = test_key("large_1001");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &operator, &key).unwrap();
		assert_eq!(rn.0, 1001);
		assert!(is_new);
	}
}