fjall/tx/read_tx.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{snapshot_nonce::SnapshotNonce, TxPartitionHandle};
6use lsm_tree::{AbstractTree, KvPair, UserKey, UserValue};
7use std::ops::RangeBounds;
8
9/// A cross-partition, read-only transaction (snapshot)
10pub struct ReadTransaction {
11 nonce: SnapshotNonce,
12}
13
14impl ReadTransaction {
15 pub(crate) fn new(nonce: SnapshotNonce) -> Self {
16 Self { nonce }
17 }
18
19 /// Retrieves an item from the transaction's state.
20 ///
21 /// # Examples
22 ///
23 /// ```
24 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
25 /// #
26 /// # let folder = tempfile::tempdir()?;
27 /// # let keyspace = Config::new(folder).open_transactional()?;
28 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
29 /// partition.insert("a", "my_value")?;
30 ///
31 /// let tx = keyspace.read_tx();
32 /// let item = tx.get(&partition, "a")?;
33 /// assert_eq!(Some("my_value".as_bytes().into()), item);
34 ///
35 /// partition.insert("b", "my_updated_value")?;
36 ///
37 /// // Repeatable read
38 /// let item = tx.get(&partition, "a")?;
39 /// assert_eq!(Some("my_value".as_bytes().into()), item);
40 /// #
41 /// # Ok::<(), fjall::Error>(())
42 /// ```
43 ///
44 /// # Errors
45 ///
46 /// Will return `Err` if an IO error occurs.
47 pub fn get<K: AsRef<[u8]>>(
48 &self,
49 partition: &TxPartitionHandle,
50 key: K,
51 ) -> crate::Result<Option<UserValue>> {
52 partition
53 .inner
54 .tree
55 .snapshot_at(self.nonce.instant)
56 .get(key)
57 .map_err(Into::into)
58 }
59
60 /// Retrieves the size of an item from the transaction's state.
61 ///
62 /// # Examples
63 ///
64 /// ```
65 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
66 /// #
67 /// # let folder = tempfile::tempdir()?;
68 /// # let keyspace = Config::new(folder).open_transactional()?;
69 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
70 /// partition.insert("a", "my_value")?;
71 ///
72 /// let tx = keyspace.read_tx();
73 /// let item = tx.size_of(&partition, "a")?.unwrap_or_default();
74 /// assert_eq!("my_value".len() as u32, item);
75 ///
76 /// partition.insert("b", "my_updated_value")?;
77 ///
78 /// // Repeatable read
79 /// let item = tx.size_of(&partition, "a")?.unwrap_or_default();
80 /// assert_eq!("my_value".len() as u32, item);
81 /// #
82 /// # Ok::<(), fjall::Error>(())
83 /// ```
84 ///
85 /// # Errors
86 ///
87 /// Will return `Err` if an IO error occurs.
88 pub fn size_of<K: AsRef<[u8]>>(
89 &self,
90 partition: &TxPartitionHandle,
91 key: K,
92 ) -> crate::Result<Option<u32>> {
93 partition
94 .inner
95 .tree
96 .snapshot_at(self.nonce.instant)
97 .size_of(key)
98 .map_err(Into::into)
99 }
100
101 /// Returns `true` if the transaction's state contains the specified key.
102 ///
103 /// # Examples
104 ///
105 /// ```
106 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
107 /// #
108 /// # let folder = tempfile::tempdir()?;
109 /// # let keyspace = Config::new(folder).open_transactional()?;
110 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
111 /// partition.insert("a", "my_value")?;
112 ///
113 /// let tx = keyspace.read_tx();
114 /// assert!(tx.contains_key(&partition, "a")?);
115 /// #
116 /// # Ok::<(), fjall::Error>(())
117 /// ```
118 ///
119 /// # Errors
120 ///
121 /// Will return `Err` if an IO error occurs.
122 pub fn contains_key<K: AsRef<[u8]>>(
123 &self,
124 partition: &TxPartitionHandle,
125 key: K,
126 ) -> crate::Result<bool> {
127 partition
128 .inner
129 .tree
130 .snapshot_at(self.nonce.instant)
131 .contains_key(key)
132 .map_err(Into::into)
133 }
134
135 /// Returns the first key-value pair in the transaction's state.
136 /// The key in this pair is the minimum key in the transaction's state.
137 ///
138 /// # Examples
139 ///
140 /// ```
141 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
142 /// #
143 /// # let folder = tempfile::tempdir()?;
144 /// # let keyspace = Config::new(folder).open_transactional()?;
145 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
146 /// partition.insert("1", "abc")?;
147 /// partition.insert("3", "abc")?;
148 /// partition.insert("5", "abc")?;
149 ///
150 /// let (key, _) = keyspace.read_tx().first_key_value(&partition)?.expect("item should exist");
151 /// assert_eq!(&*key, "1".as_bytes());
152 /// #
153 /// # Ok::<(), fjall::Error>(())
154 /// ```
155 ///
156 /// # Errors
157 ///
158 /// Will return `Err` if an IO error occurs.
159 pub fn first_key_value(&self, partition: &TxPartitionHandle) -> crate::Result<Option<KvPair>> {
160 self.iter(partition).next().transpose()
161 }
162
163 /// Returns the last key-value pair in the transaction's state.
164 /// The key in this pair is the maximum key in the transaction's state.
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
170 /// #
171 /// # let folder = tempfile::tempdir()?;
172 /// # let keyspace = Config::new(folder).open_transactional()?;
173 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
174 /// partition.insert("1", "abc")?;
175 /// partition.insert("3", "abc")?;
176 /// partition.insert("5", "abc")?;
177 ///
178 /// let (key, _) = keyspace.read_tx().last_key_value(&partition)?.expect("item should exist");
179 /// assert_eq!(&*key, "5".as_bytes());
180 /// #
181 /// # Ok::<(), fjall::Error>(())
182 /// ```
183 ///
184 /// # Errors
185 ///
186 /// Will return `Err` if an IO error occurs.
187 pub fn last_key_value(&self, partition: &TxPartitionHandle) -> crate::Result<Option<KvPair>> {
188 self.iter(partition).next_back().transpose()
189 }
190
191 /// Scans the entire partition, returning the amount of items.
192 ///
193 /// ###### Caution
194 ///
195 /// This operation scans the entire partition: O(n) complexity!
196 ///
197 /// Never, under any circumstances, use .`len()` == 0 to check
198 /// if the partition is empty, use [`ReadTransaction::is_empty`] instead.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
204 /// #
205 /// # let folder = tempfile::tempdir()?;
206 /// # let keyspace = Config::new(folder).open_transactional()?;
207 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
208 /// partition.insert("a", "my_value")?;
209 /// partition.insert("b", "my_value2")?;
210 ///
211 /// let tx = keyspace.read_tx();
212 /// assert_eq!(2, tx.len(&partition)?);
213 ///
214 /// partition.insert("c", "my_value3")?;
215 ///
216 /// // Repeatable read
217 /// assert_eq!(2, tx.len(&partition)?);
218 ///
219 /// // Start new snapshot
220 /// let tx = keyspace.read_tx();
221 /// assert_eq!(3, tx.len(&partition)?);
222 /// #
223 /// # Ok::<(), fjall::Error>(())
224 /// ```
225 ///
226 /// # Errors
227 ///
228 /// Will return `Err` if an IO error occurs.
229 pub fn len(&self, partition: &TxPartitionHandle) -> crate::Result<usize> {
230 let mut count = 0;
231
232 for kv in self.iter(partition) {
233 let _ = kv?;
234 count += 1;
235 }
236
237 Ok(count)
238 }
239
240 /// Returns `true` if the partition is empty.
241 ///
242 /// This operation has O(log N) complexity.
243 ///
244 /// # Examples
245 ///
246 /// ```
247 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
248 /// #
249 /// # let folder = tempfile::tempdir()?;
250 /// # let keyspace = Config::new(folder).open_transactional()?;
251 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
252 /// assert!(keyspace.read_tx().is_empty(&partition)?);
253 ///
254 /// partition.insert("a", "abc")?;
255 /// assert!(!keyspace.read_tx().is_empty(&partition)?);
256 /// #
257 /// # Ok::<(), fjall::Error>(())
258 /// ```
259 ///
260 /// # Errors
261 ///
262 /// Will return `Err` if an IO error occurs.
263 pub fn is_empty(&self, partition: &TxPartitionHandle) -> crate::Result<bool> {
264 self.first_key_value(partition).map(|x| x.is_none())
265 }
266
267 /// Iterates over the transaction's state.
268 ///
269 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
275 /// #
276 /// # let folder = tempfile::tempdir()?;
277 /// # let keyspace = Config::new(folder).open_transactional()?;
278 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
279 /// partition.insert("a", "abc")?;
280 /// partition.insert("f", "abc")?;
281 /// partition.insert("g", "abc")?;
282 ///
283 /// assert_eq!(3, keyspace.read_tx().iter(&partition).count());
284 /// #
285 /// # Ok::<(), fjall::Error>(())
286 /// ```
287 #[must_use]
288 pub fn iter<'a>(
289 &'a self,
290 partition: &'a TxPartitionHandle,
291 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
292 let iter = partition
293 .inner
294 .tree
295 .iter(Some(self.nonce.instant), None)
296 .map(|item| Ok(item?));
297
298 crate::iter::Iter::new(self.nonce.clone(), iter)
299 }
300
301 /// Iterates over the transaction's state, returning keys only.
302 ///
303 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
304 #[must_use]
305 pub fn keys<'a>(
306 &'a self,
307 partition: &'a TxPartitionHandle,
308 ) -> impl DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static {
309 let iter = partition
310 .inner
311 .tree
312 .keys(Some(self.nonce.instant), None)
313 .map(|item| Ok(item?));
314
315 crate::iter::Iter::new(self.nonce.clone(), iter)
316 }
317
318 /// Iterates over the transaction's state, returning values only.
319 ///
320 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
321 #[must_use]
322 pub fn values<'a>(
323 &'a self,
324 partition: &'a TxPartitionHandle,
325 ) -> impl DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static {
326 let iter = partition
327 .inner
328 .tree
329 .values(Some(self.nonce.instant), None)
330 .map(|item| Ok(item?));
331
332 crate::iter::Iter::new(self.nonce.clone(), iter)
333 }
334
335 /// Iterates over a range of the transaction's state.
336 ///
337 /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
343 /// #
344 /// # let folder = tempfile::tempdir()?;
345 /// # let keyspace = Config::new(folder).open_transactional()?;
346 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
347 /// partition.insert("a", "abc")?;
348 /// partition.insert("f", "abc")?;
349 /// partition.insert("g", "abc")?;
350 ///
351 /// assert_eq!(2, keyspace.read_tx().range(&partition, "a"..="f").count());
352 /// #
353 /// # Ok::<(), fjall::Error>(())
354 /// ```
355 #[must_use]
356 pub fn range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
357 &'a self,
358 partition: &'a TxPartitionHandle,
359 range: R,
360 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
361 // TODO: 3.0.0: if we bind the iterator lifetime to ReadTx, we can remove the snapshot nonce from Iter
362 // TODO: for all other ReadTx::iterators too
363
364 let iter = partition
365 .inner
366 .tree
367 .range(range, Some(self.nonce.instant), None)
368 .map(|item| Ok(item?));
369
370 crate::iter::Iter::new(self.nonce.clone(), iter)
371 }
372
373 /// Iterates over a prefixed set of the transaction's state.
374 ///
375 /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
381 /// #
382 /// # let folder = tempfile::tempdir()?;
383 /// # let keyspace = Config::new(folder).open_transactional()?;
384 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
385 /// partition.insert("a", "abc")?;
386 /// partition.insert("ab", "abc")?;
387 /// partition.insert("abc", "abc")?;
388 ///
389 /// assert_eq!(2, keyspace.read_tx().prefix(&partition, "ab").count());
390 /// #
391 /// # Ok::<(), fjall::Error>(())
392 /// ```
393 #[must_use]
394 pub fn prefix<'a, K: AsRef<[u8]> + 'a>(
395 &'a self,
396 partition: &'a TxPartitionHandle,
397 prefix: K,
398 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
399 let iter = partition
400 .inner
401 .tree
402 .prefix(prefix, Some(self.nonce.instant), None)
403 .map(|item| Ok(item?));
404
405 crate::iter::Iter::new(self.nonce.clone(), iter)
406 }
407}