1use std::iter;
5
6use reifydb_core::{
7 encoded::{key::EncodedKey, row::EncodedRow},
8 interface::catalog::flow::FlowNodeId,
9 key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
10 util::encoding::keycode::serializer::KeySerializer,
11};
12use reifydb_type::{util::cowvec::CowVec, value::row_number::RowNumber};
13
14use crate::{error::Result, operator::context::OperatorContext};
15
/// Assigns stable, increasing [`RowNumber`]s to encoded keys for a single flow node.
///
/// Both the key → row-number mappings and the next-number counter are stored in the
/// node's internal operator state (accessed through an [`OperatorContext`]), so an
/// assignment made through one context is visible to later contexts.
pub struct RowNumberProvider {
	/// Flow node whose internal-state namespace holds this provider's entries.
	node: FlowNodeId,
}
27
28impl RowNumberProvider {
29 pub fn new(node: FlowNodeId) -> Self {
30 Self {
31 node,
32 }
33 }
34
35 pub fn get_or_create_row_numbers_batch<'a, I>(
39 &self,
40 ctx: &mut OperatorContext,
41 keys: I,
42 ) -> Result<Vec<(RowNumber, bool)>>
43 where
44 I: IntoIterator<Item = &'a EncodedKey>,
45 {
46 let mut results = Vec::new();
47 let mut counter = self.load_counter(ctx)?;
48 let initial_counter = counter;
49
50 for key in keys {
51 let map_key = self.make_map_key(key);
53 let internal_key = FlowNodeInternalStateKey::new(self.node, map_key.as_ref().to_vec());
54
55 if let Some(existing_row) = ctx.state().get(&internal_key.encode())? {
56 let bytes = existing_row.as_ref();
58 if bytes.len() >= 8 {
59 let row_num = u64::from_be_bytes([
60 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
61 bytes[7],
62 ]);
63 results.push((RowNumber(row_num), false));
64 continue;
65 }
66 }
67
68 let new_row_number = RowNumber(counter);
70
71 let row_num_bytes = counter.to_be_bytes().to_vec();
73 ctx.state().set(&internal_key.encode(), &EncodedRow(CowVec::new(row_num_bytes)))?;
74
75 results.push((new_row_number, true));
76 counter += 1;
77 }
78
79 if counter != initial_counter {
81 self.save_counter(ctx, counter)?;
82 }
83
84 Ok(results)
85 }
86
87 pub fn get_or_create_row_number(
88 &self,
89 ctx: &mut OperatorContext,
90 key: &EncodedKey,
91 ) -> Result<(RowNumber, bool)> {
92 Ok(self.get_or_create_row_numbers_batch(ctx, iter::once(key))?.into_iter().next().unwrap())
93 }
94
95 fn load_counter(&self, ctx: &mut OperatorContext) -> Result<u64> {
96 let key = self.make_counter_key();
97 let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
98 match ctx.state().get(&internal_key.encode())? {
99 None => Ok(1), Some(state_row) => {
101 let bytes = state_row.as_ref();
103 if bytes.len() >= 8 {
104 Ok(u64::from_be_bytes([
105 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
106 bytes[7],
107 ]))
108 } else {
109 Ok(1)
110 }
111 }
112 }
113 }
114
115 fn save_counter(&self, ctx: &mut OperatorContext, counter: u64) -> Result<()> {
116 let key = self.make_counter_key();
117 let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
118 let value = EncodedRow(CowVec::new(counter.to_be_bytes().to_vec()));
119 ctx.state().set(&internal_key.encode(), &value)?;
120 Ok(())
121 }
122
123 fn make_counter_key(&self) -> EncodedKey {
124 let mut serializer = KeySerializer::new();
125 serializer.extend_u8(b'C'); EncodedKey::new(serializer.finish())
127 }
128
129 fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
130 let mut serializer = KeySerializer::new();
131 serializer.extend_u8(b'M'); serializer.extend_bytes(key.as_ref());
133 EncodedKey::new(serializer.finish())
134 }
135
136 pub fn remove_by_prefix(&self, ctx: &mut OperatorContext, key_prefix: &[u8]) -> Result<()> {
137 let mut prefix = Vec::new();
139 let mut serializer = KeySerializer::new();
140 serializer.extend_u8(b'M'); prefix.extend_from_slice(&serializer.finish());
142 prefix.extend_from_slice(key_prefix);
143
144 let internal_prefix = FlowNodeInternalStateKey::new(self.node, prefix);
146 let prefix_key = internal_prefix.encode();
147 let entries = ctx.state().scan_prefix(&prefix_key)?;
148
149 for (key, _) in entries {
150 ctx.state().remove(&key)?;
151 }
152
153 Ok(())
154 }
155}
156
// Unit tests for `RowNumberProvider`.
//
// Most tests open a fresh `OperatorContext` from the harness for every call, which checks
// that row-number assignments persist in operator state across contexts. Expected row
// numbers depend on the exact call order within each test.
#[cfg(test)]
pub mod tests {
	use std::collections::HashMap;

