1use std::iter;
5
6use reifydb_core::{
7 encoded::key::EncodedKey, interface::catalog::flow::FlowNodeId,
8 key::flow_node_internal_state::FlowNodeInternalStateKey, util::encoding::keycode::serializer::KeySerializer,
9};
10use reifydb_type::value::row_number::RowNumber;
11
12use crate::{error::Result, operator::context::OperatorContext};
13
14pub struct RowNumberProvider {
15 _node: FlowNodeId,
16}
17
18impl RowNumberProvider {
19 pub fn new(node: FlowNodeId) -> Self {
20 Self {
21 _node: node,
22 }
23 }
24
25 pub fn get_or_create_row_numbers_batch<'a, I>(
26 &self,
27 ctx: &mut OperatorContext,
28 keys: I,
29 ) -> Result<Vec<(RowNumber, bool)>>
30 where
31 I: IntoIterator<Item = &'a EncodedKey>,
32 {
33 let mut results = Vec::new();
34 let mut counter = self.load_counter(ctx)?;
35 let initial_counter = counter;
36
37 for key in keys {
38 let map_key = self.make_map_key(key);
39
40 if let Some(row_num) = ctx.internal_state().get::<u64>(&map_key)? {
41 results.push((RowNumber(row_num), false));
42 continue;
43 }
44
45 let new_row_number = RowNumber(counter);
46 ctx.internal_state().set::<u64>(&map_key, &counter)?;
47
48 results.push((new_row_number, true));
49 counter += 1;
50 }
51
52 if counter != initial_counter {
53 self.save_counter(ctx, counter)?;
54 }
55
56 Ok(results)
57 }
58
59 pub fn get_or_create_row_number(
60 &self,
61 ctx: &mut OperatorContext,
62 key: &EncodedKey,
63 ) -> Result<(RowNumber, bool)> {
64 Ok(self.get_or_create_row_numbers_batch(ctx, iter::once(key))?.into_iter().next().unwrap())
65 }
66
67 fn load_counter(&self, ctx: &mut OperatorContext) -> Result<u64> {
68 Ok(ctx.internal_state().get::<u64>(&self.make_counter_key())?.unwrap_or(1))
69 }
70
71 fn save_counter(&self, ctx: &mut OperatorContext, counter: u64) -> Result<()> {
72 ctx.internal_state().set::<u64>(&self.make_counter_key(), &counter)
73 }
74
75 fn make_counter_key(&self) -> EncodedKey {
76 let mut serializer = KeySerializer::new();
77 serializer.extend_u8(FlowNodeInternalStateKey::ROW_NUMBER_COUNTER_TAG);
78 serializer.finish()
79 }
80
81 fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
82 let mut serializer = KeySerializer::new();
83 serializer.extend_u8(FlowNodeInternalStateKey::ROW_NUMBER_MAPPING_TAG);
84 serializer.extend_bytes(key.as_ref());
85 serializer.finish()
86 }
87}
88
/// Tests for `RowNumberProvider`, exercised both directly and through
/// `OperatorContext::get_or_create_row_number` on a test harness.
#[cfg(test)]
pub mod tests {
	use std::collections::HashMap;

	use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
	use reifydb_core::{
		encoded::key::EncodedKey,
		interface::catalog::flow::FlowNodeId,
		key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
	};
	use reifydb_type::value::{Value, row_number::RowNumber};

	use crate::{
		error::Result,
		operator::{
			FFIOperator, FFIOperatorMetadata, change::BorrowedChange, column::operator::OperatorColumn,
			context::OperatorContext,
		},
		state::{FFIRawStatefulOperator, row::RowNumberProvider},
		testing::{harness::TestHarnessBuilder, helpers::encode_key},
	};

	/// No-op operator: `apply` and `pull` do nothing. It gives the harness an
	/// operator type to build contexts for.
	struct RowNumberTestOperator;

	impl FFIOperatorMetadata for RowNumberTestOperator {
		const NAME: &'static str = "row_number_test";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Test operator for row number provider";
		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for RowNumberTestOperator {
		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self)
		}

