1use std::iter::once;
4
5use reifydb_core::{
6 encoded::{
7 encoded::EncodedValues,
8 key::{EncodedKey, EncodedKeyRange},
9 },
10 interface::catalog::flow::FlowNodeId,
11 key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
12 util::encoding::keycode::serializer::KeySerializer,
13};
14use reifydb_type::{util::cowvec::CowVec, value::row_number::RowNumber};
15
16use crate::{
17 operator::stateful::{
18 counter::{Counter, CounterDirection},
19 utils::{internal_state_get, internal_state_set},
20 },
21 transaction::FlowTransaction,
22};
23
24pub struct RowNumberProvider {
34 node: FlowNodeId,
35 counter: Counter,
36}
37
38impl RowNumberProvider {
39 pub fn new(node: FlowNodeId) -> Self {
41 Self {
42 node,
43 counter: Counter::with_prefix(node, b'C', CounterDirection::Ascending),
44 }
45 }
46
47 pub fn get_or_create_row_numbers<'a, I>(
51 &self,
52 txn: &mut FlowTransaction,
53 keys: I,
54 ) -> reifydb_type::Result<Vec<(RowNumber, bool)>>
55 where
56 I: IntoIterator<Item = &'a EncodedKey>,
57 {
58 let mut results = Vec::new();
59
60 for key in keys {
61 let map_key = self.make_map_key(key);
62
63 if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
64 let bytes = existing_row.as_ref();
65 if bytes.len() >= 8 {
66 let row_num = u64::from_be_bytes([
67 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
68 bytes[7],
69 ]);
70 results.push((RowNumber(row_num), false));
71 continue;
72 }
73 }
74
75 let new_row_number = self.counter.next(txn)?;
76
77 let row_num_bytes = new_row_number.0.to_be_bytes().to_vec();
79 internal_state_set(self.node, txn, &map_key, EncodedValues(CowVec::new(row_num_bytes)))?;
80
81 let reverse_key = self.make_reverse_map_key(new_row_number);
83 internal_state_set(
84 self.node,
85 txn,
86 &reverse_key,
87 EncodedValues(CowVec::new(key.as_ref().to_vec())),
88 )?;
89
90 results.push((new_row_number, true));
91 }
92
93 Ok(results)
94 }
95
96 pub fn get_or_create_row_number(
100 &self,
101 txn: &mut FlowTransaction,
102 key: &EncodedKey,
103 ) -> reifydb_type::Result<(RowNumber, bool)> {
104 Ok(self.get_or_create_row_numbers(txn, once(key))?.into_iter().next().unwrap())
105 }
106
107 pub fn get_key_for_row_number(
109 &self,
110 txn: &mut FlowTransaction,
111 row_number: RowNumber,
112 ) -> reifydb_type::Result<Option<EncodedKey>> {
113 let reverse_key = self.make_reverse_map_key(row_number);
114 if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
115 Ok(Some(EncodedKey::new(key_bytes.as_ref().to_vec())))
116 } else {
117 Ok(None)
118 }
119 }
120
121 fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
123 let mut serializer = KeySerializer::new();
124 serializer.extend_u8(b'M'); serializer.extend_bytes(key.as_ref());
126 EncodedKey::new(serializer.finish())
127 }
128
129 fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
131 let mut serializer = KeySerializer::new();
132 serializer.extend_u8(b'R'); serializer.extend_u64(row_number.0);
134 EncodedKey::new(serializer.finish())
135 }
136
137 pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> reifydb_type::Result<()> {
140 let mut prefix = Vec::new();
142 let mut serializer = KeySerializer::new();
143 serializer.extend_u8(b'M'); prefix.extend_from_slice(&serializer.finish());
145 prefix.extend_from_slice(key_prefix);
146
147 let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
148 let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
149
150 let keys_to_remove = {
151 let mut stream = txn.range(full_range, 1024);
152 let mut keys = Vec::new();
153 while let Some(result) = stream.next() {
154 let multi = result?;
155 keys.push(multi.key);
156 }
157 keys
158 };
159
160 for key in keys_to_remove {
161 txn.remove(&key)?;
162 }
163
164 Ok(())
165 }
166}
167
#[cfg(test)]
pub mod tests {
    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::common::CommitVersion;
    use reifydb_transaction::interceptor::interceptors::Interceptors;

    use super::*;
    use crate::operator::stateful::test_utils::test::*;

    /// Checks one `(row_number, is_new)` result against the expected values,
    /// number first and freshness flag second.
    fn assert_assignment(actual: (RowNumber, bool), expected_number: u64, expected_new: bool) {
        assert_eq!(actual.0.0, expected_number);
        assert_eq!(actual.1, expected_new);
    }

    /// A fresh provider assigns row number 1 to the first key and reports it as new.
    #[test]
    fn test_first_row_number() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key = test_key("first");
        let assigned = provider.get_or_create_row_number(&mut txn, &key).unwrap();

