fjall/tx/
keyspace.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{read_tx::ReadTransaction, write_tx::WriteTransaction};
6use crate::{
7    batch::PartitionKey, snapshot_nonce::SnapshotNonce, Config, Keyspace, PartitionCreateOptions,
8    PersistMode, TxPartitionHandle,
9};
10use std::sync::{Arc, Mutex};
11
12#[cfg(feature = "ssi_tx")]
13use super::oracle::Oracle;
14
15/// Transactional keyspace
16#[derive(Clone)]
17#[allow(clippy::module_name_repetitions)]
18pub struct TransactionalKeyspace {
19    pub(crate) inner: Keyspace,
20
21    #[cfg(feature = "ssi_tx")]
22    pub(super) oracle: Arc<Oracle>,
23
24    #[cfg(feature = "single_writer_tx")]
25    single_writer_lock: Arc<Mutex<()>>,
26}
27
28/// Alias for [`TransactionalKeyspace`]
29#[allow(clippy::module_name_repetitions)]
30pub type TxKeyspace = TransactionalKeyspace;
31
32impl TxKeyspace {
33    #[doc(hidden)]
34    #[must_use]
35    pub fn inner(&self) -> &Keyspace {
36        &self.inner
37    }
38
39    /// Starts a new writeable transaction.
40    #[cfg(feature = "single_writer_tx")]
41    #[must_use]
42    pub fn write_tx(&self) -> WriteTransaction {
43        let guard = self.single_writer_lock.lock().expect("poisoned tx lock");
44        let instant = self.inner.instant();
45
46        let mut write_tx = WriteTransaction::new(
47            self.clone(),
48            SnapshotNonce::new(instant, self.inner.snapshot_tracker.clone()),
49            guard,
50        );
51
52        if !self.inner.config.manual_journal_persist {
53            write_tx = write_tx.durability(Some(PersistMode::Buffer));
54        }
55
56        write_tx
57    }
58
59    /// Starts a new writeable transaction.
60    ///
61    /// # Errors
62    ///
63    /// Will return `Err` if creation failed.
64    #[cfg(feature = "ssi_tx")]
65    pub fn write_tx(&self) -> crate::Result<WriteTransaction> {
66        let instant = {
67            // acquire a lock here to prevent getting a stale snapshot seqno
68            // this will drain at least part of the commit queue, but ordering
69            // is platform-dependent since we use std::sync::Mutex
70            let _guard = self.oracle.write_serialize_lock()?;
71
72            self.inner.instant()
73        };
74
75        let mut write_tx = WriteTransaction::new(
76            self.clone(),
77            SnapshotNonce::new(instant, self.inner.snapshot_tracker.clone()),
78        );
79
80        if !self.inner.config.manual_journal_persist {
81            write_tx = write_tx.durability(Some(PersistMode::Buffer));
82        }
83
84        Ok(write_tx)
85    }
86
87    /// Starts a new read-only transaction.
88    #[must_use]
89    pub fn read_tx(&self) -> ReadTransaction {
90        let instant = self.inner.instant();
91
92        ReadTransaction::new(SnapshotNonce::new(
93            instant,
94            self.inner.snapshot_tracker.clone(),
95        ))
96    }
97
98    /// Flushes the active journal. The durability depends on the [`PersistMode`]
99    /// used.
100    ///
101    /// Persisting only affects durability, NOT consistency! Even without flushing
102    /// data is crash-safe.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// # use fjall::{Config, PersistMode, Keyspace, PartitionCreateOptions};
108    /// # let folder = tempfile::tempdir()?;
109    /// let keyspace = Config::new(folder).open_transactional()?;
110    /// let items = keyspace.open_partition("my_items", PartitionCreateOptions::default())?;
111    ///
112    /// items.insert("a", "hello")?;
113    ///
114    /// keyspace.persist(PersistMode::SyncAll)?;
115    /// #
116    /// # Ok::<_, fjall::Error>(())
117    /// ```
118    ///
119    /// # Errors
120    ///
121    /// Returns error, if an IO error occurred.
122    pub fn persist(&self, mode: PersistMode) -> crate::Result<()> {
123        self.inner.persist(mode)
124    }
125
126    /// Creates or opens a keyspace partition.
127    ///
128    /// If the partition does not yet exist, it will be created configured with `create_options`.
129    /// Otherwise simply a handle to the existing partition will be returned.
130    ///
131    /// Partition names can be up to 255 characters long, can not be empty and
132    /// can only contain alphanumerics, underscore (`_`), dash (`-`), hash tag (`#`) and dollar (`$`).
133    ///
134    /// # Errors
135    ///
136    /// Returns error, if an IO error occurred.
137    ///
138    /// # Panics
139    ///
140    /// Panics if the partition name is invalid.
141    pub fn open_partition(
142        &self,
143        name: &str,
144        create_options: PartitionCreateOptions,
145    ) -> crate::Result<TxPartitionHandle> {
146        let partition = self.inner.open_partition(name, create_options)?;
147
148        Ok(TxPartitionHandle {
149            inner: partition,
150            keyspace: self.clone(),
151        })
152    }
153
154    /// Returns the amount of partitions
155    #[must_use]
156    pub fn partition_count(&self) -> usize {
157        self.inner.partition_count()
158    }
159
160    /// Gets a list of all partition names in the keyspace
161    #[must_use]
162    pub fn list_partitions(&self) -> Vec<PartitionKey> {
163        self.inner.list_partitions()
164    }
165
166    /// Returns `true` if the partition with the given name exists.
167    #[must_use]
168    pub fn partition_exists(&self, name: &str) -> bool {
169        self.inner.partition_exists(name)
170    }
171
172    /// Destroys the partition, removing all data associated with it.
173    ///
174    /// # Errors
175    ///
176    /// Will return `Err` if an IO error occurs.
177    pub fn delete_partition(&self, handle: TxPartitionHandle) -> crate::Result<()> {
178        self.inner.delete_partition(handle.inner)
179    }
180
181    /// Returns the current write buffer size (active + sealed memtables).
182    #[must_use]
183    pub fn write_buffer_size(&self) -> u64 {
184        self.inner.write_buffer_size()
185    }
186
187    /// Returns the amount of journals on disk.
188    #[must_use]
189    pub fn journal_count(&self) -> usize {
190        self.inner.journal_count()
191    }
192
193    /// Returns the disk space usage of the entire keyspace.
194    #[must_use]
195    pub fn disk_space(&self) -> u64 {
196        self.inner.disk_space()
197    }
198
199    /// Opens a keyspace in the given directory.
200    ///
201    /// # Errors
202    ///
203    /// Returns error, if an IO error occurred.
204    pub fn open(config: Config) -> crate::Result<Self> {
205        let inner = Keyspace::create_or_recover(config)?;
206        inner.start_background_threads()?;
207
208        Ok(Self {
209            #[cfg(feature = "ssi_tx")]
210            oracle: Arc::new(Oracle {
211                write_serialize_lock: Mutex::default(),
212                seqno: inner.seqno.clone(),
213                snapshot_tracker: inner.snapshot_tracker.clone(),
214            }),
215            inner,
216            #[cfg(feature = "single_writer_tx")]
217            single_writer_lock: Default::default(),
218        })
219    }
220}