1use super::{read_tx::ReadTransaction, write_tx::WriteTransaction};
6use crate::{
7 batch::PartitionKey, snapshot_nonce::SnapshotNonce, Config, Keyspace, PartitionCreateOptions,
8 PersistMode, TxPartitionHandle,
9};
10use std::sync::{Arc, Mutex};
11
12#[cfg(feature = "ssi_tx")]
13use super::oracle::Oracle;
14
15#[derive(Clone)]
17#[allow(clippy::module_name_repetitions)]
18pub struct TransactionalKeyspace {
19 pub(crate) inner: Keyspace,
20
21 #[cfg(feature = "ssi_tx")]
22 pub(super) oracle: Arc<Oracle>,
23
24 #[cfg(feature = "single_writer_tx")]
25 single_writer_lock: Arc<Mutex<()>>,
26}
27
28#[allow(clippy::module_name_repetitions)]
30pub type TxKeyspace = TransactionalKeyspace;
31
32impl TxKeyspace {
33 #[doc(hidden)]
34 #[must_use]
35 pub fn inner(&self) -> &Keyspace {
36 &self.inner
37 }
38
39 #[cfg(feature = "single_writer_tx")]
41 #[must_use]
42 pub fn write_tx(&self) -> WriteTransaction {
43 let guard = self.single_writer_lock.lock().expect("poisoned tx lock");
44 let instant = self.inner.instant();
45
46 let mut write_tx = WriteTransaction::new(
47 self.clone(),
48 SnapshotNonce::new(instant, self.inner.snapshot_tracker.clone()),
49 guard,
50 );
51
52 if !self.inner.config.manual_journal_persist {
53 write_tx = write_tx.durability(Some(PersistMode::Buffer));
54 }
55
56 write_tx
57 }
58
59 #[cfg(feature = "ssi_tx")]
65 pub fn write_tx(&self) -> crate::Result<WriteTransaction> {
66 let instant = {
67 let _guard = self.oracle.write_serialize_lock()?;
71
72 self.inner.instant()
73 };
74
75 let mut write_tx = WriteTransaction::new(
76 self.clone(),
77 SnapshotNonce::new(instant, self.inner.snapshot_tracker.clone()),
78 );
79
80 if !self.inner.config.manual_journal_persist {
81 write_tx = write_tx.durability(Some(PersistMode::Buffer));
82 }
83
84 Ok(write_tx)
85 }
86
87 #[must_use]
89 pub fn read_tx(&self) -> ReadTransaction {
90 let instant = self.inner.instant();
91
92 ReadTransaction::new(SnapshotNonce::new(
93 instant,
94 self.inner.snapshot_tracker.clone(),
95 ))
96 }
97
98 pub fn persist(&self, mode: PersistMode) -> crate::Result<()> {
123 self.inner.persist(mode)
124 }
125
126 pub fn open_partition(
142 &self,
143 name: &str,
144 create_options: PartitionCreateOptions,
145 ) -> crate::Result<TxPartitionHandle> {
146 let partition = self.inner.open_partition(name, create_options)?;
147
148 Ok(TxPartitionHandle {
149 inner: partition,
150 keyspace: self.clone(),
151 })
152 }
153
154 #[must_use]
156 pub fn partition_count(&self) -> usize {
157 self.inner.partition_count()
158 }
159
160 #[must_use]
162 pub fn list_partitions(&self) -> Vec<PartitionKey> {
163 self.inner.list_partitions()
164 }
165
166 #[must_use]
168 pub fn partition_exists(&self, name: &str) -> bool {
169 self.inner.partition_exists(name)
170 }
171
172 pub fn delete_partition(&self, handle: TxPartitionHandle) -> crate::Result<()> {
178 self.inner.delete_partition(handle.inner)
179 }
180
181 #[must_use]
183 pub fn write_buffer_size(&self) -> u64 {
184 self.inner.write_buffer_size()
185 }
186
187 #[must_use]
189 pub fn journal_count(&self) -> usize {
190 self.inner.journal_count()
191 }
192
193 #[must_use]
195 pub fn disk_space(&self) -> u64 {
196 self.inner.disk_space()
197 }
198
199 pub fn open(config: Config) -> crate::Result<Self> {
205 let inner = Keyspace::create_or_recover(config)?;
206 inner.start_background_threads()?;
207
208 Ok(Self {
209 #[cfg(feature = "ssi_tx")]
210 oracle: Arc::new(Oracle {
211 write_serialize_lock: Mutex::default(),
212 seqno: inner.seqno.clone(),
213 snapshot_tracker: inner.snapshot_tracker.clone(),
214 }),
215 inner,
216 #[cfg(feature = "single_writer_tx")]
217 single_writer_lock: Default::default(),
218 })
219 }
220}