fjall/partition/mod.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod name;
6pub mod options;
7mod write_delay;
8
9use crate::{
10 batch::PartitionKey,
11 compaction::manager::CompactionManager,
12 config::Config as KeyspaceConfig,
13 file::{LSM_MANIFEST_FILE, PARTITIONS_FOLDER, PARTITION_CONFIG_FILE, PARTITION_DELETED_MARKER},
14 flush::manager::{FlushManager, Task as FlushTask},
15 gc::GarbageCollection,
16 journal::{
17 manager::{EvictionWatermark, JournalManager},
18 Journal,
19 },
20 keyspace::Partitions,
21 snapshot_nonce::SnapshotNonce,
22 snapshot_tracker::SnapshotTracker,
23 stats::Stats,
24 write_buffer_manager::WriteBufferManager,
25 Error, Keyspace,
26};
27use lsm_tree::{
28 gc::Report as GcReport, AbstractTree, AnyTree, KvPair, SequenceNumberCounter, UserKey,
29 UserValue,
30};
31use options::CreateOptions;
32use std::{
33 fs::File,
34 ops::RangeBounds,
35 path::Path,
36 sync::{
37 atomic::{AtomicBool, AtomicUsize},
38 Arc, RwLock,
39 },
40 time::{Duration, Instant},
41};
42use std_semaphore::Semaphore;
43use write_delay::get_write_delay;
44
/// Shared state behind a [`PartitionHandle`] (held via `Arc`).
///
/// Bundles the partition's own LSM-tree and flags together with clones of
/// the keyspace-wide managers (flush, journal, compaction, write buffer),
/// so a handle can operate without going back through the `Keyspace`.
#[allow(clippy::module_name_repetitions)]
pub struct PartitionHandleInner {
    // Internal
    //
    /// Partition name
    pub name: PartitionKey,

    // Partition configuration
    #[doc(hidden)]
    pub config: CreateOptions,

    /// If `true`, the partition is marked as deleted
    pub(crate) is_deleted: AtomicBool,

    /// If `true`, fsync failed during persisting, see `Error::Poisoned`
    pub(crate) is_poisoned: Arc<AtomicBool>,

    /// LSM-tree wrapper
    #[doc(hidden)]
    pub tree: AnyTree,

    // Keyspace stuff
    //
    /// Config of keyspace
    pub(crate) keyspace_config: KeyspaceConfig,

    /// Flush manager of keyspace
    pub(crate) flush_manager: Arc<RwLock<FlushManager>>,

    /// Journal manager of keyspace
    pub(crate) journal_manager: Arc<RwLock<JournalManager>>,

    /// Compaction manager of keyspace
    pub(crate) compaction_manager: CompactionManager,

    /// Write buffer manager of keyspace
    pub(crate) write_buffer_manager: WriteBufferManager,

    // TODO: notifying flush worker should probably become a method in FlushManager
    /// Flush semaphore of keyspace
    pub(crate) flush_semaphore: Arc<Semaphore>,

    /// Journal of keyspace
    pub(crate) journal: Arc<Journal>,

    /// Partition map of keyspace
    pub(crate) partitions: Arc<RwLock<Partitions>>,

    /// Sequence number generator of keyspace
    #[doc(hidden)]
    pub seqno: SequenceNumberCounter,

    /// Visible sequence number of keyspace
    #[doc(hidden)]
    pub visible_seqno: SequenceNumberCounter,

    /// Snapshot tracker
    pub(crate) snapshot_tracker: SnapshotTracker,

    // Shared keyspace statistics (cloned from the owning keyspace)
    pub(crate) stats: Arc<Stats>,

