Skip to main content

fjall/tx/optimistic/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod conflict_manager;
6mod keyspace;
7mod oracle;
8mod write_tx;
9
10use crate::{
11    keyspace::KeyspaceKey,
12    tx::{optimistic::oracle::Oracle, single_writer::Openable},
13    Config, Database, KeyspaceCreateOptions, PersistMode, Snapshot,
14};
15use std::{
16    path::Path,
17    sync::{Arc, Mutex},
18};
19
20pub use keyspace::OptimisticTxKeyspace;
21pub use write_tx::{Conflict, WriteTransaction};
22
/// Transactional database
///
/// Pairs a [`Database`] with a reference-counted oracle. Cloning this handle
/// clones the inner database handle and bumps the reference count of the
/// same oracle, so all clones share one oracle instance.
#[derive(Clone)]
pub struct OptimisticTxDatabase {
    /// The wrapped database; the public methods of this type delegate to it.
    pub(crate) inner: Database,
    /// Shared oracle; holds the write-serialization lock and a snapshot tracker
    /// (both are populated when the database is opened).
    pub(super) oracle: Arc<Oracle>,
}
29
30impl Openable for OptimisticTxDatabase {
31    fn open(config: Config) -> crate::Result<Self>
32    where
33        Self: Sized,
34    {
35        let inner = Database::create_or_recover(config)?;
36        // inner.start_background_threads()?;
37
38        Ok(Self {
39            oracle: Arc::new(Oracle {
40                write_serialize_lock: Mutex::default(),
41                snapshot_tracker: inner.supervisor.snapshot_tracker.clone(),
42            }),
43            inner,
44        })
45    }
46}
47
48impl OptimisticTxDatabase {
49    /// Creates a new database builder to create or open a database at `path`.
50    pub fn builder(path: impl AsRef<Path>) -> crate::DatabaseBuilder<Self> {
51        crate::DatabaseBuilder::new(path.as_ref())
52    }
53
54    #[doc(hidden)]
55    #[must_use]
56    pub fn inner(&self) -> &Database {
57        &self.inner
58    }
59
60    /// Starts a new writeable transaction.
61    ///
62    /// # Errors
63    ///
64    /// Will return `Err` if creation failed.
65    pub fn write_tx(&self) -> crate::Result<WriteTransaction> {
66        let snapshot = {
67            // acquire a lock here to prevent getting a stale snapshot seqno
68            // this will drain at least part of the commit queue, but ordering
69            // is platform-dependent since we use std::sync::Mutex
70            let _guard = self.oracle.write_serialize_lock()?;
71
72            self.inner.supervisor.snapshot_tracker.open()
73        };
74
75        let mut write_tx =
76            WriteTransaction::new(self.inner().clone(), snapshot, self.oracle.clone());
77
78        if !self.inner.config.manual_journal_persist {
79            write_tx = write_tx.durability(Some(PersistMode::Buffer));
80        }
81
82        Ok(write_tx)
83    }
84
85    /// Starts a new read-only transaction (a.k.a. [`Snapshot`]).
86    #[must_use]
87    pub fn read_tx(&self) -> Snapshot {
88        self.inner.snapshot()
89    }
90
91    /// Flushes the active journal. The durability depends on the [`PersistMode`]
92    /// used.
93    ///
94    /// Persisting only affects durability, NOT consistency! Even without flushing
95    /// data is crash-safe.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// # use fjall::{PersistMode, OptimisticTxDatabase, KeyspaceCreateOptions};
101    /// # let folder = tempfile::tempdir()?;
102    /// let db = OptimisticTxDatabase::builder(folder).open()?;
103    /// let items = db.keyspace("my_items", KeyspaceCreateOptions::default)?;
104    ///
105    /// items.insert("a", "hello")?;
106    ///
107    /// db.persist(PersistMode::SyncAll)?;
108    /// #
109    /// # Ok::<_, fjall::Error>(())
110    /// ```
111    ///
112    /// # Errors
113    ///
114    /// Returns error, if an IO error occurred.
115    pub fn persist(&self, mode: PersistMode) -> crate::Result<()> {
116        self.inner.persist(mode)
117    }
118
119    /// Creates or opens a keyspace.
120    ///
121    /// If the keyspace does not yet exist, it will be created configured with `create_options`.
122    /// Otherwise simply a handle to the existing keyspace will be returned.
123    ///
124    /// Keyspace names can be up to 255 characters long and can not be empty.
125    ///
126    /// # Errors
127    ///
128    /// Returns error, if an IO error occurred.
129    ///
130    /// # Panics
131    ///
132    /// Panics if the keyspace name is invalid.
133    pub fn keyspace(
134        &self,
135        name: &str,
136        create_options: impl FnOnce() -> KeyspaceCreateOptions,
137    ) -> crate::Result<OptimisticTxKeyspace> {
138        let keyspace = self.inner.keyspace(name, create_options)?;
139
140        Ok(OptimisticTxKeyspace {
141            inner: keyspace,
142            db: self.clone(),
143        })
144    }
145
146    /// Returns the number of keyspaces.
147    #[must_use]
148    pub fn keyspace_count(&self) -> usize {
149        self.inner.keyspace_count()
150    }
151
152    /// Gets a list of all keyspace names in the database.
153    #[must_use]
154    pub fn list_keyspace_names(&self) -> Vec<KeyspaceKey> {
155        self.inner.list_keyspace_names()
156    }
157
158    /// Returns `true` if the keyspace with the given name exists.
159    #[must_use]
160    pub fn keyspace_exists(&self, name: &str) -> bool {
161        self.inner.keyspace_exists(name)
162    }
163
164    /// Returns the current write buffer size (active + sealed memtables).
165    #[must_use]
166    pub fn write_buffer_size(&self) -> u64 {
167        self.inner.write_buffer_size()
168    }
169
170    /// Returns the number of journal fragments on disk.
171    #[must_use]
172    pub fn journal_count(&self) -> usize {
173        self.inner.journal_count()
174    }
175
176    /// Returns the disk space usage of the entire database.
177    ///
178    /// # Errors
179    ///
180    /// Returns error, if an IO error occurred.
181    pub fn disk_space(&self) -> crate::Result<u64> {
182        self.inner.disk_space()
183    }
184}