Skip to main content

reifydb_transaction/multi/transaction/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4// This file includes and modifies code from the skipdb project (https://github.com/al8n/skipdb),
5// originally licensed under the Apache License, Version 2.0.
6// Original copyright:
7//   Copyright (c) 2024 Al Liu
8//
9// The original Apache License can be found at:
10//   http://www.apache.org/licenses/LICENSE-2.0
11
12use reifydb_core::{
13	common::CommitVersion,
14	encoded::key::{EncodedKey, EncodedKeyRange},
15	interface::store::{MultiVersionBatch, MultiVersionValues},
16};
17use reifydb_type::Result;
18
19use super::{MultiTransaction, manager::TransactionManagerQuery, version::StandardVersionProvider};
20use crate::multi::types::TransactionValue;
21
22pub struct MultiReadTransaction {
23	pub(crate) engine: MultiTransaction,
24	pub(crate) tm: TransactionManagerQuery<StandardVersionProvider>,
25}
26
27impl MultiReadTransaction {
28	pub fn new(engine: MultiTransaction, version: Option<CommitVersion>) -> Result<Self> {
29		let tm = engine.tm.query(version)?;
30		Ok(Self {
31			engine,
32			tm,
33		})
34	}
35}
36
37impl MultiReadTransaction {
38	pub fn version(&self) -> CommitVersion {
39		self.tm.version()
40	}
41
42	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) {
43		self.tm.read_as_of_version_exclusive(version);
44	}
45
46	pub fn read_as_of_version_inclusive(&mut self, version: CommitVersion) {
47		self.read_as_of_version_exclusive(CommitVersion(version.0 + 1))
48	}
49
50	pub fn get(&self, key: &EncodedKey) -> Result<Option<TransactionValue>> {
51		let version = self.tm.version();
52		Ok(self.engine.get(key, version)?.map(Into::into))
53	}
54
55	pub fn contains_key(&self, key: &EncodedKey) -> Result<bool> {
56		let version = self.tm.version();
57		Ok(self.engine.contains_key(key, version)?)
58	}
59
60	pub fn scan(&self) -> Result<MultiVersionBatch> {
61		let items: Vec<_> = self.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>()?;
62		Ok(MultiVersionBatch {
63			items,
64			has_more: false,
65		})
66	}
67
68	pub fn scan_rev(&self) -> Result<MultiVersionBatch> {
69		let items: Vec<_> = self.range_rev(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>()?;
70		Ok(MultiVersionBatch {
71			items,
72			has_more: false,
73		})
74	}
75
76	pub fn prefix(&self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
77		let items: Vec<_> = self.range(EncodedKeyRange::prefix(prefix), 1024).collect::<Result<Vec<_>>>()?;
78		Ok(MultiVersionBatch {
79			items,
80			has_more: false,
81		})
82	}
83
84	pub fn prefix_rev(&self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
85		let items: Vec<_> =
86			self.range_rev(EncodedKeyRange::prefix(prefix), 1024).collect::<Result<Vec<_>>>()?;
87		Ok(MultiVersionBatch {
88			items,
89			has_more: false,
90		})
91	}
92
93	/// Create a streaming iterator for forward range queries.
94	///
95	/// This properly handles high version density by scanning until batch_size
96	/// unique logical keys are collected. The stream yields individual entries
97	/// and maintains cursor state internally.
98	pub fn range(
99		&self,
100		range: EncodedKeyRange,
101		batch_size: usize,
102	) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
103		let version = self.tm.version();
104		Box::new(self.engine.store.range(range, version, batch_size))
105	}
106
107	/// Create a streaming iterator for reverse range queries.
108	///
109	/// This properly handles high version density by scanning until batch_size
110	/// unique logical keys are collected. The stream yields individual entries
111	/// in reverse key order and maintains cursor state internally.
112	pub fn range_rev(
113		&self,
114		range: EncodedKeyRange,
115		batch_size: usize,
116	) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
117		let version = self.tm.version();
118		Box::new(self.engine.store.range_rev(range, version, batch_size))
119	}
120}