    /// Number of completed memtable flushes in this partition
    pub(crate) flushes_completed: AtomicUsize,
}
109
110impl Drop for PartitionHandleInner {
111 fn drop(&mut self) {
112 log::trace!("Dropping partition inner: {:?}", self.name);
113
114 if self.is_deleted.load(std::sync::atomic::Ordering::Acquire) {
115 let path = &self.tree.tree_config().path;
116
117 // IMPORTANT: First, delete the manifest,
118 // once that is deleted, the partition is treated as uninitialized
119 // even if the .deleted marker is removed
120 //
121 // This is important, because if somehow `remove_dir_all` ends up
122 // deleting the `.deleted` marker first, we would end up resurrecting
123 // the partition
124 let manifest_file = path.join(LSM_MANIFEST_FILE);
125
126 // TODO: use https://github.com/rust-lang/rust/issues/31436 if stable
127 #[allow(clippy::collapsible_else_if)]
128 match manifest_file.try_exists() {
129 Ok(exists) => {
130 if exists {
131 if let Err(e) = std::fs::remove_file(manifest_file) {
132 log::error!("Failed to cleanup partition manifest at {path:?}: {e}");
133 } else {
134 if let Err(e) = std::fs::remove_dir_all(path) {
135 log::error!(
136 "Failed to cleanup deleted partition's folder at {path:?}: {e}"
137 );
138 }
139 }
140 }
141 }
142 Err(e) => {
143 log::error!("Failed to cleanup partition manifest at {path:?}: {e}");
144 }
145 }
146 }
147
148 #[cfg(feature = "__internal_whitebox")]
149 crate::drop::decrement_drop_counter();
150 }
151}
152
/// Access to a keyspace partition
///
/// Each partition is backed by an LSM-tree to provide a
/// disk-backed search tree, and can be configured individually.
///
/// A partition generally only takes a little bit of memory and disk space,
/// but does not spawn its own background threads.
///
/// Cloning a handle is cheap: it only bumps the refcount of the shared
/// inner `Arc`.
#[derive(Clone)]
#[allow(clippy::module_name_repetitions)]
#[doc(alias = "column family")]
#[doc(alias = "locality group")]
#[doc(alias = "table")]
pub struct PartitionHandle(pub(crate) Arc<PartitionHandleInner>);
166
// Smart-pointer-style deref: lets callers access `PartitionHandleInner`
// fields and methods directly on the handle.
impl std::ops::Deref for PartitionHandle {
    type Target = PartitionHandleInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
174
// Handle identity is defined by partition name only; two handles to the
// same named partition compare equal.
impl PartialEq for PartitionHandle {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}

impl Eq for PartitionHandle {}

// Hashes the same field (`name`) that `eq` compares, keeping the
// Eq/Hash contract intact for use in hash maps/sets.
impl std::hash::Hash for PartitionHandle {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        state.write(self.name.as_bytes());
    }
}
188
189impl GarbageCollection for PartitionHandle {
190 fn gc_scan(&self) -> crate::Result<GcReport> {
191 let _nonce = SnapshotNonce::new(self.seqno.get(), self.snapshot_tracker.clone());
192 crate::gc::GarbageCollector::scan(self)
193 }
194
195 fn gc_with_space_amp_target(&self, factor: f32) -> crate::Result<u64> {
196 let start = Instant::now();
197
198 let result = crate::gc::GarbageCollector::with_space_amp_target(self, factor);
199
200 #[allow(clippy::cast_possible_truncation)]
201 self.stats.time_gc.fetch_add(
202 start.elapsed().as_micros() as u64,
203 std::sync::atomic::Ordering::Relaxed,
204 );
205
206 result
207 }
208
209 fn gc_with_staleness_threshold(&self, threshold: f32) -> crate::Result<u64> {
210 let start = Instant::now();
211
212 let result = crate::gc::GarbageCollector::with_staleness_threshold(self, threshold);
213
214 #[allow(clippy::cast_possible_truncation)]
215 self.stats.time_gc.fetch_add(
216 start.elapsed().as_micros() as u64,
217 std::sync::atomic::Ordering::Relaxed,
218 );
219
220 result
221 }
222
223 fn gc_drop_stale_segments(&self) -> crate::Result<u64> {
224 crate::gc::GarbageCollector::drop_stale_segments(self)
225 }
226}
227
228impl PartitionHandle {
229 /// Ingests a sorted stream of key-value pairs into the partition.
230 ///
231 /// Can only be called on a new fresh, empty partition.
232 ///
233 /// # Errors
234 ///
235 /// Will return `Err` if an IO error occurs.
236 ///
237 /// # Panics
238 ///
239 /// Panics if the partition is **not** initially empty.
240 ///
241 /// Will panic if the input iterator is not sorted in ascending order.
242 pub fn ingest<K: Into<UserKey>, V: Into<UserValue>>(
243 &self,
244 iter: impl Iterator<Item = (K, V)>,
245 ) -> crate::Result<()> {
246 self.tree
247 .ingest(iter.map(|(k, v)| (k.into(), v.into())))
248 .map_err(Into::into)
249 }
250
    /// Wraps an already-opened LSM-tree in a partition handle, wiring it up
    /// with the shared managers/counters cloned from the given keyspace.
    ///
    /// Used when (re)opening an existing partition; `create_new` is the
    /// counterpart for brand-new partitions.
    pub(crate) fn from_keyspace(
        keyspace: &Keyspace,
        tree: AnyTree,
        name: PartitionKey,
        config: CreateOptions,
    ) -> Self {
        Self(Arc::new(PartitionHandleInner {
            name,
            tree,
            partitions: keyspace.partitions.clone(),
            keyspace_config: keyspace.config.clone(),
            flush_manager: keyspace.flush_manager.clone(),
            flush_semaphore: keyspace.flush_semaphore.clone(),
            flushes_completed: AtomicUsize::new(0),
            journal_manager: keyspace.journal_manager.clone(),
            journal: keyspace.journal.clone(),
            compaction_manager: keyspace.compaction_manager.clone(),
            seqno: keyspace.seqno.clone(),
            visible_seqno: keyspace.visible_seqno.clone(),
            write_buffer_manager: keyspace.write_buffer_manager.clone(),
            // Freshly wrapped partitions are never marked deleted
            is_deleted: AtomicBool::default(),
            is_poisoned: keyspace.is_poisoned.clone(),
            snapshot_tracker: keyspace.snapshot_tracker.clone(),
            config,
            stats: keyspace.stats.clone(),
        }))
    }
278
    /// Creates a new partition.
    ///
    /// Steps, in order:
    /// 1. refuse to create if a `.deleted` marker is present,
    /// 2. create the partition folder,
    /// 3. persist (and fsync) the partition config,
    /// 4. open the LSM-tree (standard or blob, per `config.tree_type`).
    ///
    /// # Errors
    ///
    /// Returns `Error::PartitionDeleted` if the partition is marked deleted,
    /// or `Err` if an IO error occurs.
    pub(crate) fn create_new(
        keyspace: &Keyspace,
        name: PartitionKey,
        config: CreateOptions,
    ) -> crate::Result<Self> {
        use lsm_tree::coding::Encode;

        log::debug!("Creating partition {name:?}");

        let base_folder = keyspace.config.path.join(PARTITIONS_FOLDER).join(&*name);

        // A lingering `.deleted` marker means a prior deletion did not fully
        // complete; the folder must not be reused
        if base_folder.join(PARTITION_DELETED_MARKER).try_exists()? {
            log::error!("Failed to open partition, partition is deleted.");
            return Err(Error::PartitionDeleted);
        }

        std::fs::create_dir_all(&base_folder)?;

        // Write config
        // fsync so the config survives a crash before the tree is opened
        let mut file = File::create(base_folder.join(PARTITION_CONFIG_FILE))?;
        config.encode_into(&mut file)?;
        file.sync_all()?;

        let mut base_config = lsm_tree::Config::new(base_folder)
            .descriptor_table(keyspace.config.descriptor_table.clone())
            .use_cache(keyspace.config.cache.clone())
            .data_block_size(config.data_block_size)
            .index_block_size(config.index_block_size)
            .level_count(config.level_count)
            .compression(config.compression)
            .bloom_bits_per_key(config.bloom_bits_per_key);

        // Blob-related settings only apply when KV separation is requested
        if let Some(kv_opts) = &config.kv_separation {
            base_config = base_config
                .blob_compression(kv_opts.compression)
                .blob_file_separation_threshold(kv_opts.separation_threshold)
                .blob_file_target_size(kv_opts.file_target_size);
        }

        let tree = match config.tree_type {
            lsm_tree::TreeType::Standard => AnyTree::Standard(base_config.open()?),
            lsm_tree::TreeType::Blob => AnyTree::Blob(base_config.open_as_blob_tree()?),
        };

        Ok(Self(Arc::new(PartitionHandleInner {
            name,
            config,
            partitions: keyspace.partitions.clone(),
            keyspace_config: keyspace.config.clone(),
            flush_manager: keyspace.flush_manager.clone(),
            flush_semaphore: keyspace.flush_semaphore.clone(),
            flushes_completed: AtomicUsize::new(0),
            journal_manager: keyspace.journal_manager.clone(),
            journal: keyspace.journal.clone(),
            compaction_manager: keyspace.compaction_manager.clone(),
            seqno: keyspace.seqno.clone(),
            visible_seqno: keyspace.visible_seqno.clone(),
            tree,
            write_buffer_manager: keyspace.write_buffer_manager.clone(),
            is_deleted: AtomicBool::default(),
            is_poisoned: keyspace.is_poisoned.clone(),
            snapshot_tracker: keyspace.snapshot_tracker.clone(),
            stats: keyspace.stats.clone(),
        })))
    }
345
346 /// Returns the underlying LSM-tree's path.
347 #[must_use]
348 pub fn path(&self) -> &Path {
349 self.tree.tree_config().path.as_path()
350 }
351
352 /// Returns the disk space usage of this partition.
353 ///
354 /// # Examples
355 ///
356 /// ```
357 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
358 /// #
359 /// # let folder = tempfile::tempdir()?;
360 /// # let keyspace = Config::new(folder).open()?;
361 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
362 /// assert_eq!(0, partition.disk_space());
363 /// #
364 /// # Ok::<(), fjall::Error>(())
365 /// ```
366 #[must_use]
367 pub fn disk_space(&self) -> u64 {
368 self.tree.disk_space()
369 }
370
371 /// Returns an iterator that scans through the entire partition.
372 ///
373 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
379 /// #
380 /// # let folder = tempfile::tempdir()?;
381 /// # let keyspace = Config::new(folder).open()?;
382 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
383 /// partition.insert("a", "abc")?;
384 /// partition.insert("f", "abc")?;
385 /// partition.insert("g", "abc")?;
386 /// assert_eq!(3, partition.iter().count());
387 /// #
388 /// # Ok::<(), fjall::Error>(())
389 /// ```
390 #[must_use]
391 pub fn iter(&self) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
392 self.tree
393 .iter(None, None)
394 .map(|item| item.map_err(Into::into))
395 }
396
397 /// Returns an iterator that scans through the entire partition, returning only keys.
398 ///
399 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
400 #[must_use]
401 pub fn keys(&self) -> impl DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static {
402 self.tree
403 .keys(None, None)
404 .map(|item| item.map_err(Into::into))
405 }
406
407 /// Returns an iterator that scans through the entire partition, returning only values.
408 ///
409 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
410 #[must_use]
411 pub fn values(&self) -> impl DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static {
412 self.tree
413 .values(None, None)
414 .map(|item| item.map_err(Into::into))
415 }
416
417 /// Returns an iterator over a range of items.
418 ///
419 /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
425 /// #
426 /// # let folder = tempfile::tempdir()?;
427 /// # let keyspace = Config::new(folder).open()?;
428 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
429 /// partition.insert("a", "abc")?;
430 /// partition.insert("f", "abc")?;
431 /// partition.insert("g", "abc")?;
432 /// assert_eq!(2, partition.range("a"..="f").count());
433 /// #
434 /// # Ok::<(), fjall::Error>(())
435 /// ```
436 pub fn range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
437 &'a self,
438 range: R,
439 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
440 self.tree
441 .range(range, None, None)
442 .map(|item| item.map_err(Into::into))
443 }
444
445 /// Returns an iterator over a prefixed set of items.
446 ///
447 /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
448 ///
449 /// # Examples
450 ///
451 /// ```
452 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
453 /// #
454 /// # let folder = tempfile::tempdir()?;
455 /// # let keyspace = Config::new(folder).open()?;
456 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
457 /// partition.insert("a", "abc")?;
458 /// partition.insert("ab", "abc")?;
459 /// partition.insert("abc", "abc")?;
460 /// assert_eq!(2, partition.prefix("ab").count());
461 /// #
462 /// # Ok::<(), fjall::Error>(())
463 /// ```
464 pub fn prefix<'a, K: AsRef<[u8]> + 'a>(
465 &'a self,
466 prefix: K,
467 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
468 self.tree
469 .prefix(prefix, None, None)
470 .map(|item| item.map_err(Into::into))
471 }
472
473 /// Approximates the amount of items in the partition.
474 ///
475 /// For update- or delete-heavy workloads, this value will
476 /// diverge from the real value, but is a O(1) operation.
477 ///
478 /// For insert-only workloads (e.g. logs, time series)
479 /// this value is reliable.
480 ///
481 /// # Examples
482 ///
483 /// ```
484 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
485 /// #
486 /// # let folder = tempfile::tempdir()?;
487 /// # let keyspace = Config::new(folder).open()?;
488 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
489 /// assert_eq!(partition.approximate_len(), 0);
490 ///
491 /// partition.insert("1", "abc")?;
492 /// assert_eq!(partition.approximate_len(), 1);
493 ///
494 /// partition.remove("1")?;
495 /// // Oops! approximate_len will not be reliable here
496 /// assert_eq!(partition.approximate_len(), 2);
497 /// #
498 /// # Ok::<(), fjall::Error>(())
499 /// ```
500 #[must_use]
501 pub fn approximate_len(&self) -> usize {
502 self.tree.approximate_len()
503 }
504
505 /// Scans the entire partition, returning the amount of items.
506 ///
507 /// ###### Caution
508 ///
509 /// This operation scans the entire partition: O(n) complexity!
510 ///
511 /// Never, under any circumstances, use .`len()` == 0 to check
512 /// if the partition is empty, use [`PartitionHandle::is_empty`] instead.
513 ///
514 /// If you want an estimate, use [`PartitionHandle::approximate_len`] instead.
515 ///
516 /// # Examples
517 ///
518 /// ```
519 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
520 /// #
521 /// # let folder = tempfile::tempdir()?;
522 /// # let keyspace = Config::new(folder).open()?;
523 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
524 /// assert_eq!(partition.len()?, 0);
525 ///
526 /// partition.insert("1", "abc")?;
527 /// partition.insert("3", "abc")?;
528 /// partition.insert("5", "abc")?;
529 /// assert_eq!(partition.len()?, 3);
530 /// #
531 /// # Ok::<(), fjall::Error>(())
532 /// ```
533 ///
534 /// # Errors
535 ///
536 /// Will return `Err` if an IO error occurs.
537 pub fn len(&self) -> crate::Result<usize> {
538 let mut count = 0;
539
540 for kv in self.iter() {
541 let _ = kv?;
542 count += 1;
543 }
544
545 Ok(count)
546 }
547
548 /// Returns `true` if the partition is empty.
549 ///
550 /// This operation has O(1) complexity.
551 ///
552 /// # Examples
553 ///
554 /// ```
555 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
556 /// #
557 /// # let folder = tempfile::tempdir()?;
558 /// # let keyspace = Config::new(folder).open()?;
559 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
560 /// assert!(partition.is_empty()?);
561 ///
562 /// partition.insert("a", "abc")?;
563 /// assert!(!partition.is_empty()?);
564 /// #
565 /// # Ok::<(), fjall::Error>(())
566 /// ```
567 ///
568 /// # Errors
569 ///
570 /// Will return `Err` if an IO error occurs.
571 pub fn is_empty(&self) -> crate::Result<bool> {
572 self.first_key_value().map(|x| x.is_none())
573 }
574
575 /// Returns `true` if the partition contains the specified key.
576 ///
577 /// # Examples
578 ///
579 /// ```
580 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
581 /// #
582 /// # let folder = tempfile::tempdir()?;
583 /// # let keyspace = Config::new(folder).open()?;
584 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
585 /// assert!(!partition.contains_key("a")?);
586 ///
587 /// partition.insert("a", "abc")?;
588 /// assert!(partition.contains_key("a")?);
589 /// #
590 /// # Ok::<(), fjall::Error>(())
591 /// ```
592 ///
593 /// # Errors
594 ///
595 /// Will return `Err` if an IO error occurs.
596 pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<bool> {
597 self.tree.contains_key(key, None).map_err(Into::into)
598 }
599
600 /// Retrieves an item from the partition.
601 ///
602 /// # Examples
603 ///
604 /// ```
605 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
606 /// #
607 /// # let folder = tempfile::tempdir()?;
608 /// # let keyspace = Config::new(folder).open()?;
609 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
610 /// partition.insert("a", "my_value")?;
611 ///
612 /// let item = partition.get("a")?;
613 /// assert_eq!(Some("my_value".as_bytes().into()), item);
614 /// #
615 /// # Ok::<(), fjall::Error>(())
616 /// ```
617 ///
618 /// # Errors
619 ///
620 /// Will return `Err` if an IO error occurs.
621 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<lsm_tree::UserValue>> {
622 Ok(self.tree.get(key, None)?)
623 }
624
625 /// Retrieves the size of an item from the partition.
626 ///
627 /// # Examples
628 ///
629 /// ```
630 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
631 /// #
632 /// # let folder = tempfile::tempdir()?;
633 /// # let keyspace = Config::new(folder).open()?;
634 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
635 /// partition.insert("a", "my_value")?;
636 ///
637 /// let len = partition.size_of("a")?.unwrap_or_default();
638 /// assert_eq!("my_value".len() as u32, len);
639 /// #
640 /// # Ok::<(), fjall::Error>(())
641 /// ```
642 ///
643 /// # Errors
644 ///
645 /// Will return `Err` if an IO error occurs.
646 pub fn size_of<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<u32>> {
647 Ok(self.tree.size_of(key, None)?)
648 }
649
650 /// Returns the first key-value pair in the partition.
651 /// The key in this pair is the minimum key in the partition.
652 ///
653 /// # Examples
654 ///
655 /// ```
656 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
657 /// #
658 /// # let folder = tempfile::tempdir()?;
659 /// # let keyspace = Config::new(folder).open()?;
660 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
661 /// partition.insert("1", "abc")?;
662 /// partition.insert("3", "abc")?;
663 /// partition.insert("5", "abc")?;
664 ///
665 /// let (key, _) = partition.first_key_value()?.expect("item should exist");
666 /// assert_eq!(&*key, "1".as_bytes());
667 /// #
668 /// # Ok::<(), fjall::Error>(())
669 /// ```
670 ///
671 /// # Errors
672 ///
673 /// Will return `Err` if an IO error occurs.
674 pub fn first_key_value(&self) -> crate::Result<Option<KvPair>> {
675 Ok(self.tree.first_key_value(None, None)?)
676 }
677
678 /// Returns the last key-value pair in the partition.
679 /// The key in this pair is the maximum key in the partition.
680 ///
681 /// # Examples
682 ///
683 /// ```
684 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
685 /// #
686 /// # let folder = tempfile::tempdir()?;
687 /// # let keyspace = Config::new(folder).open()?;
688 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
689 /// partition.insert("1", "abc")?;
690 /// partition.insert("3", "abc")?;
691 /// partition.insert("5", "abc")?;
692 ///
693 /// let (key, _) = partition.last_key_value()?.expect("item should exist");
694 /// assert_eq!(&*key, "5".as_bytes());
695 /// #
696 /// # Ok::<(), fjall::Error>(())
697 /// ```
698 ///
699 /// # Errors
700 ///
701 /// Will return `Err` if an IO error occurs.
702 pub fn last_key_value(&self) -> crate::Result<Option<KvPair>> {
703 Ok(self.tree.last_key_value(None, None)?)
704 }
705
706 /// Returns `true` if the underlying LSM-tree is key-value-separated.
707 ///
708 /// See [`CreateOptions::with_kv_separation`] for more information.
709 ///
710 /// # Examples
711 ///
712 /// ```
713 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
714 /// #
715 /// # let folder = tempfile::tempdir()?;
716 /// # let keyspace = Config::new(folder).open()?;
717 /// let tree1 = keyspace.open_partition("default", PartitionCreateOptions::default())?;
718 /// assert!(!tree1.is_kv_separated());
719 ///
720 /// let blob_cfg = PartitionCreateOptions::default().with_kv_separation(Default::default());
721 /// let tree2 = keyspace.open_partition("blobs", blob_cfg)?;
722 /// assert!(tree2.is_kv_separated());
723 /// #
724 /// # Ok::<(), fjall::Error>(())
725 /// ```
726 #[must_use]
727 pub fn is_kv_separated(&self) -> bool {
728 matches!(self.tree, crate::AnyTree::Blob(_))
729 }
730
731 // NOTE: Used in tests
732 #[doc(hidden)]
733 pub fn rotate_memtable_and_wait(&self) -> crate::Result<()> {
734 if self.rotate_memtable()? {
735 while !self
736 .flush_manager
737 .read()
738 .expect("lock is poisoned")
739 .is_empty()
740 {
741 std::thread::sleep(std::time::Duration::from_millis(10));
742 }
743 }
744 Ok(())
745 }
746
    /// Returns `true` if the memtable was indeed rotated.
    ///
    /// Seals the active memtable, rotates the journal, and enqueues a
    /// flush task for the sealed memtable.
    ///
    /// NOTE(review): locks are taken in a fixed order here
    /// (journal writer → journal manager → partitions → flush manager);
    /// presumably this order is relied upon elsewhere to avoid deadlocks —
    /// keep it if modifying this function.
    #[doc(hidden)]
    pub fn rotate_memtable(&self) -> crate::Result<bool> {
        log::debug!("Rotating memtable {:?}", self.name);

        log::trace!("partition: acquiring journal lock");
        let mut journal = self.journal.get_writer();

        // Rotate memtable
        let Some((yanked_id, yanked_memtable)) = self.tree.rotate_memtable() else {
            // Another thread already sealed the memtable; nothing to do
            log::debug!("Got no sealed memtable, someone beat us to it");
            return Ok(false);
        };

        log::trace!("partition: acquiring journal manager lock");
        let mut journal_manager = self.journal_manager.write().expect("lock is poisoned");

        // Collect the highest in-memtable seqno of every partition; these act
        // as eviction watermarks for the journal rotation below
        let seqno_map = {
            let partitions = self.partitions.write().expect("lock is poisoned");

            let mut seqnos = Vec::with_capacity(partitions.len());

            for partition in partitions.values() {
                if let Some(lsn) = partition.tree.get_highest_memtable_seqno() {
                    seqnos.push(EvictionWatermark {
                        lsn,
                        partition: partition.clone(),
                    });
                }
            }

            seqnos
        };

        journal_manager.rotate_journal(&mut journal, seqno_map)?;

        log::trace!("partition: acquiring flush manager lock");
        let mut flush_manager = self.flush_manager.write().expect("lock is poisoned");

        flush_manager.enqueue_task(
            self.name.clone(),
            FlushTask {
                id: yanked_id,
                partition: self.clone(),
                sealed_memtable: yanked_memtable,
            },
        );

        self.stats
            .flushes_enqueued
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        // Release all locks before waking the flush worker
        drop(flush_manager);
        drop(journal_manager);
        drop(journal);

        // Notify flush worker that new work has arrived
        self.flush_semaphore.release();

        Ok(true)
    }
