Skip to main content

reifydb_sdk/state/
row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::iter;
5
6use reifydb_core::{
7	encoded::key::EncodedKey, interface::catalog::flow::FlowNodeId,
8	key::flow_node_internal_state::FlowNodeInternalStateKey, util::encoding::keycode::serializer::KeySerializer,
9};
10use reifydb_type::value::row_number::RowNumber;
11
12use crate::{error::Result, operator::context::OperatorContext};
13
/// Assigns sequential row numbers to encoded keys and remembers them.
///
/// The methods in this file keep two kinds of entries in the operator's
/// internal state (reached through `OperatorContext::internal_state`): one
/// counter entry holding the next number to hand out, and one mapping entry
/// per key holding the number that key was given.
pub struct RowNumberProvider {
	// Node this provider was created for. It is stored by `new` but not read by
	// any method in this file, hence the leading underscore.
	_node: FlowNodeId,
}
17
18impl RowNumberProvider {
19	pub fn new(node: FlowNodeId) -> Self {
20		Self {
21			_node: node,
22		}
23	}
24
25	pub fn get_or_create_row_numbers_batch<'a, I>(
26		&self,
27		ctx: &mut OperatorContext,
28		keys: I,
29	) -> Result<Vec<(RowNumber, bool)>>
30	where
31		I: IntoIterator<Item = &'a EncodedKey>,
32	{
33		let mut results = Vec::new();
34		let mut counter = self.load_counter(ctx)?;
35		let initial_counter = counter;
36
37		for key in keys {
38			let map_key = self.make_map_key(key);
39
40			if let Some(row_num) = ctx.internal_state().get::<u64>(&map_key)? {
41				results.push((RowNumber(row_num), false));
42				continue;
43			}
44
45			let new_row_number = RowNumber(counter);
46			ctx.internal_state().set::<u64>(&map_key, &counter)?;
47
48			results.push((new_row_number, true));
49			counter += 1;
50		}
51
52		if counter != initial_counter {
53			self.save_counter(ctx, counter)?;
54		}
55
56		Ok(results)
57	}
58
59	pub fn get_or_create_row_number(
60		&self,
61		ctx: &mut OperatorContext,
62		key: &EncodedKey,
63	) -> Result<(RowNumber, bool)> {
64		Ok(self.get_or_create_row_numbers_batch(ctx, iter::once(key))?.into_iter().next().unwrap())
65	}
66
67	fn load_counter(&self, ctx: &mut OperatorContext) -> Result<u64> {
68		Ok(ctx.internal_state().get::<u64>(&self.make_counter_key())?.unwrap_or(1))
69	}
70
71	fn save_counter(&self, ctx: &mut OperatorContext, counter: u64) -> Result<()> {
72		ctx.internal_state().set::<u64>(&self.make_counter_key(), &counter)
73	}
74
75	fn make_counter_key(&self) -> EncodedKey {
76		let mut serializer = KeySerializer::new();
77		serializer.extend_u8(FlowNodeInternalStateKey::ROW_NUMBER_COUNTER_TAG);
78		serializer.finish()
79	}
80
81	fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
82		let mut serializer = KeySerializer::new();
83		serializer.extend_u8(FlowNodeInternalStateKey::ROW_NUMBER_MAPPING_TAG);
84		serializer.extend_bytes(key.as_ref());
85		serializer.finish()
86	}
87}
88
89#[cfg(test)]
90pub mod tests {
91	use std::collections::HashMap;
92
93	use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
94	use reifydb_core::{
95		encoded::key::EncodedKey,
96		interface::catalog::flow::FlowNodeId,
97		key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
98	};
99	use reifydb_type::value::{Value, row_number::RowNumber};
100
101	use crate::{
102		error::Result,
103		operator::{
104			FFIOperator, FFIOperatorMetadata, change::BorrowedChange, column::operator::OperatorColumn,
105			context::OperatorContext,
106		},
107		state::{FFIRawStatefulOperator, row::RowNumberProvider},
108		testing::{harness::TestHarnessBuilder, helpers::encode_key},
109	};
110
	// Stateless stand-in operator; it exists so the tests below can build a harness.
	struct RowNumberTestOperator;
112
	// Static metadata for the test operator.
	impl FFIOperatorMetadata for RowNumberTestOperator {
		const NAME: &'static str = "row_number_test";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Test operator for row number provider";
		// The operator declares no input and no output columns.
		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
	}
122
	// No-op operator behaviour: every callback succeeds without doing anything.
	impl FFIOperator for RowNumberTestOperator {
		// Ignores both the operator id and the configuration map.
		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self)
		}

		// Accepts any input change and leaves the context untouched.
		fn apply(&mut self, _ctx: &mut OperatorContext, _input: BorrowedChange<'_>) -> Result<()> {
			Ok(())
		}

		// Accepts any list of row numbers and leaves the context untouched.
		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
			Ok(())
		}
	}
