reifydb_flow_operator_sdk/stateful/
row.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Row number provider for stable row numbering in stateful operators
5
6use reifydb_core::{
7	EncodedKey,
8	interface::FlowNodeId,
9	key::{EncodableKey, FlowNodeInternalStateKey},
10	util::{CowVec, encoding::keycode::KeySerializer},
11	value::encoded::EncodedValues,
12};
13use reifydb_type::RowNumber;
14
15use crate::{context::OperatorContext, error::Result};
16
17/// Provides stable row numbers for keys with automatic Insert/Update detection
18///
19/// This component maintains:
20/// - A sequential counter for generating new row numbers
21/// - A mapping from keys to their assigned row numbers
22///
23/// When a key is seen for the first time, it gets a new row number and returns
24/// true. When a key is seen again, it returns the existing row number and false.
25pub struct RowNumberProvider {
26	node: FlowNodeId,
27}
28
29impl RowNumberProvider {
30	/// Create a new RowNumberProvider for the given operator
31	pub fn new(node: FlowNodeId) -> Self {
32		Self {
33			node,
34		}
35	}
36
37	/// Get or create RowNumbers for a batch of keys
38	/// Returns Vec<(RowNumber, is_new)> in the same order as input keys
39	/// where is_new indicates if the row number was newly created
40	pub fn get_or_create_row_numbers_batch<'a, I>(
41		&self,
42		ctx: &mut OperatorContext,
43		keys: I,
44	) -> Result<Vec<(RowNumber, bool)>>
45	where
46		I: IntoIterator<Item = &'a EncodedKey>,
47	{
48		let mut results = Vec::new();
49		let mut counter = self.load_counter(ctx)?;
50		let initial_counter = counter;
51
52		for key in keys {
53			// Check if we already have a row number for this key
54			let map_key = self.make_map_key(key);
55			let internal_key = FlowNodeInternalStateKey::new(self.node, map_key.as_ref().to_vec());
56
57			if let Some(existing_row) = ctx.state().get(&internal_key.encode())? {
58				// Key exists, return existing row number
59				let bytes = existing_row.as_ref();
60				if bytes.len() >= 8 {
61					let row_num = u64::from_be_bytes([
62						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
63						bytes[7],
64					]);
65					results.push((RowNumber(row_num), false));
66					continue;
67				}
68			}
69
70			// Key doesn't exist, allocate a new row number
71			let new_row_number = RowNumber(counter);
72
73			// Save the mapping from key to row number
74			let row_num_bytes = counter.to_be_bytes().to_vec();
75			ctx.state().set(&internal_key.encode(), &EncodedValues(CowVec::new(row_num_bytes)))?;
76
77			results.push((new_row_number, true));
78			counter += 1;
79		}
80
81		// Save the updated counter if we allocated any new row numbers
82		if counter != initial_counter {
83			self.save_counter(ctx, counter)?;
84		}
85
86		Ok(results)
87	}
88
89	/// Get or create a RowNumber for a given key
90	/// Returns (RowNumber, is_new) where is_new indicates if it was newly created
91	pub fn get_or_create_row_number(
92		&self,
93		ctx: &mut OperatorContext,
94		key: &EncodedKey,
95	) -> Result<(RowNumber, bool)> {
96		Ok(self.get_or_create_row_numbers_batch(ctx, std::iter::once(key))?.into_iter().next().unwrap())
97	}
98
99	/// Load the current counter value
100	fn load_counter(&self, ctx: &mut OperatorContext) -> Result<u64> {
101		let key = self.make_counter_key();
102		let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
103		match ctx.state().get(&internal_key.encode())? {
104			None => Ok(1), // First time, start at 1
105			Some(state_row) => {
106				// Parse the stored counter
107				let bytes = state_row.as_ref();
108				if bytes.len() >= 8 {
109					Ok(u64::from_be_bytes([
110						bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
111						bytes[7],
112					]))
113				} else {
114					Ok(1)
115				}
116			}
117		}
118	}
119
120	/// Save the counter value
121	fn save_counter(&self, ctx: &mut OperatorContext, counter: u64) -> Result<()> {
122		let key = self.make_counter_key();
123		let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
124		let value = EncodedValues(CowVec::new(counter.to_be_bytes().to_vec()));
125		ctx.state().set(&internal_key.encode(), &value)?;
126		Ok(())
127	}
128
129	/// Create a key for the counter (node_id added by FlowNodeInternalStateKey wrapper)
130	fn make_counter_key(&self) -> EncodedKey {
131		let mut serializer = KeySerializer::new();
132		serializer.extend_u8(b'C'); // 'C' for counter
133		EncodedKey::new(serializer.finish())
134	}
135
136	/// Create a mapping key for a given encoded key (node_id added by FlowNodeInternalStateKey wrapper)
137	fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
138		let mut serializer = KeySerializer::new();
139		serializer.extend_u8(b'M'); // 'M' for mapping
140		serializer.extend_bytes(key.as_ref());
141		EncodedKey::new(serializer.finish())
142	}
143
144	/// Remove all row number mappings with the given prefix
145	/// This is useful for cleaning up all join results from a specific left row
146	pub fn remove_by_prefix(&self, ctx: &mut OperatorContext, key_prefix: &[u8]) -> Result<()> {
147		// Create the prefix for scanning (node_id added by FlowNodeInternalStateKey wrapper)
148		let mut prefix = Vec::new();
149		let mut serializer = KeySerializer::new();
150		serializer.extend_u8(b'M'); // 'M' for mapping
151		prefix.extend_from_slice(&serializer.finish());
152		prefix.extend_from_slice(key_prefix);
153
154		// Wrap with FlowNodeInternalStateKey and scan for all keys with this prefix
155		let internal_prefix = FlowNodeInternalStateKey::new(self.node, prefix);
156		let prefix_key = internal_prefix.encode();
157		let entries = ctx.state().scan_prefix(&prefix_key)?;
158
159		for (key, _) in entries {
160			ctx.state().remove(&key)?;
161		}
162
163		Ok(())
164	}
165}
166
#[cfg(test)]
mod tests {
	use std::collections::HashMap;