808
809 fn check_journal_size(&self) {
810 loop {
811 let bytes = self
812 .journal_manager
813 .read()
814 .expect("lock is poisoned")
815 .disk_space_used();
816
817 if bytes <= self.keyspace_config.max_journaling_size_in_bytes {
818 if bytes as f64 > self.keyspace_config.max_journaling_size_in_bytes as f64 * 0.9 {
819 log::info!(
820 "partition: write stall because 90% journal threshold has been reached"
821 );
822 std::thread::sleep(std::time::Duration::from_millis(500));
823 }
824
825 break;
826 }
827
828 log::info!(
829 "Write stall in partition {} because journal is too large",
830 self.name
831 );
832 std::thread::sleep(std::time::Duration::from_millis(100)); // TODO: maybe exponential backoff
833 }
834 }
835
836 fn check_write_stall(&self) {
837 let l0_run_count = self.tree.l0_run_count();
838
839 if l0_run_count >= 20 {
840 let sleep_us = get_write_delay(l0_run_count);
841
842 if sleep_us > 0 {
843 log::info!(
844 "Stalling writes by {sleep_us}µs in partition {} due to many segments in L0...",
845 self.name
846 );
847 self.compaction_manager.notify(self.clone());
848 std::thread::sleep(Duration::from_micros(sleep_us));
849 }
850 }
851 }
852
853 fn check_write_halt(&self) {
854 while self.tree.l0_run_count() >= 32 {
855 log::info!(
856 "Halting writes in partition {} until L0 is cleared up...",
857 self.name
858 );
859 self.compaction_manager.notify(self.clone());
860 std::thread::sleep(Duration::from_millis(10));
861 }
862 }
863
864 pub(crate) fn check_memtable_overflow(&self, size: u32) -> crate::Result<()> {
865 if size > self.config.max_memtable_size {
866 self.rotate_memtable().inspect_err(|_| {
867 self.is_poisoned
868 .store(true, std::sync::atomic::Ordering::Relaxed);
869 })?;
870
871 self.check_journal_size();
872 self.check_write_halt();
873 }
874
875 self.check_write_stall();
876
877 Ok(())
878 }
879
880 pub(crate) fn check_write_buffer_size(&self, initial_size: u64) {
881 let limit = self.keyspace_config.max_write_buffer_size_in_bytes;
882
883 if initial_size > limit {
884 let p90_limit = (limit as f64) * 0.9;
885
886 loop {
887 let bytes = self.write_buffer_manager.get();
888
889 if bytes < limit {
890 if bytes as f64 > p90_limit {
891 log::info!(
892 "partition: write stall because 90% write buffer threshold has been reached"
893 );
894 std::thread::sleep(std::time::Duration::from_millis(100));
895 }
896 break;
897 }
898
899 log::info!(
900 "Write stall in partition {} because of write buffer saturation",
901 self.name
902 );
903 std::thread::sleep(std::time::Duration::from_millis(10));
904 }
905 }
906 }
907
908 /// Number of disk segments (a.k.a. SST files) in the LSM-tree.
909 #[doc(hidden)]
910 #[must_use]
911 pub fn segment_count(&self) -> usize {
912 self.tree.segment_count()
913 }
914
915 /// Number of blob files in the LSM-tree.
916 #[doc(hidden)]
917 #[must_use]
918 pub fn blob_file_count(&self) -> usize {
919 self.tree.blob_file_count()
920 }
921
922 /// Number of completed memtable flushes in this partition.
923 #[must_use]
924 #[doc(hidden)]
925 pub fn flushes_completed(&self) -> usize {
926 self.flushes_completed
927 .load(std::sync::atomic::Ordering::Relaxed)
928 }
929
930 /// Opens a snapshot of this partition.
931 #[must_use]
932 pub fn snapshot(&self) -> crate::Snapshot {
933 self.snapshot_at(self.seqno.get())
934 }
935
936 /// Opens a snapshot of this partition with a given sequence number.
937 #[must_use]
938 pub fn snapshot_at(&self, seqno: crate::Instant) -> crate::Snapshot {
939 crate::Snapshot::new(
940 self.tree.snapshot(seqno),
941 SnapshotNonce::new(seqno, self.snapshot_tracker.clone()),
942 )
943 }
944
945 /// Performs major compaction, blocking the caller until it's done.
946 ///
947 /// # Errors
948 ///
949 /// Will return `Err` if an IO error occurs.
950 #[doc(hidden)]
951 pub fn major_compact(&self) -> crate::Result<()> {
952 match &self.config.compaction_strategy {
953 crate::compaction::Strategy::Leveled(x) => self.tree.major_compact(
954 u64::from(x.target_size),
955 self.snapshot_tracker.get_seqno_safe_to_gc(),
956 )?,
957 crate::compaction::Strategy::SizeTiered(_) => self
958 .tree
959 .major_compact(u64::MAX, self.snapshot_tracker.get_seqno_safe_to_gc())?,
960 crate::compaction::Strategy::Fifo(_) => {
961 log::warn!("Major compaction not supported for FIFO strategy");
962 }
963 }
964 Ok(())
965 }
966
    /// Inserts a key-value pair into the partition.
    ///
    /// Keys may be up to 65536 bytes long, values up to 2^32 bytes.
    /// Shorter keys and values result in better performance.
    ///
    /// If the key already exists, the item will be overwritten.
    ///
    /// Write path, in order: journal write (+optional persist) while holding
    /// the journal writer lock, then memtable insert, then visible-seqno bump,
    /// then back-pressure checks. Reordering these steps changes crash
    /// semantics — do not rearrange.
    ///
    /// # Examples
    ///
    /// ```
    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
    /// #
    /// # let folder = tempfile::tempdir()?;
    /// # let keyspace = Config::new(folder).open()?;
    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
    /// partition.insert("a", "abc")?;
    ///
    /// assert!(!partition.is_empty()?);
    /// #
    /// # Ok::<(), fjall::Error>(())
    /// ```
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
    ) -> crate::Result<()> {
        use std::sync::atomic::Ordering;

        if self.is_deleted.load(Ordering::Relaxed) {
            return Err(crate::Error::PartitionDeleted);
        }

        let key = key.into();
        let value = value.into();

        let mut journal_writer = self.journal.get_writer();

        // Sequence number is allocated while holding the journal writer lock
        let seqno = self.seqno.next();

        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
        if self.is_poisoned.load(Ordering::Relaxed) {
            return Err(crate::Error::Poisoned);
        }

        // Journal first (write-ahead), memtable second
        journal_writer.write_raw(&self.name, &key, &value, lsm_tree::ValueType::Value, seqno)?;

        if !self.config.manual_journal_persist {
            journal_writer
                .persist(crate::PersistMode::Buffer)
                .map_err(|e| {
                    log::error!("persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}");
                    // Poison the keyspace: journal state is no longer trustworthy
                    self.is_poisoned.store(true, Ordering::Relaxed);
                    e
                })?;
        }

        let (item_size, memtable_size) = self.tree.insert(key, value, seqno);

        // Publish the write to readers
        self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);

        drop(journal_writer);

        // Back-pressure: account buffer usage, then maybe rotate/stall
        let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));

        self.check_memtable_overflow(memtable_size)?;

        self.check_write_buffer_size(write_buffer_size);

        Ok(())
    }
