reifydb_sub_flow/transaction/
read.rs1use reifydb_core::{
5 EncodedKey, EncodedKeyRange,
6 interface::{BoxedMultiVersionIter, MultiVersionQueryTransaction},
7 value::encoded::EncodedValues,
8};
9
10use super::{FlowTransaction, iter_range::FlowRangeIter};
11
12impl FlowTransaction {
13 pub fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
15 self.metrics.increment_reads();
16
17 if self.pending.is_removed(key) {
19 return Ok(None);
20 }
21
22 if let Some(value) = self.pending.get(key) {
24 return Ok(Some(value.clone()));
25 }
26
27 match self.query.get(key)? {
28 Some(multi) => Ok(Some(multi.values)),
29 None => Ok(None),
30 }
31 }
32
33 pub fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
35 self.metrics.increment_reads();
36
37 if self.pending.is_removed(key) {
39 return Ok(false);
40 }
41
42 if self.pending.get(key).is_some() {
44 return Ok(true);
45 }
46
47 self.query.contains_key(key)
48 }
49
50 pub fn scan(&mut self) -> crate::Result<BoxedMultiVersionIter> {
52 self.range(EncodedKeyRange::all())
53 }
54
55 pub fn range(&mut self, range: EncodedKeyRange) -> crate::Result<BoxedMultiVersionIter> {
57 self.metrics.increment_reads();
58
59 let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
61 let committed = self.query.range(range)?;
62
63 Ok(Box::new(FlowRangeIter::new(pending, committed, self.version)))
64 }
65
66 pub fn range_batched(
68 &mut self,
69 range: EncodedKeyRange,
70 batch_size: u64,
71 ) -> crate::Result<BoxedMultiVersionIter> {
72 self.metrics.increment_reads();
73
74 let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
76 let committed = self.query.range_batched(range, batch_size)?;
77
78 Ok(Box::new(FlowRangeIter::new(pending, committed, self.version)))
79 }
80
81 pub fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
83 self.metrics.increment_reads();
84
85 let range = EncodedKeyRange::prefix(prefix);
87 let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
88 let committed = self.query.prefix(prefix)?;
89
90 Ok(Box::new(FlowRangeIter::new(pending, committed, self.version)))
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use std::collections::Bound;
97
98 use reifydb_core::{
99 CommitVersion, CowVec, EncodedKey, EncodedKeyRange,
100 interface::{Engine, MultiVersionCommandTransaction, MultiVersionQueryTransaction},
101 value::encoded::EncodedValues,
102 };
103
104 use super::*;
105 use crate::operator::stateful::test_utils::test::create_test_transaction;
106
107 fn make_key(s: &str) -> EncodedKey {
108 EncodedKey::new(s.as_bytes().to_vec())
109 }
110
111 fn make_value(s: &str) -> EncodedValues {
112 EncodedValues(CowVec::new(s.as_bytes().to_vec()))
113 }
114
/// A value written through the flow transaction can be read back with `get`.
#[test]
fn test_get_from_pending() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    let key = make_key("key1");
    let expected = make_value("value1");
    txn.set(&key, expected.clone()).unwrap();

    let found = txn.get(&key).unwrap();
    assert_eq!(found, Some(expected));
}
129
/// A value committed through the engine is visible to a flow transaction
/// created afterwards.
#[test]
fn test_get_from_committed() {
    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();
    let key = make_key("key1");
    let expected = make_value("value1");

    // Write and commit inside a dedicated scope, before the reader is opened.
    {
        let mut writer = engine.begin_command().unwrap();
        writer.set(&key, expected.clone()).unwrap();
        writer.commit().unwrap();
    }

    let parent = engine.begin_command().unwrap();
    let snapshot = parent.version();
    let mut txn = FlowTransaction::new(&parent, snapshot);

    let found = txn.get(&key).unwrap();
    assert_eq!(found, Some(expected));
}
156
/// A write made on the flow transaction wins over a value previously set on
/// the parent transaction for the same key.
#[test]
fn test_get_pending_shadows_committed() {
    let key = make_key("key1");

    let mut parent = create_test_transaction();
    parent.set(&key, make_value("old")).unwrap();
    let snapshot = parent.version();

    let mut txn = FlowTransaction::new(&parent, snapshot);
    let replacement = make_value("new");
    txn.set(&key, replacement.clone()).unwrap();

    let found = txn.get(&key).unwrap();
    assert_eq!(found, Some(replacement));
}
175
/// Removing a key on the flow transaction makes `get` report it as absent,
/// even though the parent transaction set a value for it.
#[test]
fn test_get_removed_returns_none() {
    let key = make_key("key1");

    let mut parent = create_test_transaction();
    parent.set(&key, make_value("value1")).unwrap();
    let snapshot = parent.version();

    let mut txn = FlowTransaction::new(&parent, snapshot);
    txn.remove(&key).unwrap();

    let found = txn.get(&key).unwrap();
    assert!(found.is_none());
}
193
/// `get` on a key that was never written returns `None`.
#[test]
fn test_get_nonexistent_key() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    let missing = make_key("missing");
    let found = txn.get(&missing).unwrap();
    assert!(found.is_none());
}
202
/// Every `get` call bumps the read counter by exactly one.
#[test]
fn test_get_increments_reads_metric() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    assert_eq!(txn.metrics().reads, 0);

    for (name, expected_reads) in [("key1", 1), ("key2", 2)] {
        txn.get(&make_key(name)).unwrap();
        assert_eq!(txn.metrics().reads, expected_reads);
    }
}
216
/// A key written on the flow transaction is reported as present.
#[test]
fn test_contains_key_pending() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    let key = make_key("key1");
    txn.set(&key, make_value("value1")).unwrap();

    let present = txn.contains_key(&key).unwrap();
    assert!(present);
}
227
/// A key committed through the engine is reported as present by a flow
/// transaction created afterwards.
#[test]
fn test_contains_key_committed() {
    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();
    let key = make_key("key1");

    // Write and commit inside a dedicated scope, before the reader is opened.
    {
        let mut writer = engine.begin_command().unwrap();
        writer.set(&key, make_value("value1")).unwrap();
        writer.commit().unwrap();
    }

    let parent = engine.begin_command().unwrap();
    let snapshot = parent.version();
    let mut txn = FlowTransaction::new(&parent, snapshot);

    let present = txn.contains_key(&key).unwrap();
    assert!(present);
}
249
/// A key removed on the flow transaction is reported as absent, even though
/// the parent transaction set a value for it.
#[test]
fn test_contains_key_removed_returns_false() {
    let key = make_key("key1");

    let mut parent = create_test_transaction();
    parent.set(&key, make_value("value1")).unwrap();
    let snapshot = parent.version();

    let mut txn = FlowTransaction::new(&parent, snapshot);
    txn.remove(&key).unwrap();

    let present = txn.contains_key(&key).unwrap();
    assert!(!present);
}
263
/// A key that was never written is reported as absent.
#[test]
fn test_contains_key_nonexistent() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    let missing = make_key("missing");
    let present = txn.contains_key(&missing).unwrap();
    assert!(!present);
}
271
/// Every `contains_key` call bumps the read counter by exactly one.
#[test]
fn test_contains_key_increments_reads_metric() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    assert_eq!(txn.metrics().reads, 0);

    for (name, expected_reads) in [("key1", 1), ("key2", 2)] {
        txn.contains_key(&make_key(name)).unwrap();
        assert_eq!(txn.metrics().reads, expected_reads);
    }
}
285
/// `scan` on a transaction with nothing written yields no items.
#[test]
fn test_scan_empty() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    // Call `scan()` itself (not `range(all)`) so the method this test is
    // named after is actually exercised.
    let mut iter = txn.scan().unwrap();
    assert!(iter.next().is_none());
}
294
/// `scan` returns pending writes in key order, regardless of the order in
/// which they were written.
#[test]
fn test_scan_only_pending() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    // Deliberately written out of order to check that the scan sorts by key.
    txn.set(&make_key("b"), make_value("2")).unwrap();
    txn.set(&make_key("a"), make_value("1")).unwrap();
    txn.set(&make_key("c"), make_value("3")).unwrap();

    // Call `scan()` itself (not `range(all)`) so the method this test is
    // named after is actually exercised.
    let items: Vec<_> = txn.scan().unwrap().collect();

    assert_eq!(items.len(), 3);
    assert_eq!(items[0].key, make_key("a"));
    assert_eq!(items[1].key, make_key("b"));
    assert_eq!(items[2].key, make_key("c"));
}
313
/// Keys removed on the flow transaction do not show up in `scan` results.
#[test]
fn test_scan_filters_removes() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    txn.set(&make_key("a"), make_value("1")).unwrap();
    txn.remove(&make_key("b")).unwrap();
    txn.set(&make_key("c"), make_value("3")).unwrap();

    // Call `scan()` itself (not `range(all)`) so the method this test is
    // named after is actually exercised.
    let items: Vec<_> = txn.scan().unwrap().collect();

    // Only the two written keys remain; the removed key "b" is skipped.
    assert_eq!(items.len(), 2);
    assert_eq!(items[0].key, make_key("a"));
    assert_eq!(items[1].key, make_key("c"));
}
331
/// A single `scan` call is counted as exactly one read.
#[test]
fn test_scan_increments_reads_metric() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    assert_eq!(txn.metrics().reads, 0);

    // Call `scan()` itself: it delegates to `range`, so this also guards
    // against the delegation counting the read twice.
    let _ = txn.scan().unwrap();

    assert_eq!(txn.metrics().reads, 1);
}
341
/// `range` on a transaction with nothing written yields no items.
#[test]
fn test_range_empty() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    let lower = Some(make_key("a"));
    let upper = Some(make_key("z"));
    let bounds = EncodedKeyRange::start_end(lower, upper);

    let mut iter = txn.range(bounds).unwrap();
    let first = iter.next();
    assert!(first.is_none());
}
351
/// `range` with an inclusive start and exclusive end returns only the pending
/// keys inside that window, in key order.
#[test]
fn test_range_only_pending() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    for (name, payload) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
        txn.set(&make_key(name), make_value(payload)).unwrap();
    }

    let lower = Bound::Included(make_key("b"));
    let upper = Bound::Excluded(make_key("d"));
    let window = EncodedKeyRange::new(lower, upper);
    let items: Vec<_> = txn.range(window).unwrap().collect();

    let expected = [make_key("b"), make_key("c")];
    assert_eq!(items.len(), expected.len());
    for (item, want) in items.iter().zip(expected.iter()) {
        assert_eq!(&item.key, want);
    }
}
371
/// A single `range` call is counted as exactly one read.
#[test]
fn test_range_increments_reads_metric() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    assert_eq!(txn.metrics().reads, 0);

    let lower = Some(make_key("a"));
    let upper = Some(make_key("z"));
    let bounds = EncodedKeyRange::start_end(lower, upper);
    drop(txn.range(bounds).unwrap());

    assert_eq!(txn.metrics().reads, 1);
}
384
/// A single `range_batched` call is counted as exactly one read.
#[test]
fn test_range_batched_increments_reads_metric() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    assert_eq!(txn.metrics().reads, 0);

    let lower = Some(make_key("a"));
    let upper = Some(make_key("z"));
    let bounds = EncodedKeyRange::start_end(lower, upper);
    drop(txn.range_batched(bounds, 10).unwrap());

    assert_eq!(txn.metrics().reads, 1);
}
397
/// `prefix` on a transaction with nothing written yields no items.
#[test]
fn test_prefix_empty() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    let wanted = make_key("test_");
    let mut iter = txn.prefix(&wanted).unwrap();
    let first = iter.next();
    assert!(first.is_none());
}
407
/// `prefix` returns only the pending keys that start with the prefix, in key
/// order, and leaves out keys with a different prefix.
#[test]
fn test_prefix_only_pending() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    for (name, payload) in [("test_a", "1"), ("test_b", "2"), ("other_c", "3")] {
        txn.set(&make_key(name), make_value(payload)).unwrap();
    }

    let wanted = make_key("test_");
    let items: Vec<_> = txn.prefix(&wanted).unwrap().collect();

    let expected = [make_key("test_a"), make_key("test_b")];
    assert_eq!(items.len(), expected.len());
    for (item, want) in items.iter().zip(expected.iter()) {
        assert_eq!(&item.key, want);
    }
}
426
/// A single `prefix` call is counted as exactly one read.
#[test]
fn test_prefix_increments_reads_metric() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    assert_eq!(txn.metrics().reads, 0);

    let wanted = make_key("test_");
    drop(txn.prefix(&wanted).unwrap());

    assert_eq!(txn.metrics().reads, 1);
}
439
/// Reads of different kinds all feed the same counter: one `get`, one
/// `contains_key` and two `range` calls add up to four reads.
#[test]
fn test_multiple_read_operations_accumulate_metrics() {
    let parent = create_test_transaction();
    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

    txn.get(&make_key("k1")).unwrap();
    txn.contains_key(&make_key("k2")).unwrap();
    drop(txn.range(EncodedKeyRange::all()).unwrap());

    let lower = Some(make_key("a"));
    let upper = Some(make_key("z"));
    let bounded = EncodedKeyRange::start_end(lower, upper);
    drop(txn.range(bounded).unwrap());

    let total_reads = txn.metrics().reads;
    assert_eq!(total_reads, 4);
}
453}