1use std::iter::once;
4
5use reifydb_core::{
6 encoded::{
7 key::{EncodedKey, EncodedKeyRange},
8 row::EncodedRow,
9 },
10 interface::catalog::flow::FlowNodeId,
11 key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
12 util::encoding::keycode::serializer::KeySerializer,
13};
14use reifydb_type::{Result, util::cowvec::CowVec, value::row_number::RowNumber};
15
16use crate::{
17 operator::stateful::{
18 counter::{Counter, CounterDirection},
19 utils::{internal_state_get, internal_state_set},
20 },
21 transaction::FlowTransaction,
22};
23
24pub struct RowNumberProvider {
34 node: FlowNodeId,
35 counter: Counter,
36}
37
38impl RowNumberProvider {
39 pub fn new(node: FlowNodeId) -> Self {
41 Self {
42 node,
43 counter: Counter::with_prefix(node, b'C', CounterDirection::Ascending),
44 }
45 }
46
47 pub fn get_or_create_row_numbers<'a, I>(
51 &self,
52 txn: &mut FlowTransaction,
53 keys: I,
54 ) -> Result<Vec<(RowNumber, bool)>>
55 where
56 I: IntoIterator<Item = &'a EncodedKey>,
57 {
58 let mut results = Vec::new();
59
60 for key in keys {
61 let map_key = self.make_map_key(key);
62
63 if let Some(existing_row) = internal_state_get(self.node, txn, &map_key)? {
64 let bytes = existing_row.as_slice();
65 if bytes.len() >= 8 {
66 let row_num = u64::from_be_bytes([
67 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
68 bytes[7],
69 ]);
70 results.push((RowNumber(row_num), false));
71 continue;
72 }
73 }
74
75 let new_row_number = self.counter.next(txn)?;
76
77 let row_num_bytes = new_row_number.0.to_be_bytes().to_vec();
79 internal_state_set(self.node, txn, &map_key, EncodedRow(CowVec::new(row_num_bytes)))?;
80
81 let reverse_key = self.make_reverse_map_key(new_row_number);
83 internal_state_set(
84 self.node,
85 txn,
86 &reverse_key,
87 EncodedRow(CowVec::new(key.as_ref().to_vec())),
88 )?;
89
90 results.push((new_row_number, true));
91 }
92
93 Ok(results)
94 }
95
96 pub fn get_or_create_row_number(
100 &self,
101 txn: &mut FlowTransaction,
102 key: &EncodedKey,
103 ) -> Result<(RowNumber, bool)> {
104 Ok(self.get_or_create_row_numbers(txn, once(key))?.into_iter().next().unwrap())
105 }
106
107 pub fn get_key_for_row_number(
109 &self,
110 txn: &mut FlowTransaction,
111 row_number: RowNumber,
112 ) -> Result<Option<EncodedKey>> {
113 let reverse_key = self.make_reverse_map_key(row_number);
114 if let Some(key_bytes) = internal_state_get(self.node, txn, &reverse_key)? {
115 Ok(Some(EncodedKey::new(key_bytes.to_vec())))
116 } else {
117 Ok(None)
118 }
119 }
120
121 fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
123 let mut serializer = KeySerializer::new();
124 serializer.extend_u8(b'M'); serializer.extend_bytes(key.as_ref());
126 EncodedKey::new(serializer.finish())
127 }
128
129 fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
131 let mut serializer = KeySerializer::new();
132 serializer.extend_u8(b'R'); serializer.extend_u64(row_number.0);
134 EncodedKey::new(serializer.finish())
135 }
136
137 pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> Result<()> {
140 let mut prefix = Vec::new();
142 let mut serializer = KeySerializer::new();
143 serializer.extend_u8(b'M'); prefix.extend_from_slice(&serializer.finish());
145 prefix.extend_from_slice(key_prefix);
146
147 let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix.clone());
148 let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
149
150 let keys_to_remove = {
151 let mut stream = txn.range(full_range, 1024);
152 let mut keys = Vec::new();
153 while let Some(result) = stream.next() {
154 let multi = result?;
155 keys.push(multi.key);
156 }
157 keys
158 };
159
160 for key in keys_to_remove {
161 txn.remove(&key)?;
162 }
163
164 Ok(())
165 }
166}
167
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::common::CommitVersion;
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::operator::stateful::test_utils::test::*;

	/// Runs `body` against a fresh deferred flow transaction at commit version 1.
	fn with_flow_txn(body: impl FnOnce(&mut FlowTransaction)) {
		let mut parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		body(&mut txn);
	}

	/// The very first key a provider sees gets row number 1 and is reported as new.
	#[test]
	fn test_first_row_number() {
		with_flow_txn(|txn| {
			let provider = RowNumberProvider::new(FlowNodeId(1));
			let (row_number, created) = provider.get_or_create_row_number(txn, &test_key("first")).unwrap();
			assert_eq!((row_number.0, created), (1, true));
		});
	}

	/// Asking twice for the same key returns the same number; only the first call creates it.
	#[test]
	fn test_duplicate_key_same_row_number() {
		with_flow_txn(|txn| {
			let provider = RowNumberProvider::new(FlowNodeId(1));
			let key = test_key("duplicate");

			let (first, first_created) = provider.get_or_create_row_number(txn, &key).unwrap();
			assert_eq!((first.0, first_created), (1, true));

			let (second, second_created) = provider.get_or_create_row_number(txn, &key).unwrap();
			assert_eq!((second.0, second_created), (1, false));

			assert_eq!(first, second);
		});
	}

	/// Distinct keys receive consecutive numbers starting from 1.
	#[test]
	fn test_sequential_row_numbers() {
		with_flow_txn(|txn| {
			let provider = RowNumberProvider::new(FlowNodeId(1));
			for expected in 1..=5u64 {
				let key = test_key(&format!("key_{}", expected));
				let (row_number, created) = provider.get_or_create_row_number(txn, &key).unwrap();
				assert_eq!((row_number.0, created), (expected, true));
			}
		});
	}

	/// Revisiting known keys keeps their numbers while a new key continues the sequence.
	#[test]
	fn test_mixed_new_and_existing() {
		with_flow_txn(|txn| {
			let provider = RowNumberProvider::new(FlowNodeId(1));
			let keys: Vec<_> = (1..=4).map(|i| test_key(&format!("mixed_{}", i))).collect();

			for (offset, key) in keys[..3].iter().enumerate() {
				let (row_number, created) = provider.get_or_create_row_number(txn, key).unwrap();
				assert_eq!((row_number.0, created), (offset as u64 + 1, true));
			}

			// Revisit mixed_2, introduce mixed_4, revisit mixed_1.
			let revisits = [(&keys[1], 2, false), (&keys[3], 4, true), (&keys[0], 1, false)];
			for (key, expected, expected_created) in revisits {
				let (row_number, created) = provider.get_or_create_row_number(txn, key).unwrap();
				assert_eq!((row_number.0, created), (expected, expected_created));
			}
		});
	}

	/// Providers for different nodes keep independent counters and mappings.
	#[test]
	fn test_multiple_providers_isolated() {
		with_flow_txn(|txn| {
			let left = RowNumberProvider::new(FlowNodeId(1));
			let right = RowNumberProvider::new(FlowNodeId(2));

			let shared = test_key("shared_key");
			let left_shared = left.get_or_create_row_number(txn, &shared).unwrap().0;
			let right_shared = right.get_or_create_row_number(txn, &shared).unwrap().0;
			assert_eq!((left_shared.0, right_shared.0), (1, 1));

			let other = test_key("key2");
			assert_eq!(left.get_or_create_row_number(txn, &other).unwrap().0.0, 2);
			assert_eq!(right.get_or_create_row_number(txn, &other).unwrap().0.0, 2);
		});
	}

	/// The counter keeps advancing across calls within the same transaction.
	#[test]
	fn test_counter_persistence() {
		with_flow_txn(|txn| {
			let provider = RowNumberProvider::new(FlowNodeId(1));
			for expected in 1..=3u64 {
				let key = test_key(&format!("persist_{}", expected));
				let (row_number, _) = provider.get_or_create_row_number(txn, &key).unwrap();
				assert_eq!(row_number.0, expected);
			}

			let (row_number, created) = provider.get_or_create_row_number(txn, &test_key("persist_new")).unwrap();
			assert_eq!((row_number.0, created), (4, true));
		});
	}

	/// A thousand inserts stay sequential, and lookups/new keys behave afterwards.
	#[test]
	fn test_large_row_numbers() {
		with_flow_txn(|txn| {
			let provider = RowNumberProvider::new(FlowNodeId(1));
			for expected in 1..=1000u64 {
				let key = test_key(&format!("large_{}", expected));
				let (row_number, created) = provider.get_or_create_row_number(txn, &key).unwrap();
				assert_eq!((row_number.0, created), (expected, true));
			}

			let (row_number, created) = provider.get_or_create_row_number(txn, &test_key("large_1")).unwrap();
			assert_eq!((row_number.0, created), (1, false));

			let (row_number, created) = provider.get_or_create_row_number(txn, &test_key("large_1001")).unwrap();
			assert_eq!((row_number.0, created), (1001, true));
		});
	}

	/// A batch mixing known and unknown keys preserves input order and fills both maps.
	#[test]
	fn test_mixed_existing_and_new_keys() {
		with_flow_txn(|txn| {
			let provider = RowNumberProvider::new(FlowNodeId(1));
			let [key1, key2, key3, key4, key5] =
				["key_1", "key_2", "key_3", "key_4", "key_5"].map(|name| test_key(name));

			for (key, expected) in [(&key1, 1), (&key2, 2), (&key3, 3)] {
				let (row_number, _) = provider.get_or_create_row_number(txn, key).unwrap();
				assert_eq!(row_number.0, expected);
			}

			let batch = [&key2, &key4, &key1, &key5, &key3];
			let results = provider.get_or_create_row_numbers(txn, batch).unwrap();
			let observed: Vec<(u64, bool)> = results.iter().map(|(row_number, created)| (row_number.0, *created)).collect();
			assert_eq!(observed, vec![(2, false), (4, true), (1, false), (5, true), (3, false)]);

			let (row6, created6) = provider.get_or_create_row_number(txn, &test_key("key_6")).unwrap();
			assert_eq!((row6.0, created6), (6, true));

			for (key, expected) in [(&key4, 4), (&key5, 5)] {
				let (row_number, created) = provider.get_or_create_row_number(txn, key).unwrap();
				assert_eq!((row_number.0, created), (expected, false));
			}

			for (row, key) in [(4, key4), (5, key5), (1, key1), (2, key2)] {
				assert_eq!(provider.get_key_for_row_number(txn, RowNumber(row)).unwrap(), Some(key));
			}
		});
	}
}