        assert_assignment(assigned, 1, true);
    }

    /// Asking twice for the same key returns the same number; only the first call is new.
    #[test]
    fn test_duplicate_key_same_row_number() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key = test_key("duplicate");

        let first = provider.get_or_create_row_number(&mut txn, &key).unwrap();
        assert_assignment(first, 1, true);

        let second = provider.get_or_create_row_number(&mut txn, &key).unwrap();
        assert_assignment(second, 1, false);

        assert_eq!(first.0, second.0);
    }

    /// Distinct keys receive consecutive row numbers starting at 1.
    #[test]
    fn test_sequential_row_numbers() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=5u64 {
            let key = test_key(&format!("key_{}", expected));
            let assigned = provider.get_or_create_row_number(&mut txn, &key).unwrap();

            assert_assignment(assigned, expected, true);
        }
    }

    /// Interleaving known and unknown keys keeps existing numbers stable and
    /// continues the sequence for new ones.
    #[test]
    fn test_mixed_new_and_existing() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key1 = test_key("mixed_1");
        let key2 = test_key("mixed_2");
        let key3 = test_key("mixed_3");

        let first = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
        let second = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
        let third = provider.get_or_create_row_number(&mut txn, &key3).unwrap();

        assert_assignment(first, 1, true);
        assert_assignment(second, 2, true);
        assert_assignment(third, 3, true);

        let key4 = test_key("mixed_4");
        let second_again = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
        let fourth = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
        let first_again = provider.get_or_create_row_number(&mut txn, &key1).unwrap();

        assert_assignment(second_again, 2, false);
        assert_assignment(fourth, 4, true);
        assert_assignment(first_again, 1, false);
    }

    /// Providers for different nodes number the same keys independently.
    #[test]
    fn test_multiple_providers_isolated() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider1 = RowNumberProvider::new(FlowNodeId(1));
        let provider2 = RowNumberProvider::new(FlowNodeId(2));

        let shared = test_key("shared_key");

        let (from_p1, _) = provider1.get_or_create_row_number(&mut txn, &shared).unwrap();
        let (from_p2, _) = provider2.get_or_create_row_number(&mut txn, &shared).unwrap();

        assert_eq!(from_p1.0, 1);
        assert_eq!(from_p2.0, 1);

        let other = test_key("key2");
        let (next_p1, _) = provider1.get_or_create_row_number(&mut txn, &other).unwrap();
        assert_eq!(next_p1.0, 2);

        let (next_p2, _) = provider2.get_or_create_row_number(&mut txn, &other).unwrap();
        assert_eq!(next_p2.0, 2);
    }

    /// The counter keeps advancing across calls within one transaction.
    #[test]
    fn test_counter_persistence() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=3u64 {
            let key = test_key(&format!("persist_{}", expected));
            let (assigned, _) = provider.get_or_create_row_number(&mut txn, &key).unwrap();
            assert_eq!(assigned.0, expected);
        }

        let new_key = test_key("persist_new");
        let assigned = provider.get_or_create_row_number(&mut txn, &new_key).unwrap();

        assert_assignment(assigned, 4, true);
    }

    /// A thousand keys are numbered in order, and earlier keys remain resolvable afterwards.
    #[test]
    fn test_large_row_numbers() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=1000u64 {
            let key = test_key(&format!("large_{}", expected));
            let assigned = provider.get_or_create_row_number(&mut txn, &key).unwrap();
            assert_assignment(assigned, expected, true);
        }

        let known = test_key("large_1");
        let assigned = provider.get_or_create_row_number(&mut txn, &known).unwrap();
        assert_assignment(assigned, 1, false);

        let fresh = test_key("large_1001");
        let assigned = provider.get_or_create_row_number(&mut txn, &fresh).unwrap();
        assert_assignment(assigned, 1001, true);
    }

    /// A batch mixing known and unknown keys returns results in input order,
    /// allocates only for the unknown keys, and records reverse mappings.
    #[test]
    fn test_mixed_existing_and_new_keys() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let key1 = test_key("key_1");
        let key2 = test_key("key_2");
        let key3 = test_key("key_3");

        // Seed three keys one at a time.
        let (seed1, _) = provider.get_or_create_row_number(&mut txn, &key1).unwrap();
        assert_eq!(seed1.0, 1);

        let (seed2, _) = provider.get_or_create_row_number(&mut txn, &key2).unwrap();
        assert_eq!(seed2.0, 2);

        let (seed3, _) = provider.get_or_create_row_number(&mut txn, &key3).unwrap();
        assert_eq!(seed3.0, 3);

        let key4 = test_key("key_4");
        let key5 = test_key("key_5");

        // Batch request interleaving seeded keys with two unseen ones.
        let batch = vec![&key2, &key4, &key1, &key5, &key3];
        let results = provider.get_or_create_row_numbers(&mut txn, batch).unwrap();

        assert_eq!(results.len(), 5);

        let expected: [(u64, bool); 5] = [(2, false), (4, true), (1, false), (5, true), (3, false)];
        for (actual, &(number, is_new)) in results.iter().zip(expected.iter()) {
            assert_assignment(*actual, number, is_new);
        }

        // The counter continues after the batch.
        let key6 = test_key("key_6");
        let sixth = provider.get_or_create_row_number(&mut txn, &key6).unwrap();
        assert_assignment(sixth, 6, true);

        // Keys allocated by the batch are now known.
        let fourth = provider.get_or_create_row_number(&mut txn, &key4).unwrap();
        assert_assignment(fourth, 4, false);

        let fifth = provider.get_or_create_row_number(&mut txn, &key5).unwrap();
        assert_assignment(fifth, 5, false);

        // Reverse lookups resolve to the original keys.
        let reverse4 = provider.get_key_for_row_number(&mut txn, RowNumber(4)).unwrap();
        assert_eq!(reverse4, Some(key4));

        let reverse5 = provider.get_key_for_row_number(&mut txn, RowNumber(5)).unwrap();
        assert_eq!(reverse5, Some(key5));

        let reverse1 = provider.get_key_for_row_number(&mut txn, RowNumber(1)).unwrap();
        assert_eq!(reverse1, Some(key1));

        let reverse2 = provider.get_key_for_row_number(&mut txn, RowNumber(2)).unwrap();
        assert_eq!(reverse2, Some(key2));
    }
}