		fn apply(&mut self, _ctx: &mut OperatorContext, _input: BorrowedChange<'_>) -> Result<()> {
			Ok(())
		}

		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
			Ok(())
		}
	}

	impl FFIRawStatefulOperator for RowNumberTestOperator {}

	/// The first key ever seen is assigned row number 1.
	#[test]
	fn test_first_row_number_starts_at_one() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key = encode_key("test_key");
		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	/// Asking for the same key again returns the same number, flagged as not new.
	#[test]
	fn test_duplicate_key_returns_same_row_number() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key = encode_key("test_key");

		let mut ctx = harness.create_operator_context();
		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&key).unwrap();

		assert_eq!(row_num1.0, row_num2.0);
		assert!(is_new1);
		assert!(!is_new2);
	}

	/// Distinct keys receive consecutive numbers in first-seen order.
	#[test]
	fn test_sequential_numbering() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");
		let key3 = encode_key("key3");

		let mut ctx = harness.create_operator_context();
		let (row_num1, _) = ctx.get_or_create_row_number(&key1).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num2, _) = ctx.get_or_create_row_number(&key2).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num3, _) = ctx.get_or_create_row_number(&key3).unwrap();

		assert_eq!(row_num1.0, 1);
		assert_eq!(row_num2.0, 2);
		assert_eq!(row_num3.0, 3);
	}

	/// Harnesses for two different node ids each number the same key from 1.
	#[test]
	fn test_operator_isolation() {
		let mut harness1 = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness1");

		let mut harness2 = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(2))
			.build()
			.expect("Failed to build harness2");

		let key = encode_key("same_key");

		let mut ctx1 = harness1.create_operator_context();
		let (row_num1, is_new1) = ctx1.get_or_create_row_number(&key).unwrap();

		let mut ctx2 = harness2.create_operator_context();
		let (row_num2, is_new2) = ctx2.get_or_create_row_number(&key).unwrap();

		assert!(is_new1);
		assert!(is_new2);
		assert_eq!(row_num1.0, 1);
		assert_eq!(row_num2.0, 1);
	}

	/// The counter and mappings persist across separate operator contexts.
	#[test]
	fn test_persistence_across_calls() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key1).unwrap();

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key2).unwrap();

		let key3 = encode_key("key3");
		let mut ctx = harness.create_operator_context();
		let (row_num3, is_new3) = ctx.get_or_create_row_number(&key3).unwrap();

		assert!(is_new3);
		assert_eq!(row_num3.0, 3);

		let mut ctx = harness.create_operator_context();
		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
		assert!(!is_new1);
		assert_eq!(row_num1.0, 1);
	}

	/// A thousand distinct keys are numbered 1..=1000, and an earlier one is
	/// still found afterwards.
	#[test]
	fn test_large_scale_row_numbers() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		for i in 0..1000 {
			let key = encode_key(format!("key_{i}"));
			let mut ctx = harness.create_operator_context();
			let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
			assert!(is_new);
			assert_eq!(row_num.0, i + 1);
		}

		let key_500 = encode_key("key_500");
		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&key_500).unwrap();
		assert!(!is_new);
		assert_eq!(row_num.0, 501);
	}

	/// An empty key is a valid key and is mapped like any other.
	#[test]
	fn test_empty_key() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let empty_key = encode_key("");

		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&empty_key).unwrap();
		assert!(is_new);
		assert_eq!(row_num.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&empty_key).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2.0, 1);
	}

	/// Keys with arbitrary bytes (including 0x00 and 0xFF) round-trip.
	#[test]
	fn test_binary_key_data() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let binary_key = EncodedKey::new(vec![0x00, 0xFF, 0x00, 0xAB, 0xCD]);

		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&binary_key).unwrap();
		assert!(is_new);
		assert_eq!(row_num.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&binary_key).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2.0, 1);
	}

	/// Alternating lookups of two keys keep each key's original number.
	#[test]
	fn test_interleaved_operations() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");

		let mut ctx = harness.create_operator_context();
		let (row_num1_first, _) = ctx.get_or_create_row_number(&key1).unwrap();
		assert_eq!(row_num1_first.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2_first, _) = ctx.get_or_create_row_number(&key2).unwrap();
		assert_eq!(row_num2_first.0, 2);

		let mut ctx = harness.create_operator_context();
		let (row_num1_second, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
		assert!(!is_new1);
		assert_eq!(row_num1_second.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2_second, is_new2) = ctx.get_or_create_row_number(&key2).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2_second.0, 2);
	}

	/// The provider's own counter key does not depend on the node id. The keys
	/// only differ once wrapped in `FlowNodeInternalStateKey` with the node id.
	#[test]
	fn test_counter_key_uniqueness_per_node() {
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let internal_key1 = provider1.make_counter_key();
		let internal_key2 = provider2.make_counter_key();

		assert_eq!(internal_key1, internal_key2);

		let final_key1 = FlowNodeInternalStateKey::new(FlowNodeId(1), internal_key1.as_ref().to_vec()).encode();
		let final_key2 = FlowNodeInternalStateKey::new(FlowNodeId(2), internal_key2.as_ref().to_vec()).encode();

		assert!(!final_key1.is_empty());
		assert!(!final_key2.is_empty());
		assert_ne!(final_key1, final_key2);
	}

	/// Different source keys map to different mapping keys, deterministically.
	#[test]
	fn test_map_key_uniqueness() {
		let provider = RowNumberProvider::new(FlowNodeId(42));
		let original_key1 = encode_key("test1");
		let original_key2 = encode_key("test2");

		let map_key1 = provider.make_map_key(&original_key1);
		let map_key2 = provider.make_map_key(&original_key2);

		assert!(!map_key1.is_empty());
		assert!(!map_key2.is_empty());
		assert_ne!(map_key1, map_key2);

		let map_key1_again = provider.make_map_key(&original_key1);
		assert_eq!(map_key1, map_key1_again);
	}

	/// The counter key never collides with a mapping key, even for an empty key.
	#[test]
	fn test_counter_key_vs_map_key_separation() {
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let counter_key = provider.make_counter_key();
		let map_key = provider.make_map_key(&EncodedKey::new(Vec::new()));

		assert_ne!(counter_key, map_key);
	}

	/// A batch mixing known and unknown keys returns existing numbers unchanged,
	/// allocates new ones in input order, and advances the shared counter.
	#[test]
	fn test_batch_mixed_existing_and_new_keys() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key1 = encode_key("batch_key_1");
		let key2 = encode_key("batch_key_2");
		let key3 = encode_key("batch_key_3");

		let mut ctx = harness.create_operator_context();
		let (rn1, _) = provider.get_or_create_row_number(&mut ctx, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let mut ctx = harness.create_operator_context();
		let (rn2, _) = provider.get_or_create_row_number(&mut ctx, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let mut ctx = harness.create_operator_context();
		let (rn3, _) = provider.get_or_create_row_number(&mut ctx, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		let key4 = encode_key("batch_key_4");
		let key5 = encode_key("batch_key_5");

		// The batch API accepts any `IntoIterator`, so the array is passed directly.
		let batch_keys = [&key2, &key4, &key1, &key5, &key3];

		let mut ctx = harness.create_operator_context();
		let results = provider.get_or_create_row_numbers_batch(&mut ctx, batch_keys).unwrap();

		assert_eq!(results.len(), 5);

		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		let key6 = encode_key("batch_key_6");
		let mut ctx = harness.create_operator_context();
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut ctx, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		let mut ctx = harness.create_operator_context();
		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut ctx, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let mut ctx = harness.create_operator_context();
		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut ctx, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);
	}

	/// An empty batch yields no results and does not advance the counter, so the
	/// next allocation still receives row number 1.
	#[test]
	fn test_batch_empty_input() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let provider = RowNumberProvider::new(FlowNodeId(1));

		let mut ctx = harness.create_operator_context();
		let results =
			provider.get_or_create_row_numbers_batch(&mut ctx, std::iter::empty::<&EncodedKey>()).unwrap();
		assert!(results.is_empty());

		let key = encode_key("after_empty_batch");
		let mut ctx = harness.create_operator_context();
		let (rn, is_new) = provider.get_or_create_row_number(&mut ctx, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(is_new);
	}
}