1041
    /// Removes an item from the partition.
    ///
    /// The key may be up to 65536 bytes long.
    /// Shorter keys result in better performance.
    ///
    /// # Examples
    ///
    /// ```
    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
    /// #
    /// # let folder = tempfile::tempdir()?;
    /// # let keyspace = Config::new(folder).open()?;
    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
    /// partition.insert("a", "abc")?;
    ///
    /// let item = partition.get("a")?.expect("should have item");
    /// assert_eq!("abc".as_bytes(), &*item);
    ///
    /// partition.remove("a")?;
    ///
    /// let item = partition.get("a")?;
    /// assert_eq!(None, item);
    /// #
    /// # Ok::<(), fjall::Error>(())
    /// ```
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
        use std::sync::atomic::Ordering;

        // Reject writes into a partition that has been marked as deleted
        if self.is_deleted.load(Ordering::Relaxed) {
            return Err(crate::Error::PartitionDeleted);
        }

        let key = key.into();

        // Acquire the journal writer (exclusive) *before* assigning the
        // sequence number, so journal entries are appended in seqno order
        let mut journal_writer = self.journal.get_writer();

        let seqno = self.seqno.next();

        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
        if self.is_poisoned.load(Ordering::Relaxed) {
            return Err(crate::Error::Poisoned);
        }

        // A delete is journalled as a tombstone with an empty value
        journal_writer.write_raw(&self.name, &key, &[], lsm_tree::ValueType::Tombstone, seqno)?;

        if !self.config.manual_journal_persist {
            journal_writer
                .persist(crate::PersistMode::Buffer)
                .map_err(|e| {
                    log::error!("persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}");
                    // Poison the keyspace so all subsequent writes fail fast
                    // instead of operating on a possibly-inconsistent journal
                    self.is_poisoned.store(true, Ordering::Relaxed);
                    e
                })?;
        }

        let (item_size, memtable_size) = self.tree.remove(key, seqno);

        // Advance the visible seqno (fetch_max: never moves backwards) while
        // still holding the journal lock, so the write becomes visible in order
        self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);

        drop(journal_writer);

        // Account this item against the keyspace-wide write buffer
        let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));

        // NOTE(review): helpers not in view; presumably rotates the memtable /
        // applies write backpressure when thresholds are exceeded — confirm
        self.check_memtable_overflow(memtable_size)?;
        self.check_write_buffer_size(write_buffer_size);

        Ok(())
    }