	use reifydb_core::{
		EncodedKey, Row,
		interface::FlowNodeId,
		key::{EncodableKey, FlowNodeInternalStateKey},
	};
	use reifydb_type::{RowNumber, Value};

	use crate::{
		FFIOperator, FFIOperatorMetadata, FlowChange,
		context::OperatorContext,
		error::Result,
		stateful::{FFIRawStatefulOperator, RowNumberProvider},
		testing::{TestHarnessBuilder, helpers::encode_key},
	};

	/// Minimal pass-through operator, used only to build a harness for RowNumberProvider tests.
	struct RowNumberTestOperator;

	impl FFIOperatorMetadata for RowNumberTestOperator {
		const NAME: &'static str = "row_number_test";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Test operator for row number provider";
		const INPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
		const OUTPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
		const CAPABILITIES: u32 = crate::prelude::CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for RowNumberTestOperator {
		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self)
		}

		fn apply(&mut self, _ctx: &mut OperatorContext, input: FlowChange) -> Result<FlowChange> {
			Ok(input)
		}

		fn get_rows(
			&mut self,
			_ctx: &mut OperatorContext,
			_row_numbers: &[RowNumber],
		) -> Result<Vec<Option<Row>>> {
			Ok(vec![])
		}
	}

	impl FFIRawStatefulOperator for RowNumberTestOperator {}

	/// The first key seen by a node gets row number 1 and is reported as new.
	#[test]
	fn test_first_row_number_starts_at_one() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key = encode_key("test_key");
		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	/// Looking up the same key twice yields the same row number; only the first is new.
	#[test]
	fn test_duplicate_key_returns_same_row_number() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key = encode_key("test_key");

		let mut ctx = harness.create_operator_context();
		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&key).unwrap();

		assert_eq!(row_num1.0, row_num2.0);
		assert!(is_new1);
		assert!(!is_new2);
	}

	/// Distinct keys receive consecutive row numbers in the order they are first seen.
	#[test]
	fn test_sequential_numbering() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");
		let key3 = encode_key("key3");

		let mut ctx = harness.create_operator_context();
		let (row_num1, _) = ctx.get_or_create_row_number(&key1).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num2, _) = ctx.get_or_create_row_number(&key2).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num3, _) = ctx.get_or_create_row_number(&key3).unwrap();

		assert_eq!(row_num1.0, 1);
		assert_eq!(row_num2.0, 2);
		assert_eq!(row_num3.0, 3);
	}

	/// Operators with different node IDs number the same key independently.
	#[test]
	fn test_operator_isolation() {
		// Two harnesses are configured with different node IDs. Row numbering is scoped per
		// node, so the same key is new for each and each counter starts at 1.
		// NOTE(review): if every harness owns a private state store, this passes regardless of
		// node scoping; `test_counter_key_uniqueness_per_node` covers the key-level separation.
		let mut harness1 = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness1");

		let mut harness2 = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(2))
			.build()
			.expect("Failed to build harness2");

		let key = encode_key("same_key");

		let mut ctx1 = harness1.create_operator_context();
		let (row_num1, is_new1) = ctx1.get_or_create_row_number(&key).unwrap();

		let mut ctx2 = harness2.create_operator_context();
		let (row_num2, is_new2) = ctx2.get_or_create_row_number(&key).unwrap();

		// Both should be new because they're from different operators
		assert!(is_new1);
		assert!(is_new2);
		// Both should start at 1
		assert_eq!(row_num1.0, 1);
		assert_eq!(row_num2.0, 1);
	}

	/// The counter and the mappings persist across separate operator contexts.
	#[test]
	fn test_persistence_across_calls() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		// Create first few row numbers
		let key1 = encode_key("key1");
		let key2 = encode_key("key2");

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key1).unwrap();

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key2).unwrap();

		// New key should continue from where we left off
		let key3 = encode_key("key3");
		let mut ctx = harness.create_operator_context();
		let (row_num3, is_new3) = ctx.get_or_create_row_number(&key3).unwrap();

		assert!(is_new3);
		assert_eq!(row_num3.0, 3);

		// Old keys should still return their original row numbers
		let mut ctx = harness.create_operator_context();
		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
		assert!(!is_new1);
		assert_eq!(row_num1.0, 1);
	}

	/// Numbering stays sequential over many keys, and early mappings remain intact.
	#[test]
	fn test_large_scale_row_numbers() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		// Create 1000 unique keys
		for i in 0..1000 {
			let key = encode_key(format!("key_{}", i));
			let mut ctx = harness.create_operator_context();
			let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
			assert!(is_new);
			assert_eq!(row_num.0, i + 1);
		}

		// Spot-check that a key created mid-way still maps to its original row number
		let key_500 = encode_key("key_500");
		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&key_500).unwrap();
		assert!(!is_new);
		assert_eq!(row_num.0, 501);
	}

	/// Prefix removal forgets only matching mappings and does not rewind the counter.
	#[test]
	fn test_remove_by_prefix() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		// Create keys with different prefixes: a1 -> 1, a2 -> 2, b1 -> 3
		let key_a1 = encode_key("prefix_a_1");
		let key_a2 = encode_key("prefix_a_2");
		let key_b1 = encode_key("prefix_b_1");

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key_a1).unwrap();

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key_a2).unwrap();

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key_b1).unwrap();

		// Remove all keys with prefix "prefix_a"
		let provider = RowNumberProvider::new(FlowNodeId(1));
		let mut ctx = harness.create_operator_context();
		provider.remove_by_prefix(&mut ctx, b"prefix_a").unwrap();

		// Keys with prefix_a should be new again
		let mut ctx = harness.create_operator_context();
		let (row_num_a1, is_new_a1) = ctx.get_or_create_row_number(&key_a1).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num_a2, is_new_a2) = ctx.get_or_create_row_number(&key_a2).unwrap();

		// They get new row numbers that continue from the counter, which removal left at 4
		assert!(is_new_a1);
		assert!(is_new_a2);
		assert_eq!(row_num_a1.0, 4);
		assert_eq!(row_num_a2.0, 5);

		// Key with prefix_b should still be known, with its original row number
		let mut ctx = harness.create_operator_context();
		let (row_num_b1, is_new_b1) = ctx.get_or_create_row_number(&key_b1).unwrap();
		assert!(!is_new_b1);
		assert_eq!(row_num_b1.0, 3);
	}

	/// An empty key is a valid key and is numbered like any other.
	#[test]
	fn test_empty_key() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let empty_key = encode_key("");

		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&empty_key).unwrap();
		assert!(is_new);
		assert_eq!(row_num.0, 1);

		// Should work for duplicate empty keys too
		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&empty_key).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2.0, 1);
	}

	/// Keys containing arbitrary bytes (including 0x00 and 0xFF) round-trip correctly.
	#[test]
	fn test_binary_key_data() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		// Test with binary data including null bytes
		let binary_key = EncodedKey::new(vec![0x00, 0xFF, 0x00, 0xAB, 0xCD]);

		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&binary_key).unwrap();
		assert!(is_new);
		assert_eq!(row_num.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&binary_key).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2.0, 1);
	}

	/// Alternating lookups of two keys keep each key's original row number.
	#[test]
	fn test_interleaved_operations() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");

		// First access key1
		let mut ctx = harness.create_operator_context();
		let (row_num1_first, _) = ctx.get_or_create_row_number(&key1).unwrap();
		assert_eq!(row_num1_first.0, 1);

		// First access key2
		let mut ctx = harness.create_operator_context();
		let (row_num2_first, _) = ctx.get_or_create_row_number(&key2).unwrap();
		assert_eq!(row_num2_first.0, 2);

		// Second access key1 - should return same number
		let mut ctx = harness.create_operator_context();
		let (row_num1_second, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
		assert!(!is_new1);
		assert_eq!(row_num1_second.0, 1);

		// Second access key2 - should return same number
		let mut ctx = harness.create_operator_context();
		let (row_num2_second, is_new2) = ctx.get_or_create_row_number(&key2).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2_second.0, 2);
	}

	/// The node-local counter key is shared, but the node wrapper makes it unique per node.
	#[test]
	fn test_counter_key_uniqueness_per_node() {
		// Counter keys for different nodes should be different after wrapping with FlowNodeInternalStateKey
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let internal_key1 = provider1.make_counter_key();
		let internal_key2 = provider2.make_counter_key();

		// Internal keys are the same (node_id is added by wrapper)
		assert_eq!(internal_key1, internal_key2);

		// But after wrapping with FlowNodeInternalStateKey, they should be different
		let final_key1 =
			FlowNodeInternalStateKey::new(provider1.node, internal_key1.as_ref().to_vec()).encode();
		let final_key2 =
			FlowNodeInternalStateKey::new(provider2.node, internal_key2.as_ref().to_vec()).encode();

		assert!(!final_key1.is_empty());
		assert!(!final_key2.is_empty());
		assert_ne!(final_key1, final_key2);
	}

	/// Map keys are deterministic and differ for different original keys.
	#[test]
	fn test_map_key_uniqueness() {
		let provider = RowNumberProvider::new(FlowNodeId(42));
		let original_key1 = encode_key("test1");
		let original_key2 = encode_key("test2");

		let map_key1 = provider.make_map_key(&original_key1);
		let map_key2 = provider.make_map_key(&original_key2);

		// Map keys should be non-empty and different for different inputs
		assert!(!map_key1.is_empty());
		assert!(!map_key2.is_empty());
		assert_ne!(map_key1, map_key2);

		// Same original key should produce same map key
		let map_key1_again = provider.make_map_key(&original_key1);
		assert_eq!(map_key1, map_key1_again);
	}

	/// The counter key never collides with a map key, even for an empty original key.
	#[test]
	fn test_counter_key_vs_map_key_separation() {
		// Counter key and map key should never collide
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let counter_key = provider.make_counter_key();
		let map_key = provider.make_map_key(&EncodedKey::new(Vec::new()));

		// Even with an empty original key, they should be different
		assert_ne!(counter_key, map_key);
	}

	/// An empty batch returns no results and leaves the counter untouched.
	#[test]
	fn test_empty_batch() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let provider = RowNumberProvider::new(FlowNodeId(1));

		let mut ctx = harness.create_operator_context();
		let results =
			provider.get_or_create_row_numbers_batch(&mut ctx, std::iter::empty::<&EncodedKey>()).unwrap();
		assert!(results.is_empty());

		// Nothing was allocated, so the first real key still gets row number 1
		let key = encode_key("after_empty_batch");
		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = provider.get_or_create_row_number(&mut ctx, &key).unwrap();
		assert!(is_new);
		assert_eq!(row_num.0, 1);
	}

	/// A batch mixing known and unknown keys preserves order and allocates only for new keys.
	#[test]
	fn test_batch_mixed_existing_and_new_keys() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let provider = RowNumberProvider::new(FlowNodeId(1));

		// Create 3 initial keys to establish existing row numbers
		let key1 = encode_key("batch_key_1");
		let key2 = encode_key("batch_key_2");
		let key3 = encode_key("batch_key_3");

		let mut ctx = harness.create_operator_context();
		let (rn1, _) = provider.get_or_create_row_number(&mut ctx, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let mut ctx = harness.create_operator_context();
		let (rn2, _) = provider.get_or_create_row_number(&mut ctx, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let mut ctx = harness.create_operator_context();
		let (rn3, _) = provider.get_or_create_row_number(&mut ctx, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		// Now test batch with mix of existing and new keys
		let key4 = encode_key("batch_key_4");
		let key5 = encode_key("batch_key_5");

		// Batch: [existing key2, new key4, existing key1, new key5, existing key3]
		let batch_keys = vec![&key2, &key4, &key1, &key5, &key3];

		let mut ctx = harness.create_operator_context();
		let results = provider.get_or_create_row_numbers_batch(&mut ctx, batch_keys.into_iter()).unwrap();

		// Verify results are in correct order and have correct values
		assert_eq!(results.len(), 5);

		// key2 (existing) -> row number 2, not new
		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		// key4 (new) -> row number 4, is new
		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		// key1 (existing) -> row number 1, not new
		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		// key5 (new) -> row number 5, is new
		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		// key3 (existing) -> row number 3, not new
		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		// Verify that counter was only incremented by 2 (for key4 and key5)
		// by checking that the next new key gets row number 6
		let key6 = encode_key("batch_key_6");
		let mut ctx = harness.create_operator_context();
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut ctx, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		// Verify all mappings are still correct by retrieving them individually
		let mut ctx = harness.create_operator_context();
		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut ctx, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let mut ctx = harness.create_operator_context();
		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut ctx, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);
	}
}