reifydb_flow_operator_sdk/stateful/
row.rs1use reifydb_core::{
7 EncodedKey,
8 interface::FlowNodeId,
9 key::{EncodableKey, FlowNodeInternalStateKey},
10 util::{CowVec, encoding::keycode::KeySerializer},
11 value::encoded::EncodedValues,
12};
13use reifydb_type::RowNumber;
14
15use crate::{context::OperatorContext, error::Result};
16
17pub struct RowNumberProvider {
26 node: FlowNodeId,
27}
28
29impl RowNumberProvider {
30 pub fn new(node: FlowNodeId) -> Self {
32 Self {
33 node,
34 }
35 }
36
37 pub fn get_or_create_row_numbers_batch<'a, I>(
41 &self,
42 ctx: &mut OperatorContext,
43 keys: I,
44 ) -> Result<Vec<(RowNumber, bool)>>
45 where
46 I: IntoIterator<Item = &'a EncodedKey>,
47 {
48 let mut results = Vec::new();
49 let mut counter = self.load_counter(ctx)?;
50 let initial_counter = counter;
51
52 for key in keys {
53 let map_key = self.make_map_key(key);
55 let internal_key = FlowNodeInternalStateKey::new(self.node, map_key.as_ref().to_vec());
56
57 if let Some(existing_row) = ctx.state().get(&internal_key.encode())? {
58 let bytes = existing_row.as_ref();
60 if bytes.len() >= 8 {
61 let row_num = u64::from_be_bytes([
62 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
63 bytes[7],
64 ]);
65 results.push((RowNumber(row_num), false));
66 continue;
67 }
68 }
69
70 let new_row_number = RowNumber(counter);
72
73 let row_num_bytes = counter.to_be_bytes().to_vec();
75 ctx.state().set(&internal_key.encode(), &EncodedValues(CowVec::new(row_num_bytes)))?;
76
77 results.push((new_row_number, true));
78 counter += 1;
79 }
80
81 if counter != initial_counter {
83 self.save_counter(ctx, counter)?;
84 }
85
86 Ok(results)
87 }
88
89 pub fn get_or_create_row_number(
92 &self,
93 ctx: &mut OperatorContext,
94 key: &EncodedKey,
95 ) -> Result<(RowNumber, bool)> {
96 Ok(self.get_or_create_row_numbers_batch(ctx, std::iter::once(key))?.into_iter().next().unwrap())
97 }
98
99 fn load_counter(&self, ctx: &mut OperatorContext) -> Result<u64> {
101 let key = self.make_counter_key();
102 let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
103 match ctx.state().get(&internal_key.encode())? {
104 None => Ok(1), Some(state_row) => {
106 let bytes = state_row.as_ref();
108 if bytes.len() >= 8 {
109 Ok(u64::from_be_bytes([
110 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
111 bytes[7],
112 ]))
113 } else {
114 Ok(1)
115 }
116 }
117 }
118 }
119
120 fn save_counter(&self, ctx: &mut OperatorContext, counter: u64) -> Result<()> {
122 let key = self.make_counter_key();
123 let internal_key = FlowNodeInternalStateKey::new(self.node, key.as_ref().to_vec());
124 let value = EncodedValues(CowVec::new(counter.to_be_bytes().to_vec()));
125 ctx.state().set(&internal_key.encode(), &value)?;
126 Ok(())
127 }
128
129 fn make_counter_key(&self) -> EncodedKey {
131 let mut serializer = KeySerializer::new();
132 serializer.extend_u8(b'C'); EncodedKey::new(serializer.finish())
134 }
135
136 fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
138 let mut serializer = KeySerializer::new();
139 serializer.extend_u8(b'M'); serializer.extend_bytes(key.as_ref());
141 EncodedKey::new(serializer.finish())
142 }
143
144 pub fn remove_by_prefix(&self, ctx: &mut OperatorContext, key_prefix: &[u8]) -> Result<()> {
147 let mut prefix = Vec::new();
149 let mut serializer = KeySerializer::new();
150 serializer.extend_u8(b'M'); prefix.extend_from_slice(&serializer.finish());
152 prefix.extend_from_slice(key_prefix);
153
154 let internal_prefix = FlowNodeInternalStateKey::new(self.node, prefix);
156 let prefix_key = internal_prefix.encode();
157 let entries = ctx.state().scan_prefix(&prefix_key)?;
158
159 for (key, _) in entries {
160 ctx.state().remove(&key)?;
161 }
162
163 Ok(())
164 }
165}
166
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use reifydb_core::{
        EncodedKey, Row,
        interface::FlowNodeId,
        key::{EncodableKey, FlowNodeInternalStateKey},
    };
    use reifydb_type::{RowNumber, Value};

    use crate::{
        FFIOperator, FFIOperatorMetadata, FlowChange,
        context::OperatorContext,
        error::Result,
        stateful::{FFIRawStatefulOperator, RowNumberProvider},
        testing::{TestHarnessBuilder, helpers::encode_key},
    };

    /// Builds a test harness for `RowNumberTestOperator` bound to `FlowNodeId($node)`.
    ///
    /// This is a macro rather than a helper function so the harness type never has to be
    /// named here.
    macro_rules! harness_for {
        ($node:expr) => {
            TestHarnessBuilder::<RowNumberTestOperator>::new()
                .with_node_id(FlowNodeId($node))
                .build()
                .expect("Failed to build harness")
        };
    }

    /// Pass-through operator used only so the harness can create `OperatorContext`s.
    struct RowNumberTestOperator;

    impl FFIOperatorMetadata for RowNumberTestOperator {
        const NAME: &'static str = "row_number_test";
        const API: u32 = 1;
        const VERSION: &'static str = "1.0.0";
        const DESCRIPTION: &'static str = "Test operator for row number provider";
        const INPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
        const OUTPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
        const CAPABILITIES: u32 = crate::prelude::CAPABILITY_ALL_STANDARD;
    }

    impl FFIOperator for RowNumberTestOperator {
        fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
            Ok(Self)
        }

        fn apply(&mut self, _ctx: &mut OperatorContext, input: FlowChange) -> Result<FlowChange> {
            Ok(input)
        }

        fn get_rows(
            &mut self,
            _ctx: &mut OperatorContext,
            _row_numbers: &[RowNumber],
        ) -> Result<Vec<Option<Row>>> {
            Ok(vec![])
        }
    }

    impl FFIRawStatefulOperator for RowNumberTestOperator {}

    #[test]
    fn test_first_row_number_starts_at_one() {
        let mut harness = harness_for!(1);
        let key = encode_key("test_key");

        let mut ctx = harness.create_operator_context();
        let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();

        assert_eq!(row_num.0, 1);
        assert!(is_new);
    }

    #[test]
    fn test_duplicate_key_returns_same_row_number() {
        let mut harness = harness_for!(1);
        let key = encode_key("test_key");

        let mut ctx = harness.create_operator_context();
        let (row_num1, is_new1) = ctx.get_or_create_row_number(&key).unwrap();

        let mut ctx = harness.create_operator_context();
        let (row_num2, is_new2) = ctx.get_or_create_row_number(&key).unwrap();

        assert_eq!(row_num1.0, row_num2.0);
        assert!(is_new1);
        assert!(!is_new2);
    }

    #[test]
    fn test_sequential_numbering() {
        let mut harness = harness_for!(1);
        let key1 = encode_key("key1");
        let key2 = encode_key("key2");
        let key3 = encode_key("key3");

        let mut ctx = harness.create_operator_context();
        let (row_num1, _) = ctx.get_or_create_row_number(&key1).unwrap();

        let mut ctx = harness.create_operator_context();
        let (row_num2, _) = ctx.get_or_create_row_number(&key2).unwrap();

        let mut ctx = harness.create_operator_context();
        let (row_num3, _) = ctx.get_or_create_row_number(&key3).unwrap();

        assert_eq!(row_num1.0, 1);
        assert_eq!(row_num2.0, 2);
        assert_eq!(row_num3.0, 3);
    }

    #[test]
    fn test_operator_isolation() {
        let mut harness1 = harness_for!(1);
        let mut harness2 = harness_for!(2);
        let key = encode_key("same_key");

        let mut ctx1 = harness1.create_operator_context();
        let (row_num1, is_new1) = ctx1.get_or_create_row_number(&key).unwrap();

        let mut ctx2 = harness2.create_operator_context();
        let (row_num2, is_new2) = ctx2.get_or_create_row_number(&key).unwrap();

        // Each node owns its own counter, so the same key starts at 1 in both.
        assert!(is_new1);
        assert!(is_new2);
        assert_eq!(row_num1.0, 1);
        assert_eq!(row_num2.0, 1);
    }

    #[test]
    fn test_persistence_across_calls() {
        let mut harness = harness_for!(1);
        let key1 = encode_key("key1");
        let key2 = encode_key("key2");

        let mut ctx = harness.create_operator_context();
        ctx.get_or_create_row_number(&key1).unwrap();

        let mut ctx = harness.create_operator_context();
        ctx.get_or_create_row_number(&key2).unwrap();

        let key3 = encode_key("key3");
        let mut ctx = harness.create_operator_context();
        let (row_num3, is_new3) = ctx.get_or_create_row_number(&key3).unwrap();

        assert!(is_new3);
        assert_eq!(row_num3.0, 3);

        let mut ctx = harness.create_operator_context();
        let (row_num1, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
        assert!(!is_new1);
        assert_eq!(row_num1.0, 1);
    }

    #[test]
    fn test_large_scale_row_numbers() {
        let mut harness = harness_for!(1);

        for i in 0..1000u64 {
            let key = encode_key(format!("key_{}", i));
            let mut ctx = harness.create_operator_context();
            let (row_num, is_new) = ctx.get_or_create_row_number(&key).unwrap();
            assert!(is_new);
            assert_eq!(row_num.0, i + 1);
        }

        let key_500 = encode_key("key_500");
        let mut ctx = harness.create_operator_context();
        let (row_num, is_new) = ctx.get_or_create_row_number(&key_500).unwrap();
        assert!(!is_new);
        assert_eq!(row_num.0, 501);
    }

    #[test]
    fn test_remove_by_prefix() {
        let mut harness = harness_for!(1);
        let key_a1 = encode_key("prefix_a_1");
        let key_a2 = encode_key("prefix_a_2");
        let key_b1 = encode_key("prefix_b_1");

        let mut ctx = harness.create_operator_context();
        ctx.get_or_create_row_number(&key_a1).unwrap();

        let mut ctx = harness.create_operator_context();
        ctx.get_or_create_row_number(&key_a2).unwrap();

        let mut ctx = harness.create_operator_context();
        ctx.get_or_create_row_number(&key_b1).unwrap();

        let provider = RowNumberProvider::new(FlowNodeId(1));
        let mut ctx = harness.create_operator_context();
        provider.remove_by_prefix(&mut ctx, b"prefix_a").unwrap();

        // Removed keys are treated as unseen again.
        let mut ctx = harness.create_operator_context();
        let (_, is_new_a1) = ctx.get_or_create_row_number(&key_a1).unwrap();

        let mut ctx = harness.create_operator_context();
        let (_, is_new_a2) = ctx.get_or_create_row_number(&key_a2).unwrap();

        assert!(is_new_a1);
        assert!(is_new_a2);

        // Keys outside the prefix keep their mapping.
        let mut ctx = harness.create_operator_context();
        let (_, is_new_b1) = ctx.get_or_create_row_number(&key_b1).unwrap();
        assert!(!is_new_b1);
    }

    #[test]
    fn test_empty_key() {
        let mut harness = harness_for!(1);
        let empty_key = encode_key("");

        let mut ctx = harness.create_operator_context();
        let (row_num, is_new) = ctx.get_or_create_row_number(&empty_key).unwrap();
        assert!(is_new);
        assert_eq!(row_num.0, 1);

        let mut ctx = harness.create_operator_context();
        let (row_num2, is_new2) = ctx.get_or_create_row_number(&empty_key).unwrap();
        assert!(!is_new2);
        assert_eq!(row_num2.0, 1);
    }

    #[test]
    fn test_binary_key_data() {
        let mut harness = harness_for!(1);
        let binary_key = EncodedKey::new(vec![0x00, 0xFF, 0x00, 0xAB, 0xCD]);

        let mut ctx = harness.create_operator_context();
        let (row_num, is_new) = ctx.get_or_create_row_number(&binary_key).unwrap();
        assert!(is_new);
        assert_eq!(row_num.0, 1);

        let mut ctx = harness.create_operator_context();
        let (row_num2, is_new2) = ctx.get_or_create_row_number(&binary_key).unwrap();
        assert!(!is_new2);
        assert_eq!(row_num2.0, 1);
    }

    #[test]
    fn test_interleaved_operations() {
        let mut harness = harness_for!(1);
        let key1 = encode_key("key1");
        let key2 = encode_key("key2");

        let mut ctx = harness.create_operator_context();
        let (row_num1_first, _) = ctx.get_or_create_row_number(&key1).unwrap();
        assert_eq!(row_num1_first.0, 1);

        let mut ctx = harness.create_operator_context();
        let (row_num2_first, _) = ctx.get_or_create_row_number(&key2).unwrap();
        assert_eq!(row_num2_first.0, 2);

        let mut ctx = harness.create_operator_context();
        let (row_num1_second, is_new1) = ctx.get_or_create_row_number(&key1).unwrap();
        assert!(!is_new1);
        assert_eq!(row_num1_second.0, 1);

        let mut ctx = harness.create_operator_context();
        let (row_num2_second, is_new2) = ctx.get_or_create_row_number(&key2).unwrap();
        assert!(!is_new2);
        assert_eq!(row_num2_second.0, 2);
    }

    #[test]
    fn test_counter_key_uniqueness_per_node() {
        let provider1 = RowNumberProvider::new(FlowNodeId(1));
        let provider2 = RowNumberProvider::new(FlowNodeId(2));

        let internal_key1 = provider1.make_counter_key();
        let internal_key2 = provider2.make_counter_key();

        // The node-local counter key is identical; node scoping comes from the outer key.
        assert_eq!(internal_key1, internal_key2);

        let final_key1 =
            FlowNodeInternalStateKey::new(provider1.node, internal_key1.as_ref().to_vec()).encode();
        let final_key2 =
            FlowNodeInternalStateKey::new(provider2.node, internal_key2.as_ref().to_vec()).encode();

        assert!(!final_key1.is_empty());
        assert!(!final_key2.is_empty());
        assert_ne!(final_key1, final_key2);
    }

    #[test]
    fn test_map_key_uniqueness() {
        let provider = RowNumberProvider::new(FlowNodeId(42));
        let original_key1 = encode_key("test1");
        let original_key2 = encode_key("test2");

        let map_key1 = provider.make_map_key(&original_key1);
        let map_key2 = provider.make_map_key(&original_key2);

        assert!(!map_key1.is_empty());
        assert!(!map_key2.is_empty());
        assert_ne!(map_key1, map_key2);

        let map_key1_again = provider.make_map_key(&original_key1);
        assert_eq!(map_key1, map_key1_again);
    }

    #[test]
    fn test_counter_key_vs_map_key_separation() {
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let counter_key = provider.make_counter_key();
        let map_key = provider.make_map_key(&EncodedKey::new(Vec::new()));

        assert_ne!(counter_key, map_key);
    }

    #[test]
    fn test_batch_mixed_existing_and_new_keys() {
        let mut harness = harness_for!(1);
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key1 = encode_key("batch_key_1");
        let key2 = encode_key("batch_key_2");
        let key3 = encode_key("batch_key_3");

        let mut ctx = harness.create_operator_context();
        let (rn1, _) = provider.get_or_create_row_number(&mut ctx, &key1).unwrap();
        assert_eq!(rn1.0, 1);

        let mut ctx = harness.create_operator_context();
        let (rn2, _) = provider.get_or_create_row_number(&mut ctx, &key2).unwrap();
        assert_eq!(rn2.0, 2);

        let mut ctx = harness.create_operator_context();
        let (rn3, _) = provider.get_or_create_row_number(&mut ctx, &key3).unwrap();
        assert_eq!(rn3.0, 3);

        let key4 = encode_key("batch_key_4");
        let key5 = encode_key("batch_key_5");
        let batch_keys = vec![&key2, &key4, &key1, &key5, &key3];

        let mut ctx = harness.create_operator_context();
        let results = provider
            .get_or_create_row_numbers_batch(&mut ctx, batch_keys.into_iter())
            .unwrap();

        // Results follow input order; only unseen keys consume counter values.
        assert_eq!(results.len(), 5);

        assert_eq!(results[0].0.0, 2);
        assert!(!results[0].1);

        assert_eq!(results[1].0.0, 4);
        assert!(results[1].1);

        assert_eq!(results[2].0.0, 1);
        assert!(!results[2].1);

        assert_eq!(results[3].0.0, 5);
        assert!(results[3].1);

        assert_eq!(results[4].0.0, 3);
        assert!(!results[4].1);

        let key6 = encode_key("batch_key_6");
        let mut ctx = harness.create_operator_context();
        let (rn6, is_new6) = provider.get_or_create_row_number(&mut ctx, &key6).unwrap();
        assert_eq!(rn6.0, 6);
        assert!(is_new6);

        let mut ctx = harness.create_operator_context();
        let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut ctx, &key4).unwrap();
        assert_eq!(check_rn4.0, 4);
        assert!(!is_new4);

        let mut ctx = harness.create_operator_context();
        let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut ctx, &key5).unwrap();
        assert_eq!(check_rn5.0, 5);
        assert!(!is_new5);
    }

    #[test]
    fn test_empty_batch_does_not_advance_counter() {
        let mut harness = harness_for!(1);
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let mut ctx = harness.create_operator_context();
        let results = provider
            .get_or_create_row_numbers_batch(&mut ctx, std::iter::empty::<&EncodedKey>())
            .unwrap();
        assert!(results.is_empty());

        // The first real key must still receive the very first row number.
        let first = encode_key("first");
        let mut ctx = harness.create_operator_context();
        let (row_num, is_new) = provider.get_or_create_row_number(&mut ctx, &first).unwrap();
        assert!(is_new);
        assert_eq!(row_num.0, 1);
    }

    #[test]
    fn test_batch_repeated_key_reuses_number_from_same_batch() {
        let mut harness = harness_for!(1);
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let dup = encode_key("dup_key");
        let other = encode_key("other_key");
        let batch_keys = vec![&dup, &other, &dup];

        let mut ctx = harness.create_operator_context();
        let results = provider
            .get_or_create_row_numbers_batch(&mut ctx, batch_keys.into_iter())
            .unwrap();

        assert_eq!(results.len(), 3);
        assert_eq!((results[0].0.0, results[0].1), (1, true));
        assert_eq!((results[1].0.0, results[1].1), (2, true));
        // The repeat must resolve to the number allocated earlier in this same batch.
        assert_eq!((results[2].0.0, results[2].1), (1, false));

        let next = encode_key("next_key");
        let mut ctx = harness.create_operator_context();
        let (row_num, is_new) = provider.get_or_create_row_number(&mut ctx, &next).unwrap();
        assert!(is_new);
        assert_eq!(row_num.0, 3);
    }
}