Skip to main content

reifydb_sdk/state/
row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::iter;
5
6use reifydb_core::{
7	encoded::{key::EncodedKey, row::EncodedRow},
8	interface::catalog::flow::FlowNodeId,
9	key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
10	util::encoding::keycode::serializer::KeySerializer,
11};
12use reifydb_type::{util::cowvec::CowVec, value::row_number::RowNumber};
13
14use crate::{error::Result, operator::context::OperatorContext};
15
/// Provides stable row numbers for keys with automatic Insert/Update detection.
///
/// This component maintains, in the internal state of a single flow node:
/// - A sequential counter for generating new row numbers
/// - A mapping from keys to their assigned row numbers
///
/// When a key is seen for the first time, it gets a new row number and the
/// lookup reports `is_new = true`. When a key is seen again, the lookup
/// returns the existing row number and reports `is_new = false`.
pub struct RowNumberProvider {
	/// Flow node whose internal state holds the counter and the key mappings.
	node: FlowNodeId,
}
27
28impl RowNumberProvider {
29	pub fn new(node: FlowNodeId) -> Self {
30		Self {
31			node,
32		}
33	}
34
35	/// Get or create RowNumbers for a batch of keys
36	/// Returns Vec<(RowNumber, is_new)> in the same order as input keys
37	/// where is_new indicates if the row number was newly created
38	pub fn get_or_create_row_numbers_batch<'a, I>(
39		&self,
40		ctx: &mut OperatorContext,
41		keys: I,
42	) -> Result<Vec<(RowNumber, bool)>>
43	where
44		I: IntoIterator<Item = &'a EncodedKey>,
45	{
46		let mut results = Vec::new();
47		let mut counter = self.load_counter(ctx)?;
48		let initial_counter = counter;
49
50		for key in keys {
51			// Check if we already have a row number for this key
52			let map_key = self.make_map_key(key);
53			let internal_key = FlowNodeInternalStateKey::new(self.node, map_key.as_ref().to_vec());
54
55			if let Some(existing_row) = ctx.state().get(&internal_key.encode())? {
56				// Key exists, return existing row number
57				let bytes = existing_row.as_ref();
58				if bytes.len() >= 8 {
59					let row_num = u64::from_be_bytes([
60						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
61						bytes[7],
62					]);
63					results.push((RowNumber(row_num), false));
64					continue;
65				}
66			}
67
68			// Key doesn't exist, allocate a new row number
69			let new_row_number = RowNumber(counter);
70
71			// Save the mapping from key to row number
72			let row_num_bytes = counter.to_be_bytes().to_vec();
73			ctx.state().set(&internal_key.encode(), &EncodedRow(CowVec::new(row_num_bytes)))?;
74
75			results.push((new_row_number, true));
76			counter += 1;
77		}
78
79		// Save the updated counter if we allocated any new row numbers
80		if counter != initial_counter {
81			self.save_counter(ctx, counter)?;
82		}
83
84		Ok(results)
85	}
86
87	pub fn get_or_create_row_number(
88		&self,
89		ctx: &mut OperatorContext,
90		key: &EncodedKey,
91	) -> Result<(RowNumber, bool)> {
92		Ok(self.get_or_create_row_numbers_batch(ctx, iter::once(key))?.into_iter().next().unwrap())
93	}
94
95	fn load_counter(&self, ctx: &mut OperatorContext) -> Result<u64> {
96		let key = self.make_counter_key();
97		let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
98		match ctx.state().get(&internal_key.encode())? {
99			None => Ok(1), // First time, start at 1
100			Some(state_row) => {
101				// Parse the stored counter
102				let bytes = state_row.as_ref();
103				if bytes.len() >= 8 {
104					Ok(u64::from_be_bytes([
105						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
106						bytes[7],
107					]))
108				} else {
109					Ok(1)
110				}
111			}
112		}
113	}
114
115	fn save_counter(&self, ctx: &mut OperatorContext, counter: u64) -> Result<()> {
116		let key = self.make_counter_key();
117		let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
118		let value = EncodedRow(CowVec::new(counter.to_be_bytes().to_vec()));
119		ctx.state().set(&internal_key.encode(), &value)?;
120		Ok(())
121	}
122
123	fn make_counter_key(&self) -> EncodedKey {
124		let mut serializer = KeySerializer::new();
125		serializer.extend_u8(b'C'); // 'C' for counter
126		EncodedKey::new(serializer.finish())
127	}
128
129	fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
130		let mut serializer = KeySerializer::new();
131		serializer.extend_u8(b'M'); // 'M' for mapping
132		serializer.extend_bytes(key.as_ref());
133		EncodedKey::new(serializer.finish())
134	}
135
136	pub fn remove_by_prefix(&self, ctx: &mut OperatorContext, key_prefix: &[u8]) -> Result<()> {
137		// Create the prefix for scanning (node_id added by FlowNodeInternalStateKey wrapper)
138		let mut prefix = Vec::new();
139		let mut serializer = KeySerializer::new();
140		serializer.extend_u8(b'M'); // 'M' for mapping
141		prefix.extend_from_slice(&serializer.finish());
142		prefix.extend_from_slice(key_prefix);
143
144		// Wrap with FlowNodeInternalStateKey and scan for all keys with this prefix
145		let internal_prefix = FlowNodeInternalStateKey::new(self.node, prefix);
146		let prefix_key = internal_prefix.encode();
147		let entries = ctx.state().scan_prefix(&prefix_key)?;
148
149		for (key, _) in entries {
150			ctx.state().remove(&key)?;
151		}
152
153		Ok(())
154	}
155}
156
157#[cfg(test)]
158pub mod tests {
159	use std::collections::HashMap;
160
161	use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
162	use reifydb_core::{
163		encoded::key::EncodedKey,
164		interface::catalog::flow::FlowNodeId,
165		key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
166	};
167	use reifydb_type::value::{Value, row_number::RowNumber};
168
169	use crate::{
170		error::Result,
171		operator::{
172			FFIOperator, FFIOperatorMetadata, change::BorrowedChange, column::OperatorColumn,
173			context::OperatorContext,
174		},
175		state::{FFIRawStatefulOperator, row::RowNumberProvider},
176		testing::{harness::TestHarnessBuilder, helpers::encode_key},
177	};
178
179	struct RowNumberTestOperator;
180
181	impl FFIOperatorMetadata for RowNumberTestOperator {
182		const NAME: &'static str = "row_number_test";
183		const API: u32 = 1;
184		const VERSION: &'static str = "1.0.0";
185		const DESCRIPTION: &'static str = "Test operator for row number provider";
186		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
187		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
188		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
189	}
190
191	impl FFIOperator for RowNumberTestOperator {
192		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
193			Ok(Self)
194		}
195
196		fn apply(&mut self, _ctx: &mut OperatorContext, _input: BorrowedChange<'_>) -> Result<()> {
197			Ok(())
198		}
199
200		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
201			Ok(())
202		}
203	}
204
205	impl FFIRawStatefulOperator for RowNumberTestOperator {}
206
207	#[test]
208	fn test_first_row_number_starts_at_one() {
209		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
210			.with_node_id(FlowNodeId(1))
211			.build()
212			.expect("Failed to build harness");
213
214		let key = encode_key("test_key");
215		let mut ctx = harness.create_operator_context();
216		let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
217
218		assert_eq!(row_num.0, 1);
219		assert!(is_new);
220	}
221
222	#[test]
223	fn test_duplicate_key_returns_same_row_number() {
224		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
225			.with_node_id(FlowNodeId(1))
226			.build()
227			.expect("Failed to build harness");
228
229		let key = encode_key("test_key");
230
231		let mut ctx = harness.create_operator_context();
232		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key).unwrap();
233
234		let mut ctx = harness.create_operator_context();
235		let (row_num2, is_new2) = ctx.get_or_create_row_number(&key).unwrap();
236
237		assert_eq!(row_num1.0, row_num2.0);
238		assert!(is_new1);
239		assert!(!is_new2);
240	}
241
242	#[test]
243	fn test_sequential_numbering() {
244		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
245			.with_node_id(FlowNodeId(1))
246			.build()
247			.expect("Failed to build harness");
248
249		let key1 = encode_key("key1");
250		let key2 = encode_key("key2");
251		let key3 = encode_key("key3");
252
253		let mut ctx = harness.create_operator_context();
254		let (row_num1, _) = ctx.get_or_create_row_number(&key1).unwrap();
255
256		let mut ctx = harness.create_operator_context();
257		let (row_num2, _) = ctx.get_or_create_row_number(&key2).unwrap();
258
259		let mut ctx = harness.create_operator_context();
260		let (row_num3, _) = ctx.get_or_create_row_number(&key3).unwrap();
261
262		assert_eq!(row_num1.0, 1);
263		assert_eq!(row_num2.0, 2);
264		assert_eq!(row_num3.0, 3);
265	}
266
267	#[test]
268	fn test_operator_isolation() {
269		// Two harnesses with different node IDs share state store but have isolated namespaces
270		let mut harness1 = TestHarnessBuilder::<RowNumberTestOperator>::new()
271			.with_node_id(FlowNodeId(1))
272			.build()
273			.expect("Failed to build harness1");
274
275		let mut harness2 = TestHarnessBuilder::<RowNumberTestOperator>::new()
276			.with_node_id(FlowNodeId(2))
277			.build()
278			.expect("Failed to build harness2");
279
280		let key = encode_key("same_key");
281
282		let mut ctx1 = harness1.create_operator_context();
283		let (row_num1, is_new1) = ctx1.get_or_create_row_number(&key).unwrap();
284
285		let mut ctx2 = harness2.create_operator_context();
286		let (row_num2, is_new2) = ctx2.get_or_create_row_number(&key).unwrap();
287
288		// Both should be new because they're from different operators
289		assert!(is_new1);
290		assert!(is_new2);
291		// Both should start at 1
292		assert_eq!(row_num1.0, 1);
293		assert_eq!(row_num2.0, 1);
294	}
295
296	#[test]
297	fn test_persistence_across_calls() {
298		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
299			.with_node_id(FlowNodeId(1))
300			.build()
301			.expect("Failed to build harness");
302
303		// Create first few row numbers
304		let key1 = encode_key("key1");
305		let key2 = encode_key("key2");
306
307		let mut ctx = harness.create_operator_context();
308		ctx.get_or_create_row_number(&key1).unwrap();
309
310		let mut ctx = harness.create_operator_context();
311		ctx.get_or_create_row_number(&key2).unwrap();
312
313		// New key should continue from where we left off
314		let key3 = encode_key("key3");
315		let mut ctx = harness.create_operator_context();
316		let (row_num3, is_new3) = ctx.get_or_create_row_number(&key3).unwrap();
317
318		assert!(is_new3);
319		assert_eq!(row_num3.0, 3);
320
321		// Old keys should still return their original row numbers
322		let mut ctx = harness.create_operator_context();
323		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
324		assert!(!is_new1);
325		assert_eq!(row_num1.0, 1);
326	}
327
328	#[test]
329	fn test_large_scale_row_numbers() {
330		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
331			.with_node_id(FlowNodeId(1))
332			.build()
333			.expect("Failed to build harness");
334
335		// Create 1000 unique keys
336		for i in 0..1000 {
337			let key = encode_key(format!("key_{}", i));
338			let mut ctx = harness.create_operator_context();
339			let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
340			assert!(is_new);
341			assert_eq!(row_num.0, i + 1);
342		}
343
344		// Verify a random sample still works correctly
345		let key_500 = encode_key("key_500");
346		let mut ctx = harness.create_operator_context();
347		let (row_num, is_new) = ctx.get_or_create_row_number(&key_500).unwrap();
348		assert!(!is_new);
349		assert_eq!(row_num.0, 501);
350	}
351
352	#[test]
353	fn test_remove_by_prefix() {
354		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
355			.with_node_id(FlowNodeId(1))
356			.build()
357			.expect("Failed to build harness");
358
359		// Create keys with different prefixes
360		let key_a1 = encode_key("prefix_a_1");
361		let key_a2 = encode_key("prefix_a_2");
362		let key_b1 = encode_key("prefix_b_1");
363
364		let mut ctx = harness.create_operator_context();
365		ctx.get_or_create_row_number(&key_a1).unwrap();
366
367		let mut ctx = harness.create_operator_context();
368		ctx.get_or_create_row_number(&key_a2).unwrap();
369
370		let mut ctx = harness.create_operator_context();
371		ctx.get_or_create_row_number(&key_b1).unwrap();
372
373		// Remove all keys with prefix "prefix_a"
374		let provider = RowNumberProvider::new(FlowNodeId(1));
375		let mut ctx = harness.create_operator_context();
376		provider.remove_by_prefix(&mut ctx, b"prefix_a").unwrap();
377
378		// Keys with prefix_a should be new again
379		let mut ctx = harness.create_operator_context();
380		let (_, is_new_a1) = ctx.get_or_create_row_number(&key_a1).unwrap();
381
382		let mut ctx = harness.create_operator_context();
383		let (_, is_new_a2) = ctx.get_or_create_row_number(&key_a2).unwrap();
384
385		// But they'll get new row numbers (continuing from counter)
386		assert!(is_new_a1);
387		assert!(is_new_a2);
388
389		// Key with prefix_b should still be known
390		let mut ctx = harness.create_operator_context();
391		let (_, is_new_b1) = ctx.get_or_create_row_number(&key_b1).unwrap();
392		assert!(!is_new_b1);
393	}
394
395	#[test]
396	fn test_empty_key() {
397		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
398			.with_node_id(FlowNodeId(1))
399			.build()
400			.expect("Failed to build harness");
401
402		let empty_key = encode_key("");
403
404		let mut ctx = harness.create_operator_context();
405		let (row_num, is_new) = ctx.get_or_create_row_number(&empty_key).unwrap();
406		assert!(is_new);
407		assert_eq!(row_num.0, 1);
408
409		// Should work for duplicate empty keys too
410		let mut ctx = harness.create_operator_context();
411		let (row_num2, is_new2) = ctx.get_or_create_row_number(&empty_key).unwrap();
412		assert!(!is_new2);
413		assert_eq!(row_num2.0, 1);
414	}
415
416	#[test]
417	fn test_binary_key_data() {
418		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
419			.with_node_id(FlowNodeId(1))
420			.build()
421			.expect("Failed to build harness");
422
423		// Test with binary data including null bytes
424		let binary_key = EncodedKey::new(vec![0x00, 0xFF, 0x00, 0xAB, 0xCD]);
425
426		let mut ctx = harness.create_operator_context();
427		let (row_num, is_new) = ctx.get_or_create_row_number(&binary_key).unwrap();
428		assert!(is_new);
429		assert_eq!(row_num.0, 1);
430
431		let mut ctx = harness.create_operator_context();
432		let (row_num2, is_new2) = ctx.get_or_create_row_number(&binary_key).unwrap();
433		assert!(!is_new2);
434		assert_eq!(row_num2.0, 1);
435	}
436
437	#[test]
438	fn test_interleaved_operations() {
439		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
440			.with_node_id(FlowNodeId(1))
441			.build()
442			.expect("Failed to build harness");
443
444		let key1 = encode_key("key1");
445		let key2 = encode_key("key2");
446
447		// First access key1
448		let mut ctx = harness.create_operator_context();
449		let (row_num1_first, _) = ctx.get_or_create_row_number(&key1).unwrap();
450		assert_eq!(row_num1_first.0, 1);
451
452		// First access key2
453		let mut ctx = harness.create_operator_context();
454		let (row_num2_first, _) = ctx.get_or_create_row_number(&key2).unwrap();
455		assert_eq!(row_num2_first.0, 2);
456
457		// Second access key1 - should return same number
458		let mut ctx = harness.create_operator_context();
459		let (row_num1_second, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
460		assert!(!is_new1);
461		assert_eq!(row_num1_second.0, 1);
462
463		// Second access key2 - should return same number
464		let mut ctx = harness.create_operator_context();
465		let (row_num2_second, is_new2) = ctx.get_or_create_row_number(&key2).unwrap();
466		assert!(!is_new2);
467		assert_eq!(row_num2_second.0, 2);
468	}
469
470	#[test]
471	fn test_counter_key_uniqueness_per_node() {
472		// Counter keys for different nodes should be different after wrapping with FlowNodeInternalStateKey
473		let provider1 = RowNumberProvider::new(FlowNodeId(1));
474		let provider2 = RowNumberProvider::new(FlowNodeId(2));
475
476		let internal_key1 = provider1.make_counter_key();
477		let internal_key2 = provider2.make_counter_key();
478
479		// Internal keys are the same (node_id is added by wrapper)
480		assert_eq!(internal_key1, internal_key2);
481
482		// But after wrapping with FlowNodeInternalStateKey, they should be different
483		let final_key1 =
484			FlowNodeInternalStateKey::new(provider1.node, internal_key1.as_ref().to_vec()).encode();
485		let final_key2 =
486			FlowNodeInternalStateKey::new(provider2.node, internal_key2.as_ref().to_vec()).encode();
487
488		assert!(!final_key1.is_empty());
489		assert!(!final_key2.is_empty());
490		assert_ne!(final_key1, final_key2);
491	}
492
493	#[test]
494	fn test_map_key_uniqueness() {
495		let provider = RowNumberProvider::new(FlowNodeId(42));
496		let original_key1 = encode_key("test1");
497		let original_key2 = encode_key("test2");
498
499		let map_key1 = provider.make_map_key(&original_key1);
500		let map_key2 = provider.make_map_key(&original_key2);
501
502		// Map keys should be non-empty and different for different inputs
503		assert!(!map_key1.is_empty());
504		assert!(!map_key2.is_empty());
505		assert_ne!(map_key1, map_key2);
506
507		// Same original key should produce same map key
508		let map_key1_again = provider.make_map_key(&original_key1);
509		assert_eq!(map_key1, map_key1_again);
510	}
511
512	#[test]
513	fn test_counter_key_vs_map_key_separation() {
514		// Counter key and map key should never collide
515		let provider = RowNumberProvider::new(FlowNodeId(1));
516
517		let counter_key = provider.make_counter_key();
518		let map_key = provider.make_map_key(&EncodedKey::new(Vec::new()));
519
520		// Even with an empty original key, they should be different
521		assert_ne!(counter_key, map_key);
522	}
523
524	#[test]
525	fn test_batch_mixed_existing_and_new_keys() {
526		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
527			.with_node_id(FlowNodeId(1))
528			.build()
529			.expect("Failed to build harness");
530
531		let provider = RowNumberProvider::new(FlowNodeId(1));
532
533		// Create 3 initial keys to establish existing row numbers
534		let key1 = encode_key("batch_key_1");
535		let key2 = encode_key("batch_key_2");
536		let key3 = encode_key("batch_key_3");
537
538		let mut ctx = harness.create_operator_context();
539		let (rn1, _) = provider.get_or_create_row_number(&mut ctx, &key1).unwrap();
540		assert_eq!(rn1.0, 1);
541
542		let mut ctx = harness.create_operator_context();
543		let (rn2, _) = provider.get_or_create_row_number(&mut ctx, &key2).unwrap();
544		assert_eq!(rn2.0, 2);
545
546		let mut ctx = harness.create_operator_context();
547		let (rn3, _) = provider.get_or_create_row_number(&mut ctx, &key3).unwrap();
548		assert_eq!(rn3.0, 3);
549
550		// Now test batch with mix of existing and new keys
551		let key4 = encode_key("batch_key_4");
552		let key5 = encode_key("batch_key_5");
553
554		// Batch: [existing key2, new key4, existing key1, new key5, existing key3]
555		let batch_keys = vec![&key2, &key4, &key1, &key5, &key3];
556
557		let mut ctx = harness.create_operator_context();
558		let results = provider.get_or_create_row_numbers_batch(&mut ctx, batch_keys.into_iter()).unwrap();
559
560		// Verify results are in correct order and have correct values
561		assert_eq!(results.len(), 5);
562
563		// key2 (existing) -> row number 2, not new
564		assert_eq!(results[0].0.0, 2);
565		assert!(!results[0].1);
566
567		// key4 (new) -> row number 4, is new
568		assert_eq!(results[1].0.0, 4);
569		assert!(results[1].1);
570
571		// key1 (existing) -> row number 1, not new
572		assert_eq!(results[2].0.0, 1);
573		assert!(!results[2].1);
574
575		// key5 (new) -> row number 5, is new
576		assert_eq!(results[3].0.0, 5);
577		assert!(results[3].1);
578
579		// key3 (existing) -> row number 3, not new
580		assert_eq!(results[4].0.0, 3);
581		assert!(!results[4].1);
582
583		// Verify that counter was only incremented by 2 (for key4 and key5)
584		// by checking that the next new key gets row number 6
585		let key6 = encode_key("batch_key_6");
586		let mut ctx = harness.create_operator_context();
587		let (rn6, is_new6) = provider.get_or_create_row_number(&mut ctx, &key6).unwrap();
588		assert_eq!(rn6.0, 6);
589		assert!(is_new6);
590
591		// Verify all mappings are still correct by retrieving them individually
592		let mut ctx = harness.create_operator_context();
593		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut ctx, &key4).unwrap();
594		assert_eq!(check_rn4.0, 4);
595		assert!(!is_new4);
596
597		let mut ctx = harness.create_operator_context();
598		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut ctx, &key5).unwrap();
599		assert_eq!(check_rn5.0, 5);
600		assert!(!is_new5);
601	}
602}