1114
1115 /// Removes an item from the partition, leaving behind a weak tombstone.
1116 ///
1117 /// When a weak tombstone is matched with a single write in a compaction,
1118 /// the tombstone will be removed along with the value. If the key was
1119 /// overwritten the result of a `remove_weak` is undefined.
1120 ///
1121 /// Only use this remove if it is known that the key has only been written
1122 /// to once since its creation or last `remove_weak`.
1123 ///
1124 /// The key may be up to 65536 bytes long.
1125 /// Shorter keys result in better performance.
1126 ///
1127 /// # Experimental
1128 ///
1129 /// This function is currently experimental.
1130 ///
1131 /// # Examples
1132 ///
1133 /// ```
1134 /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
1135 /// #
1136 /// # let folder = tempfile::tempdir()?;
1137 /// # let keyspace = Config::new(folder).open()?;
1138 /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
1139 /// partition.insert("a", "abc")?;
1140 ///
1141 /// let item = partition.get("a")?.expect("should have item");
1142 /// assert_eq!("abc".as_bytes(), &*item);
1143 ///
1144 /// partition.remove_weak("a")?;
1145 ///
1146 /// let item = partition.get("a")?;
1147 /// assert_eq!(None, item);
1148 /// #
1149 /// # Ok::<(), fjall::Error>(())
1150 /// ```
1151 ///
1152 /// # Errors
1153 ///
1154 /// Will return `Err` if an IO error occurs.
1155 #[doc(hidden)]
1156 pub fn remove_weak<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
1157 use std::sync::atomic::Ordering;
1158
1159 if self.is_deleted.load(Ordering::Relaxed) {
1160 return Err(crate::Error::PartitionDeleted);
1161 }
1162
1163 let key = key.into();
1164
1165 let mut journal_writer = self.journal.get_writer();
1166
1167 let seqno = self.seqno.next();
1168
1169 // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
1170 if self.is_poisoned.load(Ordering::Relaxed) {
1171 return Err(crate::Error::Poisoned);
1172 }
1173
1174 journal_writer.write_raw(
1175 &self.name,
1176 &key,
1177 &[],
1178 lsm_tree::ValueType::WeakTombstone,
1179 seqno,
1180 )?;
1181
1182 if !self.config.manual_journal_persist {
1183 journal_writer
1184 .persist(crate::PersistMode::Buffer)
1185 .map_err(|e| {
1186 log::error!(
1187 "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
1188 );
1189 self.is_poisoned.store(true, Ordering::Relaxed);
1190 e
1191 })?;
1192 }
1193
1194 let (item_size, memtable_size) = self.tree.remove(key, seqno);
1195
1196 self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);
1197
1198 drop(journal_writer);
1199
1200 let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));
1201
1202 self.check_memtable_overflow(memtable_size)?;
1203 self.check_write_buffer_size(write_buffer_size);
1204
1205 Ok(())
1206 }
1207}