fjall/tx/
read_tx.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{snapshot_nonce::SnapshotNonce, TxPartitionHandle};
6use lsm_tree::{AbstractTree, KvPair, UserKey, UserValue};
7use std::ops::RangeBounds;
8
9/// A cross-partition, read-only transaction (snapshot)
10pub struct ReadTransaction {
11    nonce: SnapshotNonce,
12}
13
14impl ReadTransaction {
15    pub(crate) fn new(nonce: SnapshotNonce) -> Self {
16        Self { nonce }
17    }
18
19    /// Retrieves an item from the transaction's state.
20    ///
21    /// # Examples
22    ///
23    /// ```
24    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
25    /// #
26    /// # let folder = tempfile::tempdir()?;
27    /// # let keyspace = Config::new(folder).open_transactional()?;
28    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
29    /// partition.insert("a", "my_value")?;
30    ///
31    /// let tx = keyspace.read_tx();
32    /// let item = tx.get(&partition, "a")?;
33    /// assert_eq!(Some("my_value".as_bytes().into()), item);
34    ///
35    /// partition.insert("b", "my_updated_value")?;
36    ///
37    /// // Repeatable read
38    /// let item = tx.get(&partition, "a")?;
39    /// assert_eq!(Some("my_value".as_bytes().into()), item);
40    /// #
41    /// # Ok::<(), fjall::Error>(())
42    /// ```
43    ///
44    /// # Errors
45    ///
46    /// Will return `Err` if an IO error occurs.
47    pub fn get<K: AsRef<[u8]>>(
48        &self,
49        partition: &TxPartitionHandle,
50        key: K,
51    ) -> crate::Result<Option<UserValue>> {
52        partition
53            .inner
54            .tree
55            .snapshot_at(self.nonce.instant)
56            .get(key)
57            .map_err(Into::into)
58    }
59
60    /// Retrieves the size of an item from the transaction's state.
61    ///
62    /// # Examples
63    ///
64    /// ```
65    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
66    /// #
67    /// # let folder = tempfile::tempdir()?;
68    /// # let keyspace = Config::new(folder).open_transactional()?;
69    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
70    /// partition.insert("a", "my_value")?;
71    ///
72    /// let tx = keyspace.read_tx();
73    /// let item = tx.size_of(&partition, "a")?.unwrap_or_default();
74    /// assert_eq!("my_value".len() as u32, item);
75    ///
76    /// partition.insert("b", "my_updated_value")?;
77    ///
78    /// // Repeatable read
79    /// let item = tx.size_of(&partition, "a")?.unwrap_or_default();
80    /// assert_eq!("my_value".len() as u32, item);
81    /// #
82    /// # Ok::<(), fjall::Error>(())
83    /// ```
84    ///
85    /// # Errors
86    ///
87    /// Will return `Err` if an IO error occurs.
88    pub fn size_of<K: AsRef<[u8]>>(
89        &self,
90        partition: &TxPartitionHandle,
91        key: K,
92    ) -> crate::Result<Option<u32>> {
93        partition
94            .inner
95            .tree
96            .snapshot_at(self.nonce.instant)
97            .size_of(key)
98            .map_err(Into::into)
99    }
100
101    /// Returns `true` if the transaction's state contains the specified key.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
107    /// #
108    /// # let folder = tempfile::tempdir()?;
109    /// # let keyspace = Config::new(folder).open_transactional()?;
110    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
111    /// partition.insert("a", "my_value")?;
112    ///
113    /// let tx = keyspace.read_tx();
114    /// assert!(tx.contains_key(&partition, "a")?);
115    /// #
116    /// # Ok::<(), fjall::Error>(())
117    /// ```
118    ///
119    /// # Errors
120    ///
121    /// Will return `Err` if an IO error occurs.
122    pub fn contains_key<K: AsRef<[u8]>>(
123        &self,
124        partition: &TxPartitionHandle,
125        key: K,
126    ) -> crate::Result<bool> {
127        partition
128            .inner
129            .tree
130            .snapshot_at(self.nonce.instant)
131            .contains_key(key)
132            .map_err(Into::into)
133    }
134
135    /// Returns the first key-value pair in the transaction's state.
136    /// The key in this pair is the minimum key in the transaction's state.
137    ///
138    /// # Examples
139    ///
140    /// ```
141    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
142    /// #
143    /// # let folder = tempfile::tempdir()?;
144    /// # let keyspace = Config::new(folder).open_transactional()?;
145    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
146    /// partition.insert("1", "abc")?;
147    /// partition.insert("3", "abc")?;
148    /// partition.insert("5", "abc")?;
149    ///
150    /// let (key, _) = keyspace.read_tx().first_key_value(&partition)?.expect("item should exist");
151    /// assert_eq!(&*key, "1".as_bytes());
152    /// #
153    /// # Ok::<(), fjall::Error>(())
154    /// ```
155    ///
156    /// # Errors
157    ///
158    /// Will return `Err` if an IO error occurs.
159    pub fn first_key_value(&self, partition: &TxPartitionHandle) -> crate::Result<Option<KvPair>> {
160        self.iter(partition).next().transpose()
161    }
162
163    /// Returns the last key-value pair in the transaction's state.
164    /// The key in this pair is the maximum key in the transaction's state.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
170    /// #
171    /// # let folder = tempfile::tempdir()?;
172    /// # let keyspace = Config::new(folder).open_transactional()?;
173    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
174    /// partition.insert("1", "abc")?;
175    /// partition.insert("3", "abc")?;
176    /// partition.insert("5", "abc")?;
177    ///
178    /// let (key, _) = keyspace.read_tx().last_key_value(&partition)?.expect("item should exist");
179    /// assert_eq!(&*key, "5".as_bytes());
180    /// #
181    /// # Ok::<(), fjall::Error>(())
182    /// ```
183    ///
184    /// # Errors
185    ///
186    /// Will return `Err` if an IO error occurs.
187    pub fn last_key_value(&self, partition: &TxPartitionHandle) -> crate::Result<Option<KvPair>> {
188        self.iter(partition).next_back().transpose()
189    }
190
191    /// Scans the entire partition, returning the amount of items.
192    ///
193    /// ###### Caution
194    ///
195    /// This operation scans the entire partition: O(n) complexity!
196    ///
197    /// Never, under any circumstances, use .`len()` == 0 to check
198    /// if the partition is empty, use [`ReadTransaction::is_empty`] instead.
199    ///
200    /// # Examples
201    ///
202    /// ```
203    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
204    /// #
205    /// # let folder = tempfile::tempdir()?;
206    /// # let keyspace = Config::new(folder).open_transactional()?;
207    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
208    /// partition.insert("a", "my_value")?;
209    /// partition.insert("b", "my_value2")?;
210    ///
211    /// let tx = keyspace.read_tx();
212    /// assert_eq!(2, tx.len(&partition)?);
213    ///
214    /// partition.insert("c", "my_value3")?;
215    ///
216    /// // Repeatable read
217    /// assert_eq!(2, tx.len(&partition)?);
218    ///
219    /// // Start new snapshot
220    /// let tx = keyspace.read_tx();
221    /// assert_eq!(3, tx.len(&partition)?);
222    /// #
223    /// # Ok::<(), fjall::Error>(())
224    /// ```
225    ///
226    /// # Errors
227    ///
228    /// Will return `Err` if an IO error occurs.
229    pub fn len(&self, partition: &TxPartitionHandle) -> crate::Result<usize> {
230        let mut count = 0;
231
232        for kv in self.iter(partition) {
233            let _ = kv?;
234            count += 1;
235        }
236
237        Ok(count)
238    }
239
240    /// Returns `true` if the partition is empty.
241    ///
242    /// This operation has O(log N) complexity.
243    ///
244    /// # Examples
245    ///
246    /// ```
247    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
248    /// #
249    /// # let folder = tempfile::tempdir()?;
250    /// # let keyspace = Config::new(folder).open_transactional()?;
251    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
252    /// assert!(keyspace.read_tx().is_empty(&partition)?);
253    ///
254    /// partition.insert("a", "abc")?;
255    /// assert!(!keyspace.read_tx().is_empty(&partition)?);
256    /// #
257    /// # Ok::<(), fjall::Error>(())
258    /// ```
259    ///
260    /// # Errors
261    ///
262    /// Will return `Err` if an IO error occurs.
263    pub fn is_empty(&self, partition: &TxPartitionHandle) -> crate::Result<bool> {
264        self.first_key_value(partition).map(|x| x.is_none())
265    }
266
267    /// Iterates over the transaction's state.
268    ///
269    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
275    /// #
276    /// # let folder = tempfile::tempdir()?;
277    /// # let keyspace = Config::new(folder).open_transactional()?;
278    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
279    /// partition.insert("a", "abc")?;
280    /// partition.insert("f", "abc")?;
281    /// partition.insert("g", "abc")?;
282    ///
283    /// assert_eq!(3, keyspace.read_tx().iter(&partition).count());
284    /// #
285    /// # Ok::<(), fjall::Error>(())
286    /// ```
287    #[must_use]
288    pub fn iter<'a>(
289        &'a self,
290        partition: &'a TxPartitionHandle,
291    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
292        let iter = partition
293            .inner
294            .tree
295            .iter(Some(self.nonce.instant), None)
296            .map(|item| Ok(item?));
297
298        crate::iter::Iter::new(self.nonce.clone(), iter)
299    }
300
301    /// Iterates over the transaction's state, returning keys only.
302    ///
303    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
304    #[must_use]
305    pub fn keys<'a>(
306        &'a self,
307        partition: &'a TxPartitionHandle,
308    ) -> impl DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static {
309        let iter = partition
310            .inner
311            .tree
312            .keys(Some(self.nonce.instant), None)
313            .map(|item| Ok(item?));
314
315        crate::iter::Iter::new(self.nonce.clone(), iter)
316    }
317
318    /// Iterates over the transaction's state, returning values only.
319    ///
320    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
321    #[must_use]
322    pub fn values<'a>(
323        &'a self,
324        partition: &'a TxPartitionHandle,
325    ) -> impl DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static {
326        let iter = partition
327            .inner
328            .tree
329            .values(Some(self.nonce.instant), None)
330            .map(|item| Ok(item?));
331
332        crate::iter::Iter::new(self.nonce.clone(), iter)
333    }
334
335    /// Iterates over a range of the transaction's state.
336    ///
337    /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
343    /// #
344    /// # let folder = tempfile::tempdir()?;
345    /// # let keyspace = Config::new(folder).open_transactional()?;
346    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
347    /// partition.insert("a", "abc")?;
348    /// partition.insert("f", "abc")?;
349    /// partition.insert("g", "abc")?;
350    ///
351    /// assert_eq!(2, keyspace.read_tx().range(&partition, "a"..="f").count());
352    /// #
353    /// # Ok::<(), fjall::Error>(())
354    /// ```
355    #[must_use]
356    pub fn range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
357        &'a self,
358        partition: &'a TxPartitionHandle,
359        range: R,
360    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
361        // TODO: 3.0.0: if we bind the iterator lifetime to ReadTx, we can remove the snapshot nonce from Iter
362        // TODO: for all other ReadTx::iterators too
363
364        let iter = partition
365            .inner
366            .tree
367            .range(range, Some(self.nonce.instant), None)
368            .map(|item| Ok(item?));
369
370        crate::iter::Iter::new(self.nonce.clone(), iter)
371    }
372
373    /// Iterates over a prefixed set of the transaction's state.
374    ///
375    /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
381    /// #
382    /// # let folder = tempfile::tempdir()?;
383    /// # let keyspace = Config::new(folder).open_transactional()?;
384    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
385    /// partition.insert("a", "abc")?;
386    /// partition.insert("ab", "abc")?;
387    /// partition.insert("abc", "abc")?;
388    ///
389    /// assert_eq!(2, keyspace.read_tx().prefix(&partition, "ab").count());
390    /// #
391    /// # Ok::<(), fjall::Error>(())
392    /// ```
393    #[must_use]
394    pub fn prefix<'a, K: AsRef<[u8]> + 'a>(
395        &'a self,
396        partition: &'a TxPartitionHandle,
397        prefix: K,
398    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
399        let iter = partition
400            .inner
401            .tree
402            .prefix(prefix, Some(self.nonce.instant), None)
403            .map(|item| Ok(item?));
404
405        crate::iter::Iter::new(self.nonce.clone(), iter)
406    }
407}