1use std::iter::once;
4
5use reifydb_core::{
6 encoded::{
7 key::{EncodedKey, EncodedKeyRange},
8 row::EncodedRow,
9 },
10 interface::catalog::flow::FlowNodeId,
11 key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
12 util::encoding::keycode::serializer::KeySerializer,
13};
14use reifydb_type::{Result, util::cowvec::CowVec, value::row_number::RowNumber};
15
16use crate::{
17 operator::stateful::{
18 counter::{Counter, CounterDirection},
19 utils::{internal_state_get, internal_state_set},
20 },
21 transaction::FlowTransaction,
22};
23
24pub struct RowNumberProvider {
34 node: FlowNodeId,
35 counter: Counter,
36}
37
38impl RowNumberProvider {
39 pub fn new(node: FlowNodeId) -> Self {
41 Self {
42 node,
43 counter: Counter::with_prefix(node, b'C', CounterDirection::Ascending),
44 }
45 }
46
47 pub fn get_or_create_row_numbers<'a, I>(
51 &self,
52 txn: &mut FlowTransaction,
53 keys: I,
54 ) -> Result<Vec<(RowNumber, bool)>>
55 where
56 I: IntoIterator<Item = &'a EncodedKey>,
57 {
58 let mut results = Vec::new();
59
60 for key in keys {
61 let map_key = self.make_map_key(key);
62
63 if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
64 let bytes = existing_row.as_slice();
65 if bytes.len() >= 8 {
66 let row_num = u64::from_be_bytes([
67 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
68 bytes[7],
69 ]);
70 results.push((RowNumber(row_num), false));
71 continue;
72 }
73 }
74
75 let new_row_number = self.counter.next(txn)?;
76
77 let row_num_bytes = new_row_number.0.to_be_bytes().to_vec();
79 internal_state_set(self.node, txn, &map_key, EncodedRow(CowVec::new(row_num_bytes)))?;
80
81 let reverse_key = self.make_reverse_map_key(new_row_number);
83 internal_state_set(
84 self.node,
85 txn,
86 &reverse_key,
87 EncodedRow(CowVec::new(key.as_ref().to_vec())),
88 )?;
89
90 results.push((new_row_number, true));
91 }
92
93 Ok(results)
94 }
95
96 pub fn get_or_create_row_number(
100 &self,
101 txn: &mut FlowTransaction,
102 key: &EncodedKey,
103 ) -> Result<(RowNumber, bool)> {
104 Ok(self.get_or_create_row_numbers(txn, once(key))?.into_iter().next().unwrap())
105 }
106
107 pub fn get_key_for_row_number(
109 &self,
110 txn: &mut FlowTransaction,
111 row_number: RowNumber,
112 ) -> Result<Option<EncodedKey>> {
113 let reverse_key = self.make_reverse_map_key(row_number);
114 if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
115 Ok(Some(EncodedKey::new(key_bytes.to_vec())))
116 } else {
117 Ok(None)
118 }
119 }
120
121 fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
123 let mut serializer = KeySerializer::new();
124 serializer.extend_u8(b'M'); serializer.extend_bytes(key.as_ref());
126 EncodedKey::new(serializer.finish())
127 }
128
129 fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
131 let mut serializer = KeySerializer::new();
132 serializer.extend_u8(b'R'); serializer.extend_u64(row_number.0);
134 EncodedKey::new(serializer.finish())
135 }
136
137 pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> Result<()> {
140 let mut prefix = Vec::new();
142 let mut serializer = KeySerializer::new();
143 serializer.extend_u8(b'M'); prefix.extend_from_slice(&serializer.finish());
145 prefix.extend_from_slice(key_prefix);
146
147 let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
148 let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
149
150 let keys_to_remove = {
151 let stream = txn.range(full_range, 1024);
152 let mut keys = Vec::new();
153 for result in stream {
154 let multi = result?;
155 keys.push(multi.key);
156 }
157 keys
158 };
159
160 for key in keys_to_remove {
161 txn.remove(&key)?;
162 }
163
164 Ok(())
165 }
166}
167
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::common::CommitVersion;
	use reifydb_runtime::context::clock::{Clock, MockClock};
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::operator::stateful::test_utils::test::*;

	/// Opens a deferred `FlowTransaction` over `$parent` at commit version 1, with the testing
	/// catalog, no interceptors and a mock clock at 1000 ms — the setup shared by every test.
	macro_rules! deferred_txn {
		($parent:ident) => {
			FlowTransaction::deferred(
				&mut $parent,
				CommitVersion(1),
				Catalog::testing(),
				Interceptors::new(),
				Clock::Mock(MockClock::from_millis(1000)),
			)
		};
	}

	#[test]
	fn test_first_row_number() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("first");
		let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(row_num.0, 1);
		assert!(is_new);
	}

	#[test]
	fn test_duplicate_key_same_row_number() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("duplicate");

		let (row_num1, is_new1) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num1.0, 1);
		assert!(is_new1);

		let (row_num2, is_new2) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(row_num2.0, 1);
		assert!(!is_new2);

		assert_eq!(row_num1, row_num2);
	}

	#[test]
	fn test_sequential_row_numbers() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=5 {
			let key = test_key(&format!("key_{}", i));
			let (row_num, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();

			assert_eq!(row_num.0, i as u64);
			assert!(is_new);
		}
	}

	#[test]
	fn test_mixed_new_and_existing() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key1 = test_key("mixed_1");
		let key2 = test_key("mixed_2");
		let key3 = test_key("mixed_3");

		let (rn1, new1) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		let (rn2, new2) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn3, new3) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();

		assert_eq!(rn1.0, 1);
		assert!(new1);
		assert_eq!(rn2.0, 2);
		assert!(new2);
		assert_eq!(rn3.0, 3);
		assert!(new3);

		let key4 = test_key("mixed_4");
		let (rn2_again, new2_again) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		let (rn4, new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		let (rn1_again, new1_again) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();

		assert_eq!(rn2_again.0, 2);
		assert!(!new2_again);
		assert_eq!(rn4.0, 4);
		assert!(new4);
		assert_eq!(rn1_again.0, 1);
		assert!(!new1_again);
	}

	#[test]
	fn test_multiple_providers_isolated() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider1 = RowNumberProvider::new(FlowNodeId(1));
		let provider2 = RowNumberProvider::new(FlowNodeId(2));

		let key = test_key("shared_key");

		let (rn1, _) = provider1.get_or_create_row_number(&mut txn, &key).unwrap();
		let (rn2, _) = provider2.get_or_create_row_number(&mut txn, &key).unwrap();

		assert_eq!(rn1.0, 1);
		assert_eq!(rn2.0, 1);

		let key2 = test_key("key2");
		let (rn1_2, _) = provider1.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn1_2.0, 2);

		let (rn2_2, _) = provider2.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2_2.0, 2);
	}

	#[test]
	fn test_counter_persistence() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=3 {
			let key = test_key(&format!("persist_{}", i));
			let (rn, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i as u64);
		}

		let new_key = test_key("persist_new");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &new_key).unwrap();

		assert_eq!(rn.0, 4);
		assert!(is_new);
	}

	#[test]
	fn test_large_row_numbers() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		for i in 1..=1000 {
			let key = test_key(&format!("large_{}", i));
			let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
			assert_eq!(rn.0, i as u64);
			assert!(is_new);
		}

		let key = test_key("large_1");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(!is_new);

		let key = test_key("large_1001");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1001);
		assert!(is_new);
	}

	#[test]
	fn test_mixed_existing_and_new_keys() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key1 = test_key("key_1");
		let key2 = test_key("key_2");
		let key3 = test_key("key_3");

		let (rn1, _) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
		assert_eq!(rn1.0, 1);

		let (rn2, _) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
		assert_eq!(rn2.0, 2);

		let (rn3, _) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();
		assert_eq!(rn3.0, 3);

		let key4 = test_key("key_4");
		let key5 = test_key("key_5");

		let keys = vec![&key2, &key4, &key1, &key5, &key3];

		let results = provider.get_or_create_row_numbers(&mut txn, keys.into_iter()).unwrap();

		assert_eq!(results.len(), 5);

		assert_eq!(results[0].0.0, 2);
		assert!(!results[0].1);

		assert_eq!(results[1].0.0, 4);
		assert!(results[1].1);

		assert_eq!(results[2].0.0, 1);
		assert!(!results[2].1);

		assert_eq!(results[3].0.0, 5);
		assert!(results[3].1);

		assert_eq!(results[4].0.0, 3);
		assert!(!results[4].1);

		let key6 = test_key("key_6");
		let (rn6, is_new6) = provider.get_or_create_row_number(&mut txn, &key6).unwrap();
		assert_eq!(rn6.0, 6);
		assert!(is_new6);

		let (check_rn4, is_new4) = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
		assert_eq!(check_rn4.0, 4);
		assert!(!is_new4);

		let (check_rn5, is_new5) = provider.get_or_create_row_number(&mut txn, &key5).unwrap();
		assert_eq!(check_rn5.0, 5);
		assert!(!is_new5);

		let reverse_key4 = provider.get_key_for_row_number(&mut txn, RowNumber(4)).unwrap();
		assert_eq!(reverse_key4, Some(key4));

		let reverse_key5 = provider.get_key_for_row_number(&mut txn, RowNumber(5)).unwrap();
		assert_eq!(reverse_key5, Some(key5));

		let reverse_key1 = provider.get_key_for_row_number(&mut txn, RowNumber(1)).unwrap();
		assert_eq!(reverse_key1, Some(key1));

		let reverse_key2 = provider.get_key_for_row_number(&mut txn, RowNumber(2)).unwrap();
		assert_eq!(reverse_key2, Some(key2));
	}

	#[test]
	fn test_empty_batch_consumes_no_row_number() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let results = provider.get_or_create_row_numbers(&mut txn, Vec::<&EncodedKey>::new()).unwrap();
		assert!(results.is_empty());

		// The counter was not advanced by the empty batch.
		let key = test_key("after_empty");
		let (rn, is_new) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(rn.0, 1);
		assert!(is_new);
	}

	#[test]
	fn test_repeated_key_within_batch() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		let key = test_key("repeated");
		let results = provider.get_or_create_row_numbers(&mut txn, vec![&key, &key]).unwrap();

		assert_eq!(results, vec![(RowNumber(1), true), (RowNumber(1), false)]);
	}

	#[test]
	fn test_unknown_row_number_has_no_key() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn!(parent);
		let provider = RowNumberProvider::new(FlowNodeId(1));

		assert_eq!(provider.get_key_for_row_number(&mut txn, RowNumber(42)).unwrap(), None);

		let key = test_key("only");
		let (rn, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
		assert_eq!(provider.get_key_for_row_number(&mut txn, rn).unwrap(), Some(key));
		assert_eq!(provider.get_key_for_row_number(&mut txn, RowNumber(rn.0 + 1)).unwrap(), None);
	}
}