fjall/tx/
partition.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{gc::GarbageCollection, PartitionHandle, TxKeyspace};
6use lsm_tree::{gc::Report as GcReport, KvPair, UserKey, UserValue};
7use std::path::PathBuf;
8
9/// Access to a partition of a transactional keyspace
10#[derive(Clone)]
11pub struct TransactionalPartitionHandle {
12    pub(crate) inner: PartitionHandle,
13    pub(crate) keyspace: TxKeyspace,
14}
15
16impl GarbageCollection for TransactionalPartitionHandle {
17    fn gc_scan(&self) -> crate::Result<GcReport> {
18        self.inner().gc_scan()
19    }
20
21    fn gc_with_space_amp_target(&self, factor: f32) -> crate::Result<u64> {
22        self.inner().gc_with_space_amp_target(factor)
23    }
24
25    fn gc_with_staleness_threshold(&self, threshold: f32) -> crate::Result<u64> {
26        self.inner().gc_with_staleness_threshold(threshold)
27    }
28
29    fn gc_drop_stale_segments(&self) -> crate::Result<u64> {
30        self.inner().gc_drop_stale_segments()
31    }
32}
33
34impl TransactionalPartitionHandle {
35    /// Returns the underlying LSM-tree's path
36    #[must_use]
37    pub fn path(&self) -> PathBuf {
38        self.inner.path().into()
39    }
40
41    /// Approximates the amount of items in the partition.
42    ///
43    /// For update- or delete-heavy workloads, this value will
44    /// diverge from the real value, but is a O(1) operation.
45    ///
46    /// For insert-only workloads (e.g. logs, time series)
47    /// this value is reliable.
48    ///
49    /// # Examples
50    ///
51    /// ```
52    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
53    /// #
54    /// # let folder = tempfile::tempdir()?;
55    /// # let keyspace = Config::new(folder).open_transactional()?;
56    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
57    /// assert_eq!(partition.approximate_len(), 0);
58    ///
59    /// partition.insert("1", "abc")?;
60    /// assert_eq!(partition.approximate_len(), 1);
61    ///
62    /// partition.remove("1")?;
63    /// // Oops! approximate_len will not be reliable here
64    /// assert_eq!(partition.approximate_len(), 2);
65    /// #
66    /// # Ok::<(), fjall::Error>(())
67    /// ```
68    #[must_use]
69    pub fn approximate_len(&self) -> usize {
70        self.inner.approximate_len()
71    }
72
73    /// Removes an item and returns its value if it existed.
74    ///
75    /// The operation will run wrapped in a transaction.
76    ///
77    /// ```
78    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
79    /// # use std::sync::Arc;
80    /// #
81    /// # let folder = tempfile::tempdir()?;
82    /// # let keyspace = Config::new(folder).open_transactional()?;
83    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
84    /// partition.insert("a", "abc")?;
85    ///
86    /// let taken = partition.take("a")?.unwrap();
87    /// assert_eq!(b"abc", &*taken);
88    ///
89    /// let item = partition.get("a")?;
90    /// assert!(item.is_none());
91    /// #
92    /// # Ok::<(), fjall::Error>(())
93    /// ```
94    ///
95    /// # Errors
96    ///
97    /// Will return `Err` if an IO error occurs.
98    pub fn take<K: Into<UserKey>>(&self, key: K) -> crate::Result<Option<UserValue>> {
99        self.fetch_update(key, |_| None)
100    }
101
102    /// Atomically updates an item and returns the previous value.
103    ///
104    /// Returning `None` removes the item if it existed before.
105    ///
106    /// The operation will run wrapped in a transaction.
107    ///
108    /// # Note
109    ///
110    /// The provided closure can be called multiple times as this function
111    /// automatically retries on conflict. Since this is an `FnMut`, make sure
112    /// it is idempotent and will not cause side-effects.
113    ///
114    /// # Examples
115    ///
116    /// ```
117    /// # use fjall::{Config, Keyspace, Slice, PartitionCreateOptions};
118    /// # use std::sync::Arc;
119    /// #
120    /// # let folder = tempfile::tempdir()?;
121    /// # let keyspace = Config::new(folder).open_transactional()?;
122    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
123    /// partition.insert("a", "abc")?;
124    ///
125    /// let prev = partition.fetch_update("a", |_| Some(Slice::from(*b"def")))?.unwrap();
126    /// assert_eq!(b"abc", &*prev);
127    ///
128    /// let item = partition.get("a")?;
129    /// assert_eq!(Some("def".as_bytes().into()), item);
130    /// #
131    /// # Ok::<(), fjall::Error>(())
132    /// ```
133    ///
134    /// ```
135    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
136    /// # use std::sync::Arc;
137    /// #
138    /// # let folder = tempfile::tempdir()?;
139    /// # let keyspace = Config::new(folder).open_transactional()?;
140    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
141    /// partition.insert("a", "abc")?;
142    ///
143    /// let prev = partition.fetch_update("a", |_| None)?.unwrap();
144    /// assert_eq!(b"abc", &*prev);
145    ///
146    /// let item = partition.get("a")?;
147    /// assert!(item.is_none());
148    /// #
149    /// # Ok::<(), fjall::Error>(())
150    /// ```
151    ///
152    /// # Errors
153    ///
154    /// Will return `Err` if an IO error occurs.
155    #[allow(unused_mut)]
156    pub fn fetch_update<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
157        &self,
158        key: K,
159        mut f: F,
160    ) -> crate::Result<Option<UserValue>> {
161        let key: UserKey = key.into();
162
163        #[cfg(feature = "single_writer_tx")]
164        {
165            let mut tx = self.keyspace.write_tx();
166
167            let prev = tx.fetch_update(self, key, f)?;
168            tx.commit()?;
169
170            Ok(prev)
171        }
172
173        #[cfg(feature = "ssi_tx")]
174        loop {
175            let mut tx = self.keyspace.write_tx()?;
176            let prev = tx.fetch_update(self, key.clone(), &mut f)?;
177            if tx.commit()?.is_ok() {
178                return Ok(prev);
179            }
180        }
181    }
182
183    /// Atomically updates an item and returns the new value.
184    ///
185    /// Returning `None` removes the item if it existed before.
186    ///
187    /// The operation will run wrapped in a transaction.
188    ///
189    /// # Note
190    ///
191    /// The provided closure can be called multiple times as this function
192    /// automatically retries on conflict. Since this is an `FnMut`, make sure
193    /// it is idempotent and will not cause side-effects.
194    ///
195    /// # Examples
196    ///
197    /// ```
198    /// # use fjall::{Config, Keyspace, Slice, PartitionCreateOptions};
199    /// # use std::sync::Arc;
200    /// #
201    /// # let folder = tempfile::tempdir()?;
202    /// # let keyspace = Config::new(folder).open_transactional()?;
203    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
204    /// partition.insert("a", "abc")?;
205    ///
206    /// let updated = partition.update_fetch("a", |_| Some(Slice::from(*b"def")))?.unwrap();
207    /// assert_eq!(b"def", &*updated);
208    ///
209    /// let item = partition.get("a")?;
210    /// assert_eq!(Some("def".as_bytes().into()), item);
211    /// #
212    /// # Ok::<(), fjall::Error>(())
213    /// ```
214    ///
215    /// ```
216    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
217    /// # use std::sync::Arc;
218    /// #
219    /// # let folder = tempfile::tempdir()?;
220    /// # let keyspace = Config::new(folder).open_transactional()?;
221    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
222    /// partition.insert("a", "abc")?;
223    ///
224    /// let updated = partition.update_fetch("a", |_| None)?;
225    /// assert!(updated.is_none());
226    ///
227    /// let item = partition.get("a")?;
228    /// assert!(item.is_none());
229    /// #
230    /// # Ok::<(), fjall::Error>(())
231    /// ```
232    ///
233    /// # Errors
234    ///
235    /// Will return `Err` if an IO error occurs.
236    #[allow(unused_mut)]
237    pub fn update_fetch<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
238        &self,
239        key: K,
240        mut f: F,
241    ) -> crate::Result<Option<UserValue>> {
242        let key = key.into();
243
244        #[cfg(feature = "single_writer_tx")]
245        {
246            let mut tx = self.keyspace.write_tx();
247            let updated = tx.update_fetch(self, key, f)?;
248            tx.commit()?;
249
250            Ok(updated)
251        }
252
253        #[cfg(feature = "ssi_tx")]
254        loop {
255            let mut tx = self.keyspace.write_tx()?;
256            let updated = tx.update_fetch(self, key.clone(), &mut f)?;
257            if tx.commit()?.is_ok() {
258                return Ok(updated);
259            }
260        }
261    }
262
263    /// Inserts a key-value pair into the partition.
264    ///
265    /// Keys may be up to 65536 bytes long, values up to 2^32 bytes.
266    /// Shorter keys and values result in better performance.
267    ///
268    /// If the key already exists, the item will be overwritten.
269    ///
270    /// The operation will run wrapped in a transaction.
271    ///
272    /// # Examples
273    ///
274    /// ```
275    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
276    /// #
277    /// # let folder = tempfile::tempdir()?;
278    /// # let keyspace = Config::new(folder).open_transactional()?;
279    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
280    /// partition.insert("a", "abc")?;
281    ///
282    /// assert!(!keyspace.read_tx().is_empty(&partition)?);
283    /// #
284    /// # Ok::<(), fjall::Error>(())
285    /// ```
286    ///
287    /// # Errors
288    ///
289    /// Will return `Err` if an IO error occurs.
290    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
291        &self,
292        key: K,
293        value: V,
294    ) -> crate::Result<()> {
295        #[cfg(feature = "single_writer_tx")]
296        {
297            let mut tx = self.keyspace.write_tx();
298            tx.insert(self, key, value);
299            tx.commit()?;
300            Ok(())
301        }
302
303        #[cfg(feature = "ssi_tx")]
304        {
305            let mut tx = self.keyspace.write_tx()?;
306            tx.insert(self, key, value);
307            tx.commit()?.expect("blind insert should not conflict ever");
308            Ok(())
309        }
310    }
311
312    /// Removes an item from the partition.
313    ///
314    /// The key may be up to 65536 bytes long.
315    /// Shorter keys result in better performance.
316    ///
317    /// The operation will run wrapped in a transaction.
318    ///
319    /// # Examples
320    ///
321    /// ```
322    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
323    /// #
324    /// # let folder = tempfile::tempdir()?;
325    /// # let keyspace = Config::new(folder).open_transactional()?;
326    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
327    /// partition.insert("a", "abc")?;
328    /// assert!(!keyspace.read_tx().is_empty(&partition)?);
329    ///
330    /// partition.remove("a")?;
331    /// assert!(keyspace.read_tx().is_empty(&partition)?);
332    /// #
333    /// # Ok::<(), fjall::Error>(())
334    /// ```
335    ///
336    /// # Errors
337    ///
338    /// Will return `Err` if an IO error occurs.
339    pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
340        #[cfg(feature = "single_writer_tx")]
341        {
342            let mut tx = self.keyspace.write_tx();
343            tx.remove(self, key);
344            tx.commit()?;
345            Ok(())
346        }
347
348        #[cfg(feature = "ssi_tx")]
349        {
350            let mut tx = self.keyspace.write_tx()?;
351            tx.remove(self, key);
352            tx.commit()?.expect("blind remove should not conflict ever");
353            Ok(())
354        }
355    }
356
357    /// Removes an item from the partition, leaving behind a weak tombstone.
358    ///
359    /// The tombstone marker of this delete operation will vanish when it
360    /// collides with its corresponding insertion.
361    /// This may cause older versions of the value to be resurrected, so it should
362    /// only be used and preferred in scenarios where a key is only ever written once.
363    ///
364    /// The key may be up to 65536 bytes long.
365    /// Shorter keys result in better performance.
366    ///
367    /// The operation will run wrapped in a transaction.
368    ///
369    /// # Experimental
370    ///
371    /// This function is currently experimental.
372    ///
373    /// # Examples
374    ///
375    /// ```
376    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
377    /// #
378    /// # let folder = tempfile::tempdir()?;
379    /// # let keyspace = Config::new(folder).open_transactional()?;
380    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
381    /// partition.insert("a", "abc")?;
382    /// assert!(!keyspace.read_tx().is_empty(&partition)?);
383    ///
384    /// partition.remove_weak("a")?;
385    /// assert!(keyspace.read_tx().is_empty(&partition)?);
386    /// #
387    /// # Ok::<(), fjall::Error>(())
388    /// ```
389    ///
390    /// # Errors
391    ///
392    /// Will return `Err` if an IO error occurs.
393    #[doc(hidden)]
394    pub fn remove_weak<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
395        #[cfg(feature = "single_writer_tx")]
396        {
397            let mut tx = self.keyspace.write_tx();
398            tx.remove_weak(self, key);
399            tx.commit()?;
400            Ok(())
401        }
402
403        #[cfg(feature = "ssi_tx")]
404        {
405            let mut tx = self.keyspace.write_tx()?;
406            tx.remove_weak(self, key);
407            tx.commit()?.expect("blind remove should not conflict ever");
408            Ok(())
409        }
410    }
411
412    /// Retrieves an item from the partition.
413    ///
414    /// The operation will run wrapped in a read snapshot.
415    ///
416    /// # Examples
417    ///
418    /// ```
419    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
420    /// #
421    /// # let folder = tempfile::tempdir()?;
422    /// # let keyspace = Config::new(folder).open_transactional()?;
423    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
424    /// partition.insert("a", "my_value")?;
425    ///
426    /// let item = partition.get("a")?;
427    /// assert_eq!(Some("my_value".as_bytes().into()), item);
428    /// #
429    /// # Ok::<(), fjall::Error>(())
430    /// ```
431    ///
432    /// # Errors
433    ///
434    /// Will return `Err` if an IO error occurs.
435    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<lsm_tree::UserValue>> {
436        self.inner.get(key)
437    }
438
439    /// Retrieves the size of an item from the partition.
440    ///
441    /// The operation will run wrapped in a read snapshot.
442    ///
443    /// # Examples
444    ///
445    /// ```
446    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
447    /// #
448    /// # let folder = tempfile::tempdir()?;
449    /// # let keyspace = Config::new(folder).open_transactional()?;
450    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
451    /// partition.insert("a", "my_value")?;
452    ///
453    /// let len = partition.size_of("a")?.unwrap_or_default();
454    /// assert_eq!("my_value".len() as u32, len);
455    /// #
456    /// # Ok::<(), fjall::Error>(())
457    /// ```
458    ///
459    /// # Errors
460    ///
461    /// Will return `Err` if an IO error occurs.
462    pub fn size_of<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<u32>> {
463        self.inner.size_of(key)
464    }
465
466    /// Returns the first key-value pair in the partition.
467    /// The key in this pair is the minimum key in the partition.
468    ///
469    /// The operation will run wrapped in a read snapshot.
470    ///
471    /// # Examples
472    ///
473    /// ```
474    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
475    /// #
476    /// # let folder = tempfile::tempdir()?;
477    /// # let keyspace = Config::new(folder).open_transactional()?;
478    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
479    /// partition.insert("a", "my_value")?;
480    /// partition.insert("b", "my_value")?;
481    ///
482    /// assert_eq!(b"a", &*partition.first_key_value()?.unwrap().0);
483    /// #
484    /// # Ok::<(), fjall::Error>(())
485    /// ```
486    ///
487    /// # Errors
488    ///
489    /// Will return `Err` if an IO error occurs.
490    pub fn first_key_value(&self) -> crate::Result<Option<KvPair>> {
491        let read_tx = self.keyspace.read_tx();
492        read_tx.first_key_value(self)
493    }
494
495    /// Returns the last key-value pair in the partition.
496    /// The key in this pair is the maximum key in the partition.
497    ///
498    /// The operation will run wrapped in a read snapshot.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
504    /// #
505    /// # let folder = tempfile::tempdir()?;
506    /// # let keyspace = Config::new(folder).open_transactional()?;
507    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
508    /// partition.insert("a", "my_value")?;
509    /// partition.insert("b", "my_value")?;
510    ///
511    /// assert_eq!(b"b", &*partition.last_key_value()?.unwrap().0);
512    /// #
513    /// # Ok::<(), fjall::Error>(())
514    /// ```
515    ///
516    /// # Errors
517    ///
518    /// Will return `Err` if an IO error occurs.
519    pub fn last_key_value(&self) -> crate::Result<Option<KvPair>> {
520        let read_tx = self.keyspace.read_tx();
521        read_tx.last_key_value(self)
522    }
523
524    /// Returns `true` if the partition contains the specified key.
525    ///
526    /// The operation will run wrapped in a read snapshot.
527    ///
528    /// # Examples
529    ///
530    /// ```
531    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
532    /// #
533    /// # let folder = tempfile::tempdir()?;
534    /// # let keyspace = Config::new(folder).open_transactional()?;
535    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
536    /// partition.insert("a", "my_value")?;
537    ///
538    /// assert!(partition.contains_key("a")?);
539    /// assert!(!partition.contains_key("b")?);
540    /// #
541    /// # Ok::<(), fjall::Error>(())
542    /// ```
543    ///
544    /// # Errors
545    ///
546    /// Will return `Err` if an IO error occurs.
547    pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<bool> {
548        self.inner.contains_key(key)
549    }
550
551    /// Allows access to the inner partition handle, allowing to
552    /// escape from the transactional context.
553    #[doc(hidden)]
554    #[must_use]
555    pub fn inner(&self) -> &PartitionHandle {
556        &self.inner
557    }
558}