1use std::iter::once;
4
5use reifydb_core::{
6 EncodedKey,
7 interface::FlowNodeId,
8 key::{EncodableKey, FlowNodeInternalStateKey},
9 util::{CowVec, encoding::keycode::KeySerializer},
10 value::encoded::{EncodedKeyRange, EncodedValues},
11};
12use reifydb_type::RowNumber;
13
14use crate::{
15 operator::stateful::utils::{internal_state_get, internal_state_set},
16 transaction::FlowTransaction,
17};
18
19pub struct RowNumberProvider {
29 node: FlowNodeId,
30}
31
32impl RowNumberProvider {
33 pub fn new(node: FlowNodeId) -> Self {
35 Self {
36 node,
37 }
38 }
39
40 pub fn get_or_create_row_numbers_batch<'a, I>(
44 &self,
45 txn: &mut FlowTransaction,
46 keys: I,
47 ) -> crate::Result<Vec<(RowNumber, bool)>>
48 where
49 I: IntoIterator<Item = &'a EncodedKey>,
50 {
51 let mut results = Vec::new();
52 let mut counter = self.load_counter(txn)?;
53 let initial_counter = counter;
54
55 for key in keys {
56 let map_key = self.make_map_key(key);
57
58 if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
59 let bytes = existing_row.as_ref();
60 if bytes.len() >= 8 {
61 let row_num = u64::from_be_bytes([
62 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
63 bytes[7],
64 ]);
65 results.push((RowNumber(row_num), false));
66 continue;
67 }
68 }
69
70 let new_row_number = RowNumber(counter);
71
72 let row_num_bytes = counter.to_be_bytes().to_vec();
74 internal_state_set(self.node, txn, &map_key, EncodedValues(CowVec::new(row_num_bytes)))?;
75
76 let reverse_key = self.make_reverse_map_key(new_row_number);
78 internal_state_set(
79 self.node,
80 txn,
81 &reverse_key,
82 EncodedValues(CowVec::new(key.as_ref().to_vec())),
83 )?;
84
85 results.push((new_row_number, true));
86 counter += 1;
87 }
88
89 if counter != initial_counter {
91 self.save_counter(txn, counter)?;
92 }
93
94 Ok(results)
95 }
96
97 pub fn get_or_create_row_number(
101 &self,
102 txn: &mut FlowTransaction,
103 key: &EncodedKey,
104 ) -> crate::Result<(RowNumber, bool)> {
105 Ok(self.get_or_create_row_numbers_batch(txn, once(key))?.into_iter().next().unwrap())
106 }
107
108 pub fn get_key_for_row_number(
110 &self,
111 txn: &mut FlowTransaction,
112 row_number: RowNumber,
113 ) -> crate::Result<Option<EncodedKey>> {
114 let reverse_key = self.make_reverse_map_key(row_number);
115 if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
116 Ok(Some(EncodedKey::new(key_bytes.as_ref().to_vec())))
117 } else {
118 Ok(None)
119 }
120 }
121
122 fn load_counter(&self, txn: &mut FlowTransaction) -> crate::Result<u64> {
124 let key = self.make_counter_key();
125 match internal_state_get(self.node, txn, &key)? {
126 None => Ok(1), Some(state_row) => {
128 let bytes = state_row.as_ref();
130 if bytes.len() >= 8 {
131 Ok(u64::from_be_bytes([
132 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
133 bytes[7],
134 ]))
135 } else {
136 Ok(1)
137 }
138 }
139 }
140 }
141
142 fn save_counter(&self, txn: &mut FlowTransaction, counter: u64) -> crate::Result<()> {
144 let key = self.make_counter_key();
145 let value = EncodedValues(CowVec::new(counter.to_be_bytes().to_vec()));
146 internal_state_set(self.node, txn, &key, value)?;
147 Ok(())
148 }
149
150 fn make_counter_key(&self) -> EncodedKey {
152 let mut serializer = KeySerializer::new();
153 serializer.extend_u8(b'C'); EncodedKey::new(serializer.finish())
155 }
156
157 fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
159 let mut serializer = KeySerializer::new();
160 serializer.extend_u8(b'M'); serializer.extend_bytes(key.as_ref());
162 EncodedKey::new(serializer.finish())
163 }
164
165 fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
167 let mut serializer = KeySerializer::new();
168 serializer.extend_u8(b'R'); serializer.extend_u64(row_number.0);
170 EncodedKey::new(serializer.finish())
171 }
172
173 pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> crate::Result<()> {
176 let mut prefix = Vec::new();
178 let mut serializer = KeySerializer::new();
179 serializer.extend_u8(b'M'); prefix.extend_from_slice(&serializer.finish());
181 prefix.extend_from_slice(key_prefix);
182
183 let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
184 let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
185
186 let keys_to_remove: Vec<_> = txn.range(full_range)?.map(|multi| multi.key).collect();
187
188 for key in keys_to_remove {
189 txn.remove(&key)?;
190 }
191
192 Ok(())
193 }
194}
195
#[cfg(test)]
mod tests {
    use reifydb_core::CommitVersion;

    use super::*;
    use crate::operator::stateful::test_utils::test::*;

    /// The very first key seen by a fresh provider gets row number 1.
    #[test]
    fn test_first_row_number() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key = test_key("first");
        let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

