reifydb_sub_flow/operator/stateful/
row_number.rs1use reifydb_core::{
4 EncodedKey,
5 interface::FlowNodeId,
6 key::{EncodableKey, FlowNodeStateKey},
7 util::{CowVec, encoding::keycode::KeySerializer},
8 value::encoded::{EncodedKeyRange, EncodedValues},
9};
10use reifydb_type::RowNumber;
11
12use crate::{operator::stateful::RawStatefulOperator, transaction::FlowTransaction};
13
14pub struct RowNumberProvider {
24 node: FlowNodeId,
25}
26
27impl RowNumberProvider {
28 pub fn new(node: FlowNodeId) -> Self {
30 Self {
31 node,
32 }
33 }
34
35 pub fn get_or_create_row_number<O: RawStatefulOperator>(
39 &self,
40 txn: &mut FlowTransaction,
41 operator: &O,
42 key: &EncodedKey,
43 ) -> crate::Result<(RowNumber, bool)> {
44 let map_key = self.make_map_key(key);
46 let encoded_map_key = EncodedKey::new(map_key.clone());
47
48 if let Some(existing_row) = operator.state_get(txn, &encoded_map_key)? {
49 let bytes = existing_row.as_ref();
51 if bytes.len() >= 8 {
52 let row_num = u64::from_be_bytes([
53 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
54 ]);
55 return Ok((RowNumber(row_num), false));
56 }
57 }
58
59 let counter = self.load_counter::<O>(txn, operator)?;
61 let new_row_number = RowNumber(counter);
62
63 self.save_counter::<O>(txn, operator, counter + 1)?;
65
66 let row_num_bytes = counter.to_be_bytes().to_vec();
68 operator.state_set(txn, &encoded_map_key, EncodedValues(CowVec::new(row_num_bytes)))?;
69
70 Ok((new_row_number, true))
71 }
72
73 fn load_counter<O: RawStatefulOperator>(&self, txn: &mut FlowTransaction, operator: &O) -> crate::Result<u64> {
75 let key = self.make_counter_key();
76 let encoded_key = EncodedKey::new(key);
77 match operator.state_get(txn, &encoded_key)? {
78 None => Ok(1), Some(state_row) => {
80 let bytes = state_row.as_ref();
82 if bytes.len() >= 8 {
83 Ok(u64::from_be_bytes([
84 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
85 bytes[7],
86 ]))
87 } else {
88 Ok(1)
89 }
90 }
91 }
92 }
93
94 fn save_counter<O: RawStatefulOperator>(
96 &self,
97 txn: &mut FlowTransaction,
98 operator: &O,
99 counter: u64,
100 ) -> crate::Result<()> {
101 let key = self.make_counter_key();
102 let encoded_key = EncodedKey::new(key);
103 let value = EncodedValues(CowVec::new(counter.to_be_bytes().to_vec()));
104 operator.state_set(txn, &encoded_key, value)?;
105 Ok(())
106 }
107
108 fn make_counter_key(&self) -> Vec<u8> {
110 let mut serializer = KeySerializer::new();
111 serializer.extend_u64(self.node.0);
112 serializer.extend_u8(b'C'); serializer.finish()
114 }
115
116 fn make_map_key(&self, key: &EncodedKey) -> Vec<u8> {
118 let mut serializer = KeySerializer::new();
119 serializer.extend_u64(self.node.0);
120 serializer.extend_u8(b'M'); serializer.extend_bytes(key.as_ref());
122 serializer.finish()
123 }
124
125 pub fn remove_by_prefix<O: RawStatefulOperator>(
128 &self,
129 txn: &mut FlowTransaction,
130 operator: &O,
131 key_prefix: &[u8],
132 ) -> crate::Result<()> {
133 let mut prefix = Vec::new();
135 let mut serializer = KeySerializer::new();
136 serializer.extend_u64(self.node.0);
137 serializer.extend_u8(b'M'); prefix.extend_from_slice(&serializer.finish());
139 prefix.extend_from_slice(key_prefix);
140
141 let state_prefix = FlowNodeStateKey::new(operator.id(), prefix.clone());
143 let full_range = EncodedKeyRange::prefix(&state_prefix.encode());
144
145 let keys_to_remove: Vec<_> = txn.range(full_range)?.map(|multi| multi.key).collect();
147
148 for key in keys_to_remove {
149 txn.remove(&key)?;
150 }
151
152 Ok(())
153 }
154}
155
#[cfg(test)]
mod tests {
    use reifydb_core::CommitVersion;

    use super::*;
    use crate::operator::stateful::test_utils::test::*;

    /// Looks up (or assigns) the row number for the test key called `name`
    /// and returns the provider's result, panicking on error.
    fn assign<O: RawStatefulOperator>(
        provider: &RowNumberProvider,
        txn: &mut FlowTransaction,
        operator: &O,
        name: &str,
    ) -> (RowNumber, bool) {
        let key = test_key(name);
        provider.get_or_create_row_number(txn, operator, &key).unwrap()
    }

