fjall/tx/partition.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{gc::GarbageCollection, PartitionHandle, TxKeyspace};
6use lsm_tree::{gc::Report as GcReport, KvPair, UserKey, UserValue};
7use std::path::PathBuf;
8
9/// Access to a partition of a transactional keyspace
10#[derive(Clone)]
11pub struct TransactionalPartitionHandle {
12 pub(crate) inner: PartitionHandle,
13 pub(crate) keyspace: TxKeyspace,
14}
15
16impl GarbageCollection for TransactionalPartitionHandle {
17 fn gc_scan(&self) -> crate::Result<GcReport> {
18 self.inner().gc_scan()
19 }
20
21 fn gc_with_space_amp_target(&self, factor: f32) -> crate::Result<u64> {
22 self.inner().gc_with_space_amp_target(factor)
23 }
24
25 fn gc_with_staleness_threshold(&self, threshold: f32) -> crate::Result<u64> {
26 self.inner().gc_with_staleness_threshold(threshold)
27 }
28
29 fn gc_drop_stale_segments(&self) -> crate::Result<u64> {
30 self.inner().gc_drop_stale_segments()
31 }
32}
33
34impl TransactionalPartitionHandle {
35 /// Returns the underlying LSM-tree's path
36 #[must_use]
37 pub fn path(&self) -> PathBuf {
38 self.inner.path().into()
39 }
40
41 /// Approximates the amount of items in the partition.
42 ///
43 /// For update- or delete-heavy workloads, this value will
44 /// diverge from the real value, but is a O(1) operation.
45 ///
46 /// For insert-only workloads (e.g. logs, time series)
47 /// this value is reliable.
48 ///
49 /// # Examples
50 ///
51 /// ```
52 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
53 /// #
54 /// # let folder = tempfile::tempdir()?;
55 /// # let keyspace = Config::new(folder).open_transactional()?;
56 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
57 /// assert_eq!(partition.approximate_len(), 0);
58 ///
59 /// partition.insert("1", "abc")?;
60 /// assert_eq!(partition.approximate_len(), 1);
61 ///
62 /// partition.remove("1")?;
63 /// // Oops! approximate_len will not be reliable here
64 /// assert_eq!(partition.approximate_len(), 2);
65 /// #
66 /// # Ok::<(), fjall::Error>(())
67 /// ```
68 #[must_use]
69 pub fn approximate_len(&self) -> usize {
70 self.inner.approximate_len()
71 }
72
73 /// Removes an item and returns its value if it existed.
74 ///
75 /// The operation will run wrapped in a transaction.
76 ///
77 /// ```
78 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
79 /// # use std::sync::Arc;
80 /// #
81 /// # let folder = tempfile::tempdir()?;
82 /// # let keyspace = Config::new(folder).open_transactional()?;
83 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
84 /// partition.insert("a", "abc")?;
85 ///
86 /// let taken = partition.take("a")?.unwrap();
87 /// assert_eq!(b"abc", &*taken);
88 ///
89 /// let item = partition.get("a")?;
90 /// assert!(item.is_none());
91 /// #
92 /// # Ok::<(), fjall::Error>(())
93 /// ```
94 ///
95 /// # Errors
96 ///
97 /// Will return `Err` if an IO error occurs.
98 pub fn take<K: Into<UserKey>>(&self, key: K) -> crate::Result<Option<UserValue>> {
99 self.fetch_update(key, |_| None)
100 }
101
102 /// Atomically updates an item and returns the previous value.
103 ///
104 /// Returning `None` removes the item if it existed before.
105 ///
106 /// The operation will run wrapped in a transaction.
107 ///
108 /// # Note
109 ///
110 /// The provided closure can be called multiple times as this function
111 /// automatically retries on conflict. Since this is an `FnMut`, make sure
112 /// it is idempotent and will not cause side-effects.
113 ///
114 /// # Examples
115 ///
116 /// ```
117 /// # use fjall::{Config, Keyspace, Slice, PartitionCreateOptions};
118 /// # use std::sync::Arc;
119 /// #
120 /// # let folder = tempfile::tempdir()?;
121 /// # let keyspace = Config::new(folder).open_transactional()?;
122 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
123 /// partition.insert("a", "abc")?;
124 ///
125 /// let prev = partition.fetch_update("a", |_| Some(Slice::from(*b"def")))?.unwrap();
126 /// assert_eq!(b"abc", &*prev);
127 ///
128 /// let item = partition.get("a")?;
129 /// assert_eq!(Some("def".as_bytes().into()), item);
130 /// #
131 /// # Ok::<(), fjall::Error>(())
132 /// ```
133 ///
134 /// ```
135 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
136 /// # use std::sync::Arc;
137 /// #
138 /// # let folder = tempfile::tempdir()?;
139 /// # let keyspace = Config::new(folder).open_transactional()?;
140 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
141 /// partition.insert("a", "abc")?;
142 ///
143 /// let prev = partition.fetch_update("a", |_| None)?.unwrap();
144 /// assert_eq!(b"abc", &*prev);
145 ///
146 /// let item = partition.get("a")?;
147 /// assert!(item.is_none());
148 /// #
149 /// # Ok::<(), fjall::Error>(())
150 /// ```
151 ///
152 /// # Errors
153 ///
154 /// Will return `Err` if an IO error occurs.
155 #[allow(unused_mut)]
156 pub fn fetch_update<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
157 &self,
158 key: K,
159 mut f: F,
160 ) -> crate::Result<Option<UserValue>> {
161 let key: UserKey = key.into();
162
163 #[cfg(feature = "single_writer_tx")]
164 {
165 let mut tx = self.keyspace.write_tx();
166
167 let prev = tx.fetch_update(self, key, f)?;
168 tx.commit()?;
169
170 Ok(prev)
171 }
172
173 #[cfg(feature = "ssi_tx")]
174 loop {
175 let mut tx = self.keyspace.write_tx()?;
176 let prev = tx.fetch_update(self, key.clone(), &mut f)?;
177 if tx.commit()?.is_ok() {
178 return Ok(prev);
179 }
180 }
181 }
182
183 /// Atomically updates an item and returns the new value.
184 ///
185 /// Returning `None` removes the item if it existed before.
186 ///
187 /// The operation will run wrapped in a transaction.
188 ///
189 /// # Note
190 ///
191 /// The provided closure can be called multiple times as this function
192 /// automatically retries on conflict. Since this is an `FnMut`, make sure
193 /// it is idempotent and will not cause side-effects.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// # use fjall::{Config, Keyspace, Slice, PartitionCreateOptions};
199 /// # use std::sync::Arc;
200 /// #
201 /// # let folder = tempfile::tempdir()?;
202 /// # let keyspace = Config::new(folder).open_transactional()?;
203 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
204 /// partition.insert("a", "abc")?;
205 ///
206 /// let updated = partition.update_fetch("a", |_| Some(Slice::from(*b"def")))?.unwrap();
207 /// assert_eq!(b"def", &*updated);
208 ///
209 /// let item = partition.get("a")?;
210 /// assert_eq!(Some("def".as_bytes().into()), item);
211 /// #
212 /// # Ok::<(), fjall::Error>(())
213 /// ```
214 ///
215 /// ```
216 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
217 /// # use std::sync::Arc;
218 /// #
219 /// # let folder = tempfile::tempdir()?;
220 /// # let keyspace = Config::new(folder).open_transactional()?;
221 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
222 /// partition.insert("a", "abc")?;
223 ///
224 /// let updated = partition.update_fetch("a", |_| None)?;
225 /// assert!(updated.is_none());
226 ///
227 /// let item = partition.get("a")?;
228 /// assert!(item.is_none());
229 /// #
230 /// # Ok::<(), fjall::Error>(())
231 /// ```
232 ///
233 /// # Errors
234 ///
235 /// Will return `Err` if an IO error occurs.
236 #[allow(unused_mut)]
237 pub fn update_fetch<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
238 &self,
239 key: K,
240 mut f: F,
241 ) -> crate::Result<Option<UserValue>> {
242 let key = key.into();
243
244 #[cfg(feature = "single_writer_tx")]
245 {
246 let mut tx = self.keyspace.write_tx();
247 let updated = tx.update_fetch(self, key, f)?;
248 tx.commit()?;
249
250 Ok(updated)
251 }
252
253 #[cfg(feature = "ssi_tx")]
254 loop {
255 let mut tx = self.keyspace.write_tx()?;
256 let updated = tx.update_fetch(self, key.clone(), &mut f)?;
257 if tx.commit()?.is_ok() {
258 return Ok(updated);
259 }
260 }
261 }
262
263 /// Inserts a key-value pair into the partition.
264 ///
265 /// Keys may be up to 65536 bytes long, values up to 2^32 bytes.
266 /// Shorter keys and values result in better performance.
267 ///
268 /// If the key already exists, the item will be overwritten.
269 ///
270 /// The operation will run wrapped in a transaction.
271 ///
272 /// # Examples
273 ///
274 /// ```
275 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
276 /// #
277 /// # let folder = tempfile::tempdir()?;
278 /// # let keyspace = Config::new(folder).open_transactional()?;
279 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
280 /// partition.insert("a", "abc")?;
281 ///
282 /// assert!(!keyspace.read_tx().is_empty(&partition)?);
283 /// #
284 /// # Ok::<(), fjall::Error>(())
285 /// ```
286 ///
287 /// # Errors
288 ///
289 /// Will return `Err` if an IO error occurs.
290 pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
291 &self,
292 key: K,
293 value: V,
294 ) -> crate::Result<()> {
295 #[cfg(feature = "single_writer_tx")]
296 {
297 let mut tx = self.keyspace.write_tx();
298 tx.insert(self, key, value);
299 tx.commit()?;
300 Ok(())
301 }
302
303 #[cfg(feature = "ssi_tx")]
304 {
305 let mut tx = self.keyspace.write_tx()?;
306 tx.insert(self, key, value);
307 tx.commit()?.expect("blind insert should not conflict ever");
308 Ok(())
309 }
310 }
311
312 /// Removes an item from the partition.
313 ///
314 /// The key may be up to 65536 bytes long.
315 /// Shorter keys result in better performance.
316 ///
317 /// The operation will run wrapped in a transaction.
318 ///
319 /// # Examples
320 ///
321 /// ```
322 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
323 /// #
324 /// # let folder = tempfile::tempdir()?;
325 /// # let keyspace = Config::new(folder).open_transactional()?;
326 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
327 /// partition.insert("a", "abc")?;
328 /// assert!(!keyspace.read_tx().is_empty(&partition)?);
329 ///
330 /// partition.remove("a")?;
331 /// assert!(keyspace.read_tx().is_empty(&partition)?);
332 /// #
333 /// # Ok::<(), fjall::Error>(())
334 /// ```
335 ///
336 /// # Errors
337 ///
338 /// Will return `Err` if an IO error occurs.
339 pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
340 #[cfg(feature = "single_writer_tx")]
341 {
342 let mut tx = self.keyspace.write_tx();
343 tx.remove(self, key);
344 tx.commit()?;
345 Ok(())
346 }
347
348 #[cfg(feature = "ssi_tx")]
349 {
350 let mut tx = self.keyspace.write_tx()?;
351 tx.remove(self, key);
352 tx.commit()?.expect("blind remove should not conflict ever");
353 Ok(())
354 }
355 }
356
357 /// Removes an item from the partition, leaving behind a weak tombstone.
358 ///
359 /// The tombstone marker of this delete operation will vanish when it
360 /// collides with its corresponding insertion.
361 /// This may cause older versions of the value to be resurrected, so it should
362 /// only be used and preferred in scenarios where a key is only ever written once.
363 ///
364 /// The key may be up to 65536 bytes long.
365 /// Shorter keys result in better performance.
366 ///
367 /// The operation will run wrapped in a transaction.
368 ///
369 /// # Experimental
370 ///
371 /// This function is currently experimental.
372 ///
373 /// # Examples
374 ///
375 /// ```
376 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
377 /// #
378 /// # let folder = tempfile::tempdir()?;
379 /// # let keyspace = Config::new(folder).open_transactional()?;
380 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
381 /// partition.insert("a", "abc")?;
382 /// assert!(!keyspace.read_tx().is_empty(&partition)?);
383 ///
384 /// partition.remove_weak("a")?;
385 /// assert!(keyspace.read_tx().is_empty(&partition)?);
386 /// #
387 /// # Ok::<(), fjall::Error>(())
388 /// ```
389 ///
390 /// # Errors
391 ///
392 /// Will return `Err` if an IO error occurs.
393 #[doc(hidden)]
394 pub fn remove_weak<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
395 #[cfg(feature = "single_writer_tx")]
396 {
397 let mut tx = self.keyspace.write_tx();
398 tx.remove_weak(self, key);
399 tx.commit()?;
400 Ok(())
401 }
402
403 #[cfg(feature = "ssi_tx")]
404 {
405 let mut tx = self.keyspace.write_tx()?;
406 tx.remove_weak(self, key);
407 tx.commit()?.expect("blind remove should not conflict ever");
408 Ok(())
409 }
410 }
411
412 /// Retrieves an item from the partition.
413 ///
414 /// The operation will run wrapped in a read snapshot.
415 ///
416 /// # Examples
417 ///
418 /// ```
419 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
420 /// #
421 /// # let folder = tempfile::tempdir()?;
422 /// # let keyspace = Config::new(folder).open_transactional()?;
423 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
424 /// partition.insert("a", "my_value")?;
425 ///
426 /// let item = partition.get("a")?;
427 /// assert_eq!(Some("my_value".as_bytes().into()), item);
428 /// #
429 /// # Ok::<(), fjall::Error>(())
430 /// ```
431 ///
432 /// # Errors
433 ///
434 /// Will return `Err` if an IO error occurs.
435 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<lsm_tree::UserValue>> {
436 self.inner.get(key)
437 }
438
439 /// Retrieves the size of an item from the partition.
440 ///
441 /// The operation will run wrapped in a read snapshot.
442 ///
443 /// # Examples
444 ///
445 /// ```
446 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
447 /// #
448 /// # let folder = tempfile::tempdir()?;
449 /// # let keyspace = Config::new(folder).open_transactional()?;
450 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
451 /// partition.insert("a", "my_value")?;
452 ///
453 /// let len = partition.size_of("a")?.unwrap_or_default();
454 /// assert_eq!("my_value".len() as u32, len);
455 /// #
456 /// # Ok::<(), fjall::Error>(())
457 /// ```
458 ///
459 /// # Errors
460 ///
461 /// Will return `Err` if an IO error occurs.
462 pub fn size_of<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<u32>> {
463 self.inner.size_of(key)
464 }
465
466 /// Returns the first key-value pair in the partition.
467 /// The key in this pair is the minimum key in the partition.
468 ///
469 /// The operation will run wrapped in a read snapshot.
470 ///
471 /// # Examples
472 ///
473 /// ```
474 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
475 /// #
476 /// # let folder = tempfile::tempdir()?;
477 /// # let keyspace = Config::new(folder).open_transactional()?;
478 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
479 /// partition.insert("a", "my_value")?;
480 /// partition.insert("b", "my_value")?;
481 ///
482 /// assert_eq!(b"a", &*partition.first_key_value()?.unwrap().0);
483 /// #
484 /// # Ok::<(), fjall::Error>(())
485 /// ```
486 ///
487 /// # Errors
488 ///
489 /// Will return `Err` if an IO error occurs.
490 pub fn first_key_value(&self) -> crate::Result<Option<KvPair>> {
491 let read_tx = self.keyspace.read_tx();
492 read_tx.first_key_value(self)
493 }
494
495 /// Returns the last key-value pair in the partition.
496 /// The key in this pair is the maximum key in the partition.
497 ///
498 /// The operation will run wrapped in a read snapshot.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
504 /// #
505 /// # let folder = tempfile::tempdir()?;
506 /// # let keyspace = Config::new(folder).open_transactional()?;
507 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
508 /// partition.insert("a", "my_value")?;
509 /// partition.insert("b", "my_value")?;
510 ///
511 /// assert_eq!(b"b", &*partition.last_key_value()?.unwrap().0);
512 /// #
513 /// # Ok::<(), fjall::Error>(())
514 /// ```
515 ///
516 /// # Errors
517 ///
518 /// Will return `Err` if an IO error occurs.
519 pub fn last_key_value(&self) -> crate::Result<Option<KvPair>> {
520 let read_tx = self.keyspace.read_tx();
521 read_tx.last_key_value(self)
522 }
523
524 /// Returns `true` if the partition contains the specified key.
525 ///
526 /// The operation will run wrapped in a read snapshot.
527 ///
528 /// # Examples
529 ///
530 /// ```
531 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
532 /// #
533 /// # let folder = tempfile::tempdir()?;
534 /// # let keyspace = Config::new(folder).open_transactional()?;
535 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
536 /// partition.insert("a", "my_value")?;
537 ///
538 /// assert!(partition.contains_key("a")?);
539 /// assert!(!partition.contains_key("b")?);
540 /// #
541 /// # Ok::<(), fjall::Error>(())
542 /// ```
543 ///
544 /// # Errors
545 ///
546 /// Will return `Err` if an IO error occurs.
547 pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<bool> {
548 self.inner.contains_key(key)
549 }
550
551 /// Allows access to the inner partition handle, allowing to
552 /// escape from the transactional context.
553 #[doc(hidden)]
554 #[must_use]
555 pub fn inner(&self) -> &PartitionHandle {
556 &self.inner
557 }
558}