        assert_eq!(row_num.0, 1);
        assert!(is_new);
    }

    /// Asking twice for the same key returns the same number, flagged new only once.
    #[test]
    fn test_duplicate_key_same_row_number() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key = test_key("duplicate");

        let (first, first_is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
        assert_eq!(first.0, 1);
        assert!(first_is_new);

        let (second, second_is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
        assert_eq!(second.0, 1);
        assert!(!second_is_new);

        assert_eq!(first, second);
    }

    /// Distinct keys receive consecutive row numbers starting at 1.
    #[test]
    fn test_sequential_row_numbers() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=5u64 {
            let key = test_key(&format!("key_{}", expected));
            let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

            assert_eq!(row_num.0, expected);
            assert!(is_new);
        }
    }

    /// Interleaving known and unknown keys keeps old numbers and extends the sequence.
    #[test]
    fn test_mixed_new_and_existing() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key1 = test_key("mixed_1");
        let key2 = test_key("mixed_2");
        let key3 = test_key("mixed_3");

        let (rn1, new1) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
        let (rn2, new2) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
        let (rn3, new3) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();

        assert_eq!(rn1.0, 1);
        assert!(new1);
        assert_eq!(rn2.0, 2);
        assert!(new2);
        assert_eq!(rn3.0, 3);
        assert!(new3);

        let key4 = test_key("mixed_4");
        let (rn2_again, new2_again) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
        let (rn4, new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
        let (rn1_again, new1_again) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();

        assert_eq!(rn2_again.0, 2);
        assert!(!new2_again);
        assert_eq!(rn4.0, 4);
        assert!(new4);
        assert_eq!(rn1_again.0, 1);
        assert!(!new1_again);
    }

    /// Providers bound to different nodes number the same keys independently.
    #[test]
    fn test_multiple_providers_isolated() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let provider_a = RowNumberProvider::new(FlowNodeId(1));
        let provider_b = RowNumberProvider::new(FlowNodeId(2));

        let shared = test_key("shared_key");

        let (a_first, _) = provider_a.get_or_create_row_number(&mut txn, &shared).unwrap();
        let (b_first, _) = provider_b.get_or_create_row_number(&mut txn, &shared).unwrap();

        assert_eq!(a_first.0, 1);
        assert_eq!(b_first.0, 1);

        let other = test_key("key2");

        let (a_second, _) = provider_a.get_or_create_row_number(&mut txn, &other).unwrap();
        assert_eq!(a_second.0, 2);

        let (b_second, _) = provider_b.get_or_create_row_number(&mut txn, &other).unwrap();
        assert_eq!(b_second.0, 2);
    }

    /// After three allocations the next unseen key continues at 4.
    #[test]
    fn test_counter_persistence() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let _operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=3u64 {
            let key = test_key(&format!("persist_{}", expected));
            let (rn, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
            assert_eq!(rn.0, expected);
        }

        let new_key = test_key("persist_new");
        let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &new_key).unwrap();

        assert_eq!(rn.0, 4);
        assert!(is_new);
    }

    /// A thousand keys are numbered in order; lookups and further allocation still work.
    #[test]
    fn test_large_row_numbers() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let _operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=1000u64 {
            let key = test_key(&format!("large_{}", expected));
            let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
            assert_eq!(rn.0, expected);
            assert!(is_new);
        }

        let known = test_key("large_1");
        let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &known).unwrap();
        assert_eq!(rn.0, 1);
        assert!(!is_new);

        let fresh = test_key("large_1001");
        let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &fresh).unwrap();
        assert_eq!(rn.0, 1001);
        assert!(is_new);
    }

    /// A batch mixing known and unknown keys preserves input order, allocates only
    /// for the unknown ones, and records reverse mappings for all of them.
    #[test]
    fn test_batch_mixed_existing_and_new_keys() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut parent, CommitVersion(1));
        let _operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key1 = test_key("batch_key_1");
        let key2 = test_key("batch_key_2");
        let key3 = test_key("batch_key_3");

        // Seed three keys one at a time so they already exist for the batch.
        let (rn1, _) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
        assert_eq!(rn1.0, 1);

        let (rn2, _) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
        assert_eq!(rn2.0, 2);

        let (rn3, _) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();
        assert_eq!(rn3.0, 3);

        let key4 = test_key("batch_key_4");
        let key5 = test_key("batch_key_5");

        // Existing and new keys interleaved, deliberately out of creation order.
        let batch_keys = vec![&key2, &key4, &key1, &key5, &key3];

        let results = provider.get_or_create_row_numbers_batch(&mut txn, batch_keys).unwrap();

        assert_eq!(results.len(), 5);

        let observed: Vec<(u64, bool)> = results.iter().map(|(rn, is_new)| (rn.0, *is_new)).collect();
        let expected: Vec<(u64, bool)> = vec![(2, false), (4, true), (1, false), (5, true), (3, false)];
        assert_eq!(observed, expected);

        // The counter must have advanced past the two batch allocations.
        let key6 = test_key("batch_key_6");
        let (rn6, is_new6) = provider.get_or_create_row_number(&mut txn, &key6).unwrap();
        assert_eq!(rn6.0, 6);
        assert!(is_new6);

        // Keys allocated by the batch are now known.
        let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
        assert_eq!(check_rn4.0, 4);
        assert!(!is_new4);

        let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut txn, &key5).unwrap();
        assert_eq!(check_rn5.0, 5);
        assert!(!is_new5);

        // Reverse lookups resolve back to the original keys.
        let reverse_key4 = provider.get_key_for_row_number(&mut txn, RowNumber(4)).unwrap();
        assert_eq!(reverse_key4, Some(key4));

        let reverse_key5 = provider.get_key_for_row_number(&mut txn, RowNumber(5)).unwrap();
        assert_eq!(reverse_key5, Some(key5));

        let reverse_key1 = provider.get_key_for_row_number(&mut txn, RowNumber(1)).unwrap();
        assert_eq!(reverse_key1, Some(key1));

        let reverse_key2 = provider.get_key_for_row_number(&mut txn, RowNumber(2)).unwrap();
        assert_eq!(reverse_key2, Some(key2));
    }
}