	use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
	use reifydb_core::{
		encoded::key::EncodedKey,
		interface::catalog::flow::FlowNodeId,
		key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
	};
	use reifydb_type::value::{Value, row_number::RowNumber};

	use crate::{
		error::Result,
		operator::{
			FFIOperator, FFIOperatorMetadata, change::BorrowedChange, column::OperatorColumn,
			context::OperatorContext,
		},
		state::{FFIRawStatefulOperator, row::RowNumberProvider},
		testing::{harness::TestHarnessBuilder, helpers::encode_key},
	};

	// No-op operator whose only purpose is to let `TestHarnessBuilder` build a harness
	// that hands out `OperatorContext`s.
	struct RowNumberTestOperator;

	impl FFIOperatorMetadata for RowNumberTestOperator {
		const NAME: &'static str = "row_number_test";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Test operator for row number provider";
		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for RowNumberTestOperator {
		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self)
		}

		// Intentionally does nothing; these tests drive state through the context directly.
		fn apply(&mut self, _ctx: &mut OperatorContext, _input: BorrowedChange<'_>) -> Result<()> {
			Ok(())
		}

		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
			Ok(())
		}
	}

	impl FFIRawStatefulOperator for RowNumberTestOperator {}

	// The first key ever seen by a node is assigned row number 1 and reported as new.
	// NOTE(review): `OperatorContext::get_or_create_row_number` presumably delegates to a
	// `RowNumberProvider` for the harness node — confirm in the context implementation.
	#[test]
	fn test_first_row_number_starts_at_one() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key = encode_key("test_key");
		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	// Looking up the same key from a second context returns the same number, marked not new.
	#[test]
	fn test_duplicate_key_returns_same_row_number() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key = encode_key("test_key");

		let mut ctx = harness.create_operator_context();
		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&key).unwrap();

		assert_eq!(row_num1.0, row_num2.0);
		assert!(is_new1);
		assert!(!is_new2);
	}

	// Distinct keys receive consecutive numbers 1, 2, 3 in insertion order.
	#[test]
	fn test_sequential_numbering() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");
		let key3 = encode_key("key3");

		let mut ctx = harness.create_operator_context();
		let (row_num1, _) = ctx.get_or_create_row_number(&key1).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num2, _) = ctx.get_or_create_row_number(&key2).unwrap();

		let mut ctx = harness.create_operator_context();
		let (row_num3, _) = ctx.get_or_create_row_number(&key3).unwrap();

		assert_eq!(row_num1.0, 1);
		assert_eq!(row_num2.0, 2);
		assert_eq!(row_num3.0, 3);
	}

	// Two nodes keep independent counters and mappings: the same key is new in both
	// and gets row number 1 in each.
	#[test]
	fn test_operator_isolation() {
		let mut harness1 = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness1");

		let mut harness2 = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(2))
			.build()
			.expect("Failed to build harness2");

		let key = encode_key("same_key");

		let mut ctx1 = harness1.create_operator_context();
		let (row_num1, is_new1) = ctx1.get_or_create_row_number(&key).unwrap();

		let mut ctx2 = harness2.create_operator_context();
		let (row_num2, is_new2) = ctx2.get_or_create_row_number(&key).unwrap();

		assert!(is_new1);
		assert!(is_new2);
		assert_eq!(row_num1.0, 1);
		assert_eq!(row_num2.0, 1);
	}

	// The counter continues from persisted state (third key gets 3), and earlier
	// mappings are still resolvable afterwards.
	#[test]
	fn test_persistence_across_calls() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key1).unwrap();

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key2).unwrap();

		let key3 = encode_key("key3");
		let mut ctx = harness.create_operator_context();
		let (row_num3, is_new3) = ctx.get_or_create_row_number(&key3).unwrap();

		assert!(is_new3);
		assert_eq!(row_num3.0, 3);

		let mut ctx = harness.create_operator_context();
		let (row_num1, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
		assert!(!is_new1);
		assert_eq!(row_num1.0, 1);
	}

	// Allocates 1000 keys (key_i -> i + 1), then checks a mid-range key resolves to
	// its original number.
	#[test]
	fn test_large_scale_row_numbers() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		for i in 0..1000 {
			let key = encode_key(format!("key_{}", i));
			let mut ctx = harness.create_operator_context();
			let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
			assert!(is_new);
			assert_eq!(row_num.0, i + 1);
		}

		let key_500 = encode_key("key_500");
		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&key_500).unwrap();
		assert!(!is_new);
		assert_eq!(row_num.0, 501);
	}

	// After removing the "prefix_a" mappings, those keys are allocated afresh, while a
	// key outside the prefix keeps its existing mapping.
	#[test]
	fn test_remove_by_prefix() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key_a1 = encode_key("prefix_a_1");
		let key_a2 = encode_key("prefix_a_2");
		let key_b1 = encode_key("prefix_b_1");

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key_a1).unwrap();

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key_a2).unwrap();

		let mut ctx = harness.create_operator_context();
		ctx.get_or_create_row_number(&key_b1).unwrap();

		// Provider for the same node as the harness, so it targets the same state entries.
		let provider = RowNumberProvider::new(FlowNodeId(1));
		let mut ctx = harness.create_operator_context();
		provider.remove_by_prefix(&mut ctx, b"prefix_a").unwrap();

		let mut ctx = harness.create_operator_context();
		let (_, is_new_a1) = ctx.get_or_create_row_number(&key_a1).unwrap();

		let mut ctx = harness.create_operator_context();
		let (_, is_new_a2) = ctx.get_or_create_row_number(&key_a2).unwrap();

		assert!(is_new_a1);
		assert!(is_new_a2);

		let mut ctx = harness.create_operator_context();
		let (_, is_new_b1) = ctx.get_or_create_row_number(&key_b1).unwrap();
		assert!(!is_new_b1);
	}

	// An empty key is a valid key: it is allocated once and then resolved.
	#[test]
	fn test_empty_key() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let empty_key = encode_key("");

		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&empty_key).unwrap();
		assert!(is_new);
		assert_eq!(row_num.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&empty_key).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2.0, 1);
	}

	// Keys containing 0x00 / 0xFF bytes round-trip through the mapping unchanged.
	#[test]
	fn test_binary_key_data() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let binary_key = EncodedKey::new(vec![0x00, 0xFF, 0x00, 0xAB, 0xCD]);

		let mut ctx = harness.create_operator_context();
		let (row_num, is_new) = ctx.get_or_create_row_number(&binary_key).unwrap();
		assert!(is_new);
		assert_eq!(row_num.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2, is_new2) = ctx.get_or_create_row_number(&binary_key).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2.0, 1);
	}

	// Alternating lookups of two keys keep returning their originally assigned numbers.
	#[test]
	fn test_interleaved_operations() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let key1 = encode_key("key1");
		let key2 = encode_key("key2");

		let mut ctx = harness.create_operator_context();
		let (row_num1_first, _) = ctx.get_or_create_row_number(&key1).unwrap();
		assert_eq!(row_num1_first.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2_first, _) = ctx.get_or_create_row_number(&key2).unwrap();
		assert_eq!(row_num2_first.0, 2);

		let mut ctx = harness.create_operator_context();
		let (row_num1_second, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
		assert!(!is_new1);
		assert_eq!(row_num1_second.0, 1);

		let mut ctx = harness.create_operator_context();
		let (row_num2_second, is_new2) = ctx.get_or_create_row_number(&key2).unwrap();
		assert!(!is_new2);
		assert_eq!(row_num2_second.0, 2);
	}

	// The provider-local counter key is the same for every node; per-node uniqueness
	// comes only from wrapping it in `FlowNodeInternalStateKey`.
	#[test]
	fn test_counter_key_uniqueness_per_node() {
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let internal_key1 = provider1.make_counter_key();
		let internal_key2 = provider2.make_counter_key();

		assert_eq!(internal_key1, internal_key2);

		let final_key1 =
			FlowNodeInternalStateKey::new(provider1.node, internal_key1.as_ref().to_vec()).encode();
		let final_key2 =
			FlowNodeInternalStateKey::new(provider2.node, internal_key2.as_ref().to_vec()).encode();

		assert!(!final_key1.is_empty());
		assert!(!final_key2.is_empty());
		assert_ne!(final_key1, final_key2);
	}

	// `make_map_key` is deterministic and distinguishes different input keys.
	#[test]
	fn test_map_key_uniqueness() {
		let provider = RowNumberProvider::new(FlowNodeId(42));
		let original_key1 = encode_key("test1");
		let original_key2 = encode_key("test2");

		let map_key1 = provider.make_map_key(&original_key1);
		let map_key2 = provider.make_map_key(&original_key2);

		assert!(!map_key1.is_empty());
		assert!(!map_key2.is_empty());
		assert_ne!(map_key1, map_key2);

		let map_key1_again = provider.make_map_key(&original_key1);
		assert_eq!(map_key1, map_key1_again);
	}

	// The counter entry cannot collide with a mapping entry, even for an empty user key.
	#[test]
	fn test_counter_key_vs_map_key_separation() {
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let counter_key = provider.make_counter_key();
		let map_key = provider.make_map_key(&EncodedKey::new(Vec::new()));

		assert_ne!(counter_key, map_key);
	}

	// A batch mixing known and unknown keys returns results in input order: known keys
	// keep their numbers, unknown keys get the next counter values (4, 5). The counter
	// is persisted, so the next single allocation gets 6.
	#[test]
	fn test_batch_mixed_existing_and_new_keys() {
		let mut harness = TestHarnessBuilder::<RowNumberTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key1 = encode_key("batch_key_1");
		let key2 = encode_key("batch_key_2");
		let key3 = encode_key("batch_key_3");

		let mut ctx = harness.create_operator_context();
		let (rn1, _) = provider.get_or_create_row_number(&mut ctx, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let mut ctx = harness.create_operator_context();
		let (rn2, _) = provider.get_or_create_row_number(&mut ctx, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let mut ctx = harness.create_operator_context();
		let (rn3, _) = provider.get_or_create_row_number(&mut ctx, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		let key4 = encode_key("batch_key_4");
		let key5 = encode_key("batch_key_5");

		let batch_keys = vec![&key2, &key4, &key1, &key5, &key3];

		let mut ctx = harness.create_operator_context();
		let results = provider.get_or_create_row_numbers_batch(&mut ctx, batch_keys.into_iter()).unwrap();

		assert_eq!(results.len(), 5);

		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		let key6 = encode_key("batch_key_6");
		let mut ctx = harness.create_operator_context();
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut ctx, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		let mut ctx = harness.create_operator_context();
		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut ctx, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let mut ctx = harness.create_operator_context();
		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut ctx, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);
	}
}