136
	// Empty impl: the test operator overrides nothing from `FFIRawStatefulOperator`.
	impl FFIRawStatefulOperator for RowNumberTestOperator {}
138
139	#[test]
140	fn test_first_row_number_starts_at_one() {
141		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
142			.with_node_id(FlowNodeId(1))
143			.build()
144			.expect("Failed to build harness");
145
146		let key = encode_key("test_key");
147		let mut ctx = harness.create_operator_context();
148		let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
149
150		assert_eq!(row_num.0, 1);
151		assert!(is_new);
152	}
153
154	#[test]
155	fn test_duplicate_key_returns_same_row_number() {
156		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
157			.with_node_id(FlowNodeId(1))
158			.build()
159			.expect("Failed to build harness");
160
161		let key = encode_key("test_key");
162
163		let mut ctx = harness.create_operator_context();
164		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key).unwrap();
165
166		let mut ctx = harness.create_operator_context();
167		let (row_num2, is_new2) = ctx.get_or_create_row_number(&key).unwrap();
168
169		assert_eq!(row_num1.0, row_num2.0);
170		assert!(is_new1);
171		assert!(!is_new2);
172	}
173
174	#[test]
175	fn test_sequential_numbering() {
176		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
177			.with_node_id(FlowNodeId(1))
178			.build()
179			.expect("Failed to build harness");
180
181		let key1 = encode_key("key1");
182		let key2 = encode_key("key2");
183		let key3 = encode_key("key3");
184
185		let mut ctx = harness.create_operator_context();
186		let (row_num1, _) = ctx.get_or_create_row_number(&key1).unwrap();
187
188		let mut ctx = harness.create_operator_context();
189		let (row_num2, _) = ctx.get_or_create_row_number(&key2).unwrap();
190
191		let mut ctx = harness.create_operator_context();
192		let (row_num3, _) = ctx.get_or_create_row_number(&key3).unwrap();
193
194		assert_eq!(row_num1.0, 1);
195		assert_eq!(row_num2.0, 2);
196		assert_eq!(row_num3.0, 3);
197	}
198
199	#[test]
200	fn test_operator_isolation() {
201		// Two harnesses with different node IDs share state store but have isolated namespaces
202		let mut harness1 = TestHarnessBuilder::<RowNumberTestOperator>::new()
203			.with_node_id(FlowNodeId(1))
204			.build()
205			.expect("Failed to build harness1");
206
207		let mut harness2 = TestHarnessBuilder::<RowNumberTestOperator>::new()
208			.with_node_id(FlowNodeId(2))
209			.build()
210			.expect("Failed to build harness2");
211
212		let key = encode_key("same_key");
213
214		let mut ctx1 = harness1.create_operator_context();
215		let (row_num1, is_new1) = ctx1.get_or_create_row_number(&key).unwrap();
216
217		let mut ctx2 = harness2.create_operator_context();
218		let (row_num2, is_new2) = ctx2.get_or_create_row_number(&key).unwrap();
219
220		// Both should be new because they're from different operators
221		assert!(is_new1);
222		assert!(is_new2);
223		// Both should start at 1
224		assert_eq!(row_num1.0, 1);
225		assert_eq!(row_num2.0, 1);
226	}
227
228	#[test]
229	fn test_persistence_across_calls() {
230		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
231			.with_node_id(FlowNodeId(1))
232			.build()
233			.expect("Failed to build harness");
234
235		// Create first few row numbers
236		let key1 = encode_key("key1");
237		let key2 = encode_key("key2");
238
239		let mut ctx = harness.create_operator_context();
240		ctx.get_or_create_row_number(&key1).unwrap();
241
242		let mut ctx = harness.create_operator_context();
243		ctx.get_or_create_row_number(&key2).unwrap();
244
245		// New key should continue from where we left off
246		let key3 = encode_key("key3");
247		let mut ctx = harness.create_operator_context();
248		let (row_num3, is_new3) = ctx.get_or_create_row_number(&key3).unwrap();
249
250		assert!(is_new3);
251		assert_eq!(row_num3.0, 3);
252
253		// Old keys should still return their original row numbers
254		let mut ctx = harness.create_operator_context();
255		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
256		assert!(!is_new1);
257		assert_eq!(row_num1.0, 1);
258	}
259
260	#[test]
261	fn test_large_scale_row_numbers() {
262		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
263			.with_node_id(FlowNodeId(1))
264			.build()
265			.expect("Failed to build harness");
266
267		// Create 1000 unique keys
268		for i in 0..1000 {
269			let key = encode_key(format!("key_{}", i));
270			let mut ctx = harness.create_operator_context();
271			let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
272			assert!(is_new);
273			assert_eq!(row_num.0, i + 1);
274		}
275
276		// Verify a random sample still works correctly
277		let key_500 = encode_key("key_500");
278		let mut ctx = harness.create_operator_context();
279		let (row_num, is_new) = ctx.get_or_create_row_number(&key_500).unwrap();
280		assert!(!is_new);
281		assert_eq!(row_num.0, 501);
282	}
283
284	#[test]
285	fn test_empty_key() {
286		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
287			.with_node_id(FlowNodeId(1))
288			.build()
289			.expect("Failed to build harness");
290
291		let empty_key = encode_key("");
292
293		let mut ctx = harness.create_operator_context();
294		let (row_num, is_new) = ctx.get_or_create_row_number(&empty_key).unwrap();
295		assert!(is_new);
296		assert_eq!(row_num.0, 1);
297
298		// Should work for duplicate empty keys too
299		let mut ctx = harness.create_operator_context();
300		let (row_num2, is_new2) = ctx.get_or_create_row_number(&empty_key).unwrap();
301		assert!(!is_new2);
302		assert_eq!(row_num2.0, 1);
303	}
304
305	#[test]
306	fn test_binary_key_data() {
307		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
308			.with_node_id(FlowNodeId(1))
309			.build()
310			.expect("Failed to build harness");
311
312		// Test with binary data including null bytes
313		let binary_key = EncodedKey::new(vec![0x00, 0xFF, 0x00, 0xAB, 0xCD]);
314
315		let mut ctx = harness.create_operator_context();
316		let (row_num, is_new) = ctx.get_or_create_row_number(&binary_key).unwrap();
317		assert!(is_new);
318		assert_eq!(row_num.0, 1);
319
320		let mut ctx = harness.create_operator_context();
321		let (row_num2, is_new2) = ctx.get_or_create_row_number(&binary_key).unwrap();
322		assert!(!is_new2);
323		assert_eq!(row_num2.0, 1);
324	}
325
326	#[test]
327	fn test_interleaved_operations() {
328		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
329			.with_node_id(FlowNodeId(1))
330			.build()
331			.expect("Failed to build harness");
332
333		let key1 = encode_key("key1");
334		let key2 = encode_key("key2");
335
336		// First access key1
337		let mut ctx = harness.create_operator_context();
338		let (row_num1_first, _) = ctx.get_or_create_row_number(&key1).unwrap();
339		assert_eq!(row_num1_first.0, 1);
340
341		// First access key2
342		let mut ctx = harness.create_operator_context();
343		let (row_num2_first, _) = ctx.get_or_create_row_number(&key2).unwrap();
344		assert_eq!(row_num2_first.0, 2);
345
346		// Second access key1 - should return same number
347		let mut ctx = harness.create_operator_context();
348		let (row_num1_second, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
349		assert!(!is_new1);
350		assert_eq!(row_num1_second.0, 1);
351
352		// Second access key2 - should return same number
353		let mut ctx = harness.create_operator_context();
354		let (row_num2_second, is_new2) = ctx.get_or_create_row_number(&key2).unwrap();
355		assert!(!is_new2);
356		assert_eq!(row_num2_second.0, 2);
357	}
358
359	#[test]
360	fn test_counter_key_uniqueness_per_node() {
361		// Counter inner-keys are identical across providers (just the tag byte) -
362		// the host's FlowNodeInternalStateKey wrapper adds the FlowNodeId at storage
363		// time, keeping different operators' counters in disjoint storage ranges.
364		let provider1 = RowNumberProvider::new(FlowNodeId(1));
365		let provider2 = RowNumberProvider::new(FlowNodeId(2));
366
367		let internal_key1 = provider1.make_counter_key();
368		let internal_key2 = provider2.make_counter_key();
369
370		assert_eq!(internal_key1, internal_key2);
371
372		// And after wrapping with FlowNodeInternalStateKey, they differ:
373		let final_key1 = FlowNodeInternalStateKey::new(FlowNodeId(1), internal_key1.as_ref().to_vec()).encode();
374		let final_key2 = FlowNodeInternalStateKey::new(FlowNodeId(2), internal_key2.as_ref().to_vec()).encode();
375
376		assert!(!final_key1.is_empty());
377		assert!(!final_key2.is_empty());
378		assert_ne!(final_key1, final_key2);
379	}
380
381	#[test]
382	fn test_map_key_uniqueness() {
383		let provider = RowNumberProvider::new(FlowNodeId(42));
384		let original_key1 = encode_key("test1");
385		let original_key2 = encode_key("test2");
386
387		let map_key1 = provider.make_map_key(&original_key1);
388		let map_key2 = provider.make_map_key(&original_key2);
389
390		// Map keys should be non-empty and different for different inputs
391		assert!(!map_key1.is_empty());
392		assert!(!map_key2.is_empty());
393		assert_ne!(map_key1, map_key2);
394
395		// Same original key should produce same map key
396		let map_key1_again = provider.make_map_key(&original_key1);
397		assert_eq!(map_key1, map_key1_again);
398	}
399
400	#[test]
401	fn test_counter_key_vs_map_key_separation() {
402		// Counter key and map key should never collide
403		let provider = RowNumberProvider::new(FlowNodeId(1));
404
405		let counter_key = provider.make_counter_key();
406		let map_key = provider.make_map_key(&EncodedKey::new(Vec::new()));
407
408		// Even with an empty original key, they should be different
409		assert_ne!(counter_key, map_key);
410	}
411
412	#[test]
413	fn test_batch_mixed_existing_and_new_keys() {
414		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
415			.with_node_id(FlowNodeId(1))
416			.build()
417			.expect("Failed to build harness");
418
419		let provider = RowNumberProvider::new(FlowNodeId(1));
420
421		// Create 3 initial keys to establish existing row numbers
422		let key1 = encode_key("batch_key_1");
423		let key2 = encode_key("batch_key_2");
424		let key3 = encode_key("batch_key_3");
425
426		let mut ctx = harness.create_operator_context();
427		let (rn1, _) = provider.get_or_create_row_number(&mut ctx, &key1).unwrap();
428		assert_eq!(rn1.0, 1);
429
430		let mut ctx = harness.create_operator_context();
431		let (rn2, _) = provider.get_or_create_row_number(&mut ctx, &key2).unwrap();
432		assert_eq!(rn2.0, 2);
433
434		let mut ctx = harness.create_operator_context();
435		let (rn3, _) = provider.get_or_create_row_number(&mut ctx, &key3).unwrap();
436		assert_eq!(rn3.0, 3);
437
438		// Now test batch with mix of existing and new keys
439		let key4 = encode_key("batch_key_4");
440		let key5 = encode_key("batch_key_5");
441
442		// Batch: [existing key2, new key4, existing key1, new key5, existing key3]
443		let batch_keys = vec![&key2, &key4, &key1, &key5, &key3];
444
445		let mut ctx = harness.create_operator_context();
446		let results = provider.get_or_create_row_numbers_batch(&mut ctx, batch_keys.into_iter()).unwrap();
447
448		// Verify results are in correct order and have correct values
449		assert_eq!(results.len(), 5);
450
451		// key2 (existing) -> row number 2, not new
452		assert_eq!(results[0].0.0, 2);
453		assert!(!results[0].1);
454
455		// key4 (new) -> row number 4, is new
456		assert_eq!(results[1].0.0, 4);
457		assert!(results[1].1);
458
459		// key1 (existing) -> row number 1, not new
460		assert_eq!(results[2].0.0, 1);
461		assert!(!results[2].1);
462
463		// key5 (new) -> row number 5, is new
464		assert_eq!(results[3].0.0, 5);
465		assert!(results[3].1);
466
467		// key3 (existing) -> row number 3, not new
468		assert_eq!(results[4].0.0, 3);
469		assert!(!results[4].1);
470
471		// Verify that counter was only incremented by 2 (for key4 and key5)
472		// by checking that the next new key gets row number 6
473		let key6 = encode_key("batch_key_6");
474		let mut ctx = harness.create_operator_context();
475		let (rn6, is_new6) = provider.get_or_create_row_number(&mut ctx, &key6).unwrap();
476		assert_eq!(rn6.0, 6);
477		assert!(is_new6);
478
479		// Verify all mappings are still correct by retrieving them individually
480		let mut ctx = harness.create_operator_context();
481		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut ctx, &key4).unwrap();
482		assert_eq!(check_rn4.0, 4);
483		assert!(!is_new4);
484
485		let mut ctx = harness.create_operator_context();
486		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut ctx, &key5).unwrap();
487		assert_eq!(check_rn5.0, 5);
488		assert!(!is_new5);
489	}
490}