    /// The very first key seen by a provider receives row number 1.
    #[test]
    fn test_first_row_number() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1));
        let operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let (assigned, fresh) = assign(&provider, &mut txn, &operator, "first");

        assert_eq!((assigned.0, fresh), (1, true));
    }

    /// Asking twice for the same key yields the same row number, flagged as
    /// new only the first time.
    #[test]
    fn test_duplicate_key_same_row_number() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1));
        let operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        let (first, first_fresh) = assign(&provider, &mut txn, &operator, "duplicate");
        assert_eq!((first.0, first_fresh), (1, true));

        let (second, second_fresh) = assign(&provider, &mut txn, &operator, "duplicate");
        assert_eq!((second.0, second_fresh), (1, false));

        assert_eq!(first, second);
    }

    /// Distinct keys receive consecutive row numbers starting at 1.
    #[test]
    fn test_sequential_row_numbers() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1));
        let operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=5u64 {
            let name = format!("key_{}", expected);
            let (assigned, fresh) = assign(&provider, &mut txn, &operator, &name);

            assert_eq!((assigned.0, fresh), (expected, true));
        }
    }

    /// Interleaving known and unknown keys keeps old assignments stable while
    /// new keys continue the sequence.
    #[test]
    fn test_mixed_new_and_existing() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1));
        let operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        // Three unseen keys: numbered 1, 2, 3 in request order.
        let (rn1, new1) = assign(&provider, &mut txn, &operator, "mixed_1");
        let (rn2, new2) = assign(&provider, &mut txn, &operator, "mixed_2");
        let (rn3, new3) = assign(&provider, &mut txn, &operator, "mixed_3");

        assert_eq!((rn1.0, new1), (1, true));
        assert_eq!((rn2.0, new2), (2, true));
        assert_eq!((rn3.0, new3), (3, true));

        // Known key, unseen key, known key.
        let (rn2_again, new2_again) = assign(&provider, &mut txn, &operator, "mixed_2");
        let (rn4, new4) = assign(&provider, &mut txn, &operator, "mixed_4");
        let (rn1_again, new1_again) = assign(&provider, &mut txn, &operator, "mixed_1");

        assert_eq!((rn2_again.0, new2_again), (2, false));
        assert_eq!((rn4.0, new4), (4, true));
        assert_eq!((rn1_again.0, new1_again), (1, false));
    }

    /// Providers bound to different nodes number their keys independently.
    #[test]
    fn test_multiple_providers_isolated() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1));
        let operator1 = TestOperator::simple(FlowNodeId(1));
        let operator2 = TestOperator::simple(FlowNodeId(2));
        let provider1 = RowNumberProvider::new(FlowNodeId(1));
        let provider2 = RowNumberProvider::new(FlowNodeId(2));

        let (rn1, _) = assign(&provider1, &mut txn, &operator1, "shared_key");
        let (rn2, _) = assign(&provider2, &mut txn, &operator2, "shared_key");

        assert_eq!(rn1.0, 1);
        assert_eq!(rn2.0, 1);

        let (rn1_2, _) = assign(&provider1, &mut txn, &operator1, "key2");
        assert_eq!(rn1_2.0, 2);

        let (rn2_2, _) = assign(&provider2, &mut txn, &operator2, "key2");
        assert_eq!(rn2_2.0, 2);
    }

    /// The counter carries on from where earlier assignments left off.
    #[test]
    fn test_counter_persistence() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1));
        let operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=3u64 {
            let name = format!("persist_{}", expected);
            let (assigned, _) = assign(&provider, &mut txn, &operator, &name);
            assert_eq!(assigned.0, expected);
        }

        let (assigned, fresh) = assign(&provider, &mut txn, &operator, "persist_new");

        assert_eq!((assigned.0, fresh), (4, true));
    }

    /// A thousand keys are numbered 1..=1000; afterwards an old key keeps its
    /// number and a new key receives 1001.
    #[test]
    fn test_large_row_numbers() {
        let mut base = create_test_transaction();
        let mut txn = FlowTransaction::new(&mut base, CommitVersion(1));
        let operator = TestOperator::simple(FlowNodeId(1));
        let provider = RowNumberProvider::new(FlowNodeId(1));

        for expected in 1..=1000u64 {
            let name = format!("large_{}", expected);
            let (assigned, fresh) = assign(&provider, &mut txn, &operator, &name);
            assert_eq!((assigned.0, fresh), (expected, true));
        }

        let (assigned, fresh) = assign(&provider, &mut txn, &operator, "large_1");
        assert_eq!((assigned.0, fresh), (1, false));

        let (assigned, fresh) = assign(&provider, &mut txn, &operator, "large_1001");
        assert_eq!((assigned.0, fresh), (1001, true));
    }
}