fjall/partition/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod name;
6pub mod options;
7mod write_delay;
8
9use crate::{
10    batch::PartitionKey,
11    compaction::manager::CompactionManager,
12    config::Config as KeyspaceConfig,
13    file::{LSM_MANIFEST_FILE, PARTITIONS_FOLDER, PARTITION_CONFIG_FILE, PARTITION_DELETED_MARKER},
14    flush::manager::{FlushManager, Task as FlushTask},
15    gc::GarbageCollection,
16    journal::{
17        manager::{EvictionWatermark, JournalManager},
18        Journal,
19    },
20    keyspace::Partitions,
21    snapshot_nonce::SnapshotNonce,
22    snapshot_tracker::SnapshotTracker,
23    stats::Stats,
24    write_buffer_manager::WriteBufferManager,
25    Error, Keyspace,
26};
27use lsm_tree::{
28    gc::Report as GcReport, AbstractTree, AnyTree, KvPair, SequenceNumberCounter, UserKey,
29    UserValue,
30};
31use options::CreateOptions;
32use std::{
33    fs::File,
34    ops::RangeBounds,
35    path::Path,
36    sync::{
37        atomic::{AtomicBool, AtomicUsize},
38        Arc, RwLock,
39    },
40    time::{Duration, Instant},
41};
42use std_semaphore::Semaphore;
43use write_delay::get_write_delay;
44
/// Shared state behind every clone of a [`PartitionHandle`].
///
/// Combines the partition's own data (name, configuration, LSM-tree, flags)
/// with clones of the keyspace-wide components it needs for writing,
/// flushing, journaling and snapshotting.
#[allow(clippy::module_name_repetitions)]
pub struct PartitionHandleInner {
    // Internal
    //
    /// Partition name
    pub name: PartitionKey,

    /// Partition configuration (the `CreateOptions` this handle was built with;
    /// `create_new` also persists it to the partition config file)
    #[doc(hidden)]
    pub config: CreateOptions,

    /// If `true`, the partition is marked as deleted.
    ///
    /// When this inner state is dropped (i.e. the last handle clone goes away),
    /// the partition's folder is removed from disk, see the `Drop` impl.
    pub(crate) is_deleted: AtomicBool,

    /// If `true`, fsync failed during persisting, see `Error::Poisoned`
    ///
    /// Shared with the keyspace; also set when rotating a memtable fails.
    pub(crate) is_poisoned: Arc<AtomicBool>,

    /// LSM-tree wrapper (either a standard tree or a key-value separated blob tree)
    #[doc(hidden)]
    pub tree: AnyTree,

    // Keyspace stuff
    //
    // The fields below are clones of the owning keyspace's components.
    //
    /// Config of keyspace
    pub(crate) keyspace_config: KeyspaceConfig,

    /// Flush manager of keyspace (holds the queue of sealed memtables to flush)
    pub(crate) flush_manager: Arc<RwLock<FlushManager>>,

    /// Journal manager of keyspace
    pub(crate) journal_manager: Arc<RwLock<JournalManager>>,

    /// Compaction manager of keyspace (notified when L0 becomes crowded)
    pub(crate) compaction_manager: CompactionManager,

    /// Write buffer manager of keyspace
    pub(crate) write_buffer_manager: WriteBufferManager,

    // TODO: notifying flush worker should probably become a method in FlushManager
    /// Flush semaphore of keyspace; released after a flush task was enqueued
    /// to wake up the flush worker
    pub(crate) flush_semaphore: Arc<Semaphore>,

    /// Journal of keyspace
    pub(crate) journal: Arc<Journal>,

    /// Partition map of keyspace
    pub(crate) partitions: Arc<RwLock<Partitions>>,

    /// Sequence number generator of keyspace
    #[doc(hidden)]
    pub seqno: SequenceNumberCounter,

    /// Visible sequence number of keyspace
    #[doc(hidden)]
    pub visible_seqno: SequenceNumberCounter,

    /// Snapshot tracker of keyspace, handed to every `SnapshotNonce` created here
    pub(crate) snapshot_tracker: SnapshotTracker,

    /// Keyspace-wide statistics (e.g. GC time, number of enqueued flushes)
    pub(crate) stats: Arc<Stats>,

    /// Number of completed memtable flushes in this partition
    pub(crate) flushes_completed: AtomicUsize,
}
109
110impl Drop for PartitionHandleInner {
111    fn drop(&mut self) {
112        log::trace!("Dropping partition inner: {:?}", self.name);
113
114        if self.is_deleted.load(std::sync::atomic::Ordering::Acquire) {
115            let path = &self.tree.tree_config().path;
116
117            // IMPORTANT: First, delete the manifest,
118            // once that is deleted, the partition is treated as uninitialized
119            // even if the .deleted marker is removed
120            //
121            // This is important, because if somehow `remove_dir_all` ends up
122            // deleting the `.deleted` marker first, we would end up resurrecting
123            // the partition
124            let manifest_file = path.join(LSM_MANIFEST_FILE);
125
126            // TODO: use https://github.com/rust-lang/rust/issues/31436 if stable
127            #[allow(clippy::collapsible_else_if)]
128            match manifest_file.try_exists() {
129                Ok(exists) => {
130                    if exists {
131                        if let Err(e) = std::fs::remove_file(manifest_file) {
132                            log::error!("Failed to cleanup partition manifest at {path:?}: {e}");
133                        } else {
134                            if let Err(e) = std::fs::remove_dir_all(path) {
135                                log::error!(
136                                    "Failed to cleanup deleted partition's folder at {path:?}: {e}"
137                                );
138                            }
139                        }
140                    }
141                }
142                Err(e) => {
143                    log::error!("Failed to cleanup partition manifest at {path:?}: {e}");
144                }
145            }
146        }
147
148        #[cfg(feature = "__internal_whitebox")]
149        crate::drop::decrement_drop_counter();
150    }
151}
152
/// Access to a keyspace partition
///
/// Each partition is backed by an LSM-tree to provide a
/// disk-backed search tree, and can be configured individually.
///
/// A partition generally only takes a little bit of memory and disk space,
/// and does not spawn its own background threads.
///
/// Handles are cheap to clone: all clones share the same inner state through
/// an `Arc`. Equality and hashing only consider the partition name.
#[derive(Clone)]
#[allow(clippy::module_name_repetitions)]
#[doc(alias = "column family")]
#[doc(alias = "locality group")]
#[doc(alias = "table")]
pub struct PartitionHandle(pub(crate) Arc<PartitionHandleInner>);
166
167impl std::ops::Deref for PartitionHandle {
168    type Target = PartitionHandleInner;
169
170    fn deref(&self) -> &Self::Target {
171        &self.0
172    }
173}
174
175impl PartialEq for PartitionHandle {
176    fn eq(&self, other: &Self) -> bool {
177        self.name == other.name
178    }
179}
180
181impl Eq for PartitionHandle {}
182
183impl std::hash::Hash for PartitionHandle {
184    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
185        state.write(self.name.as_bytes());
186    }
187}
188
189impl GarbageCollection for PartitionHandle {
190    fn gc_scan(&self) -> crate::Result<GcReport> {
191        let _nonce = SnapshotNonce::new(self.seqno.get(), self.snapshot_tracker.clone());
192        crate::gc::GarbageCollector::scan(self)
193    }
194
195    fn gc_with_space_amp_target(&self, factor: f32) -> crate::Result<u64> {
196        let start = Instant::now();
197
198        let result = crate::gc::GarbageCollector::with_space_amp_target(self, factor);
199
200        #[allow(clippy::cast_possible_truncation)]
201        self.stats.time_gc.fetch_add(
202            start.elapsed().as_micros() as u64,
203            std::sync::atomic::Ordering::Relaxed,
204        );
205
206        result
207    }
208
209    fn gc_with_staleness_threshold(&self, threshold: f32) -> crate::Result<u64> {
210        let start = Instant::now();
211
212        let result = crate::gc::GarbageCollector::with_staleness_threshold(self, threshold);
213
214        #[allow(clippy::cast_possible_truncation)]
215        self.stats.time_gc.fetch_add(
216            start.elapsed().as_micros() as u64,
217            std::sync::atomic::Ordering::Relaxed,
218        );
219
220        result
221    }
222
223    fn gc_drop_stale_segments(&self) -> crate::Result<u64> {
224        crate::gc::GarbageCollector::drop_stale_segments(self)
225    }
226}
227
228impl PartitionHandle {
229    /// Ingests a sorted stream of key-value pairs into the partition.
230    ///
231    /// Can only be called on a new fresh, empty partition.
232    ///
233    /// # Errors
234    ///
235    /// Will return `Err` if an IO error occurs.
236    ///
237    /// # Panics
238    ///
239    /// Panics if the partition is **not** initially empty.
240    ///
241    /// Will panic if the input iterator is not sorted in ascending order.
242    pub fn ingest<K: Into<UserKey>, V: Into<UserValue>>(
243        &self,
244        iter: impl Iterator<Item = (K, V)>,
245    ) -> crate::Result<()> {
246        self.tree
247            .ingest(iter.map(|(k, v)| (k.into(), v.into())))
248            .map_err(Into::into)
249    }
250
251    pub(crate) fn from_keyspace(
252        keyspace: &Keyspace,
253        tree: AnyTree,
254        name: PartitionKey,
255        config: CreateOptions,
256    ) -> Self {
257        Self(Arc::new(PartitionHandleInner {
258            name,
259            tree,
260            partitions: keyspace.partitions.clone(),
261            keyspace_config: keyspace.config.clone(),
262            flush_manager: keyspace.flush_manager.clone(),
263            flush_semaphore: keyspace.flush_semaphore.clone(),
264            flushes_completed: AtomicUsize::new(0),
265            journal_manager: keyspace.journal_manager.clone(),
266            journal: keyspace.journal.clone(),
267            compaction_manager: keyspace.compaction_manager.clone(),
268            seqno: keyspace.seqno.clone(),
269            visible_seqno: keyspace.visible_seqno.clone(),
270            write_buffer_manager: keyspace.write_buffer_manager.clone(),
271            is_deleted: AtomicBool::default(),
272            is_poisoned: keyspace.is_poisoned.clone(),
273            snapshot_tracker: keyspace.snapshot_tracker.clone(),
274            config,
275            stats: keyspace.stats.clone(),
276        }))
277    }
278
279    /// Creates a new partition.
280    pub(crate) fn create_new(
281        keyspace: &Keyspace,
282        name: PartitionKey,
283        config: CreateOptions,
284    ) -> crate::Result<Self> {
285        use lsm_tree::coding::Encode;
286
287        log::debug!("Creating partition {name:?}");
288
289        let base_folder = keyspace.config.path.join(PARTITIONS_FOLDER).join(&*name);
290
291        if base_folder.join(PARTITION_DELETED_MARKER).try_exists()? {
292            log::error!("Failed to open partition, partition is deleted.");
293            return Err(Error::PartitionDeleted);
294        }
295
296        std::fs::create_dir_all(&base_folder)?;
297
298        // Write config
299        let mut file = File::create(base_folder.join(PARTITION_CONFIG_FILE))?;
300        config.encode_into(&mut file)?;
301        file.sync_all()?;
302
303        let mut base_config = lsm_tree::Config::new(base_folder)
304            .descriptor_table(keyspace.config.descriptor_table.clone())
305            .use_cache(keyspace.config.cache.clone())
306            .data_block_size(config.data_block_size)
307            .index_block_size(config.index_block_size)
308            .level_count(config.level_count)
309            .compression(config.compression)
310            .bloom_bits_per_key(config.bloom_bits_per_key);
311
312        if let Some(kv_opts) = &config.kv_separation {
313            base_config = base_config
314                .blob_compression(kv_opts.compression)
315                .blob_file_separation_threshold(kv_opts.separation_threshold)
316                .blob_file_target_size(kv_opts.file_target_size);
317        }
318
319        let tree = match config.tree_type {
320            lsm_tree::TreeType::Standard => AnyTree::Standard(base_config.open()?),
321            lsm_tree::TreeType::Blob => AnyTree::Blob(base_config.open_as_blob_tree()?),
322        };
323
324        Ok(Self(Arc::new(PartitionHandleInner {
325            name,
326            config,
327            partitions: keyspace.partitions.clone(),
328            keyspace_config: keyspace.config.clone(),
329            flush_manager: keyspace.flush_manager.clone(),
330            flush_semaphore: keyspace.flush_semaphore.clone(),
331            flushes_completed: AtomicUsize::new(0),
332            journal_manager: keyspace.journal_manager.clone(),
333            journal: keyspace.journal.clone(),
334            compaction_manager: keyspace.compaction_manager.clone(),
335            seqno: keyspace.seqno.clone(),
336            visible_seqno: keyspace.visible_seqno.clone(),
337            tree,
338            write_buffer_manager: keyspace.write_buffer_manager.clone(),
339            is_deleted: AtomicBool::default(),
340            is_poisoned: keyspace.is_poisoned.clone(),
341            snapshot_tracker: keyspace.snapshot_tracker.clone(),
342            stats: keyspace.stats.clone(),
343        })))
344    }
345
346    /// Returns the underlying LSM-tree's path.
347    #[must_use]
348    pub fn path(&self) -> &Path {
349        self.tree.tree_config().path.as_path()
350    }
351
352    /// Returns the disk space usage of this partition.
353    ///
354    /// # Examples
355    ///
356    /// ```
357    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
358    /// #
359    /// # let folder = tempfile::tempdir()?;
360    /// # let keyspace = Config::new(folder).open()?;
361    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
362    /// assert_eq!(0, partition.disk_space());
363    /// #
364    /// # Ok::<(), fjall::Error>(())
365    /// ```
366    #[must_use]
367    pub fn disk_space(&self) -> u64 {
368        self.tree.disk_space()
369    }
370
371    /// Returns an iterator that scans through the entire partition.
372    ///
373    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
374    ///
375    /// # Examples
376    ///
377    /// ```
378    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
379    /// #
380    /// # let folder = tempfile::tempdir()?;
381    /// # let keyspace = Config::new(folder).open()?;
382    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
383    /// partition.insert("a", "abc")?;
384    /// partition.insert("f", "abc")?;
385    /// partition.insert("g", "abc")?;
386    /// assert_eq!(3, partition.iter().count());
387    /// #
388    /// # Ok::<(), fjall::Error>(())
389    /// ```
390    #[must_use]
391    pub fn iter(&self) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
392        self.tree
393            .iter(None, None)
394            .map(|item| item.map_err(Into::into))
395    }
396
397    /// Returns an iterator that scans through the entire partition, returning only keys.
398    ///
399    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
400    #[must_use]
401    pub fn keys(&self) -> impl DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static {
402        self.tree
403            .keys(None, None)
404            .map(|item| item.map_err(Into::into))
405    }
406
407    /// Returns an iterator that scans through the entire partition, returning only values.
408    ///
409    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
410    #[must_use]
411    pub fn values(&self) -> impl DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static {
412        self.tree
413            .values(None, None)
414            .map(|item| item.map_err(Into::into))
415    }
416
417    /// Returns an iterator over a range of items.
418    ///
419    /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
420    ///
421    /// # Examples
422    ///
423    /// ```
424    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
425    /// #
426    /// # let folder = tempfile::tempdir()?;
427    /// # let keyspace = Config::new(folder).open()?;
428    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
429    /// partition.insert("a", "abc")?;
430    /// partition.insert("f", "abc")?;
431    /// partition.insert("g", "abc")?;
432    /// assert_eq!(2, partition.range("a"..="f").count());
433    /// #
434    /// # Ok::<(), fjall::Error>(())
435    /// ```
436    pub fn range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
437        &'a self,
438        range: R,
439    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
440        self.tree
441            .range(range, None, None)
442            .map(|item| item.map_err(Into::into))
443    }
444
445    /// Returns an iterator over a prefixed set of items.
446    ///
447    /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
448    ///
449    /// # Examples
450    ///
451    /// ```
452    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
453    /// #
454    /// # let folder = tempfile::tempdir()?;
455    /// # let keyspace = Config::new(folder).open()?;
456    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
457    /// partition.insert("a", "abc")?;
458    /// partition.insert("ab", "abc")?;
459    /// partition.insert("abc", "abc")?;
460    /// assert_eq!(2, partition.prefix("ab").count());
461    /// #
462    /// # Ok::<(), fjall::Error>(())
463    /// ```
464    pub fn prefix<'a, K: AsRef<[u8]> + 'a>(
465        &'a self,
466        prefix: K,
467    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
468        self.tree
469            .prefix(prefix, None, None)
470            .map(|item| item.map_err(Into::into))
471    }
472
473    /// Approximates the amount of items in the partition.
474    ///
475    /// For update- or delete-heavy workloads, this value will
476    /// diverge from the real value, but is a O(1) operation.
477    ///
478    /// For insert-only workloads (e.g. logs, time series)
479    /// this value is reliable.
480    ///
481    /// # Examples
482    ///
483    /// ```
484    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
485    /// #
486    /// # let folder = tempfile::tempdir()?;
487    /// # let keyspace = Config::new(folder).open()?;
488    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
489    /// assert_eq!(partition.approximate_len(), 0);
490    ///
491    /// partition.insert("1", "abc")?;
492    /// assert_eq!(partition.approximate_len(), 1);
493    ///
494    /// partition.remove("1")?;
495    /// // Oops! approximate_len will not be reliable here
496    /// assert_eq!(partition.approximate_len(), 2);
497    /// #
498    /// # Ok::<(), fjall::Error>(())
499    /// ```
500    #[must_use]
501    pub fn approximate_len(&self) -> usize {
502        self.tree.approximate_len()
503    }
504
505    /// Scans the entire partition, returning the amount of items.
506    ///
507    /// ###### Caution
508    ///
509    /// This operation scans the entire partition: O(n) complexity!
510    ///
511    /// Never, under any circumstances, use .`len()` == 0 to check
512    /// if the partition is empty, use [`PartitionHandle::is_empty`] instead.
513    ///
514    /// If you want an estimate, use [`PartitionHandle::approximate_len`] instead.
515    ///
516    /// # Examples
517    ///
518    /// ```
519    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
520    /// #
521    /// # let folder = tempfile::tempdir()?;
522    /// # let keyspace = Config::new(folder).open()?;
523    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
524    /// assert_eq!(partition.len()?, 0);
525    ///
526    /// partition.insert("1", "abc")?;
527    /// partition.insert("3", "abc")?;
528    /// partition.insert("5", "abc")?;
529    /// assert_eq!(partition.len()?, 3);
530    /// #
531    /// # Ok::<(), fjall::Error>(())
532    /// ```
533    ///
534    /// # Errors
535    ///
536    /// Will return `Err` if an IO error occurs.
537    pub fn len(&self) -> crate::Result<usize> {
538        let mut count = 0;
539
540        for kv in self.iter() {
541            let _ = kv?;
542            count += 1;
543        }
544
545        Ok(count)
546    }
547
548    /// Returns `true` if the partition is empty.
549    ///
550    /// This operation has O(1) complexity.
551    ///
552    /// # Examples
553    ///
554    /// ```
555    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
556    /// #
557    /// # let folder = tempfile::tempdir()?;
558    /// # let keyspace = Config::new(folder).open()?;
559    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
560    /// assert!(partition.is_empty()?);
561    ///
562    /// partition.insert("a", "abc")?;
563    /// assert!(!partition.is_empty()?);
564    /// #
565    /// # Ok::<(), fjall::Error>(())
566    /// ```
567    ///
568    /// # Errors
569    ///
570    /// Will return `Err` if an IO error occurs.
571    pub fn is_empty(&self) -> crate::Result<bool> {
572        self.first_key_value().map(|x| x.is_none())
573    }
574
575    /// Returns `true` if the partition contains the specified key.
576    ///
577    /// # Examples
578    ///
579    /// ```
580    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
581    /// #
582    /// # let folder = tempfile::tempdir()?;
583    /// # let keyspace = Config::new(folder).open()?;
584    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
585    /// assert!(!partition.contains_key("a")?);
586    ///
587    /// partition.insert("a", "abc")?;
588    /// assert!(partition.contains_key("a")?);
589    /// #
590    /// # Ok::<(), fjall::Error>(())
591    /// ```
592    ///
593    /// # Errors
594    ///
595    /// Will return `Err` if an IO error occurs.
596    pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<bool> {
597        self.tree.contains_key(key, None).map_err(Into::into)
598    }
599
600    /// Retrieves an item from the partition.
601    ///
602    /// # Examples
603    ///
604    /// ```
605    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
606    /// #
607    /// # let folder = tempfile::tempdir()?;
608    /// # let keyspace = Config::new(folder).open()?;
609    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
610    /// partition.insert("a", "my_value")?;
611    ///
612    /// let item = partition.get("a")?;
613    /// assert_eq!(Some("my_value".as_bytes().into()), item);
614    /// #
615    /// # Ok::<(), fjall::Error>(())
616    /// ```
617    ///
618    /// # Errors
619    ///
620    /// Will return `Err` if an IO error occurs.
621    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<lsm_tree::UserValue>> {
622        Ok(self.tree.get(key, None)?)
623    }
624
625    /// Retrieves the size of an item from the partition.
626    ///
627    /// # Examples
628    ///
629    /// ```
630    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
631    /// #
632    /// # let folder = tempfile::tempdir()?;
633    /// # let keyspace = Config::new(folder).open()?;
634    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
635    /// partition.insert("a", "my_value")?;
636    ///
637    /// let len = partition.size_of("a")?.unwrap_or_default();
638    /// assert_eq!("my_value".len() as u32, len);
639    /// #
640    /// # Ok::<(), fjall::Error>(())
641    /// ```
642    ///
643    /// # Errors
644    ///
645    /// Will return `Err` if an IO error occurs.
646    pub fn size_of<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<u32>> {
647        Ok(self.tree.size_of(key, None)?)
648    }
649
650    /// Returns the first key-value pair in the partition.
651    /// The key in this pair is the minimum key in the partition.
652    ///
653    /// # Examples
654    ///
655    /// ```
656    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
657    /// #
658    /// # let folder = tempfile::tempdir()?;
659    /// # let keyspace = Config::new(folder).open()?;
660    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
661    /// partition.insert("1", "abc")?;
662    /// partition.insert("3", "abc")?;
663    /// partition.insert("5", "abc")?;
664    ///
665    /// let (key, _) = partition.first_key_value()?.expect("item should exist");
666    /// assert_eq!(&*key, "1".as_bytes());
667    /// #
668    /// # Ok::<(), fjall::Error>(())
669    /// ```
670    ///
671    /// # Errors
672    ///
673    /// Will return `Err` if an IO error occurs.
674    pub fn first_key_value(&self) -> crate::Result<Option<KvPair>> {
675        Ok(self.tree.first_key_value(None, None)?)
676    }
677
678    /// Returns the last key-value pair in the partition.
679    /// The key in this pair is the maximum key in the partition.
680    ///
681    /// # Examples
682    ///
683    /// ```
684    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
685    /// #
686    /// # let folder = tempfile::tempdir()?;
687    /// # let keyspace = Config::new(folder).open()?;
688    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
689    /// partition.insert("1", "abc")?;
690    /// partition.insert("3", "abc")?;
691    /// partition.insert("5", "abc")?;
692    ///
693    /// let (key, _) = partition.last_key_value()?.expect("item should exist");
694    /// assert_eq!(&*key, "5".as_bytes());
695    /// #
696    /// # Ok::<(), fjall::Error>(())
697    /// ```
698    ///
699    /// # Errors
700    ///
701    /// Will return `Err` if an IO error occurs.
702    pub fn last_key_value(&self) -> crate::Result<Option<KvPair>> {
703        Ok(self.tree.last_key_value(None, None)?)
704    }
705
706    /// Returns `true` if the underlying LSM-tree is key-value-separated.
707    ///
708    /// See [`CreateOptions::with_kv_separation`] for more information.
709    ///
710    /// # Examples
711    ///
712    /// ```
713    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
714    /// #
715    /// # let folder = tempfile::tempdir()?;
716    /// # let keyspace = Config::new(folder).open()?;
717    /// let tree1 = keyspace.open_partition("default", PartitionCreateOptions::default())?;
718    /// assert!(!tree1.is_kv_separated());
719    ///
720    /// let blob_cfg = PartitionCreateOptions::default().with_kv_separation(Default::default());
721    /// let tree2 = keyspace.open_partition("blobs", blob_cfg)?;
722    /// assert!(tree2.is_kv_separated());
723    /// #
724    /// # Ok::<(), fjall::Error>(())
725    /// ```
726    #[must_use]
727    pub fn is_kv_separated(&self) -> bool {
728        matches!(self.tree, crate::AnyTree::Blob(_))
729    }
730
731    // NOTE: Used in tests
732    #[doc(hidden)]
733    pub fn rotate_memtable_and_wait(&self) -> crate::Result<()> {
734        if self.rotate_memtable()? {
735            while !self
736                .flush_manager
737                .read()
738                .expect("lock is poisoned")
739                .is_empty()
740            {
741                std::thread::sleep(std::time::Duration::from_millis(10));
742            }
743        }
744        Ok(())
745    }
746
747    /// Returns `true` if the memtable was indeed rotated.
748    #[doc(hidden)]
749    pub fn rotate_memtable(&self) -> crate::Result<bool> {
750        log::debug!("Rotating memtable {:?}", self.name);
751
752        log::trace!("partition: acquiring journal lock");
753        let mut journal = self.journal.get_writer();
754
755        // Rotate memtable
756        let Some((yanked_id, yanked_memtable)) = self.tree.rotate_memtable() else {
757            log::debug!("Got no sealed memtable, someone beat us to it");
758            return Ok(false);
759        };
760
761        log::trace!("partition: acquiring journal manager lock");
762        let mut journal_manager = self.journal_manager.write().expect("lock is poisoned");
763
764        let seqno_map = {
765            let partitions = self.partitions.write().expect("lock is poisoned");
766
767            let mut seqnos = Vec::with_capacity(partitions.len());
768
769            for partition in partitions.values() {
770                if let Some(lsn) = partition.tree.get_highest_memtable_seqno() {
771                    seqnos.push(EvictionWatermark {
772                        lsn,
773                        partition: partition.clone(),
774                    });
775                }
776            }
777
778            seqnos
779        };
780
781        journal_manager.rotate_journal(&mut journal, seqno_map)?;
782
783        log::trace!("partition: acquiring flush manager lock");
784        let mut flush_manager = self.flush_manager.write().expect("lock is poisoned");
785
786        flush_manager.enqueue_task(
787            self.name.clone(),
788            FlushTask {
789                id: yanked_id,
790                partition: self.clone(),
791                sealed_memtable: yanked_memtable,
792            },
793        );
794
795        self.stats
796            .flushes_enqueued
797            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
798
799        drop(flush_manager);
800        drop(journal_manager);
801        drop(journal);
802
803        // Notify flush worker that new work has arrived
804        self.flush_semaphore.release();
805
806        Ok(true)
807    }
808
809    fn check_journal_size(&self) {
810        loop {
811            let bytes = self
812                .journal_manager
813                .read()
814                .expect("lock is poisoned")
815                .disk_space_used();
816
817            if bytes <= self.keyspace_config.max_journaling_size_in_bytes {
818                if bytes as f64 > self.keyspace_config.max_journaling_size_in_bytes as f64 * 0.9 {
819                    log::info!(
820                        "partition: write stall because 90% journal threshold has been reached"
821                    );
822                    std::thread::sleep(std::time::Duration::from_millis(500));
823                }
824
825                break;
826            }
827
828            log::info!(
829                "Write stall in partition {} because journal is too large",
830                self.name
831            );
832            std::thread::sleep(std::time::Duration::from_millis(100)); // TODO: maybe exponential backoff
833        }
834    }
835
836    fn check_write_stall(&self) {
837        let l0_run_count = self.tree.l0_run_count();
838
839        if l0_run_count >= 20 {
840            let sleep_us = get_write_delay(l0_run_count);
841
842            if sleep_us > 0 {
843                log::info!(
844                    "Stalling writes by {sleep_us}µs in partition {} due to many segments in L0...",
845                    self.name
846                );
847                self.compaction_manager.notify(self.clone());
848                std::thread::sleep(Duration::from_micros(sleep_us));
849            }
850        }
851    }
852
853    fn check_write_halt(&self) {
854        while self.tree.l0_run_count() >= 32 {
855            log::info!(
856                "Halting writes in partition {} until L0 is cleared up...",
857                self.name
858            );
859            self.compaction_manager.notify(self.clone());
860            std::thread::sleep(Duration::from_millis(10));
861        }
862    }
863
864    pub(crate) fn check_memtable_overflow(&self, size: u32) -> crate::Result<()> {
865        if size > self.config.max_memtable_size {
866            self.rotate_memtable().inspect_err(|_| {
867                self.is_poisoned
868                    .store(true, std::sync::atomic::Ordering::Relaxed);
869            })?;
870
871            self.check_journal_size();
872            self.check_write_halt();
873        }
874
875        self.check_write_stall();
876
877        Ok(())
878    }
879
880    pub(crate) fn check_write_buffer_size(&self, initial_size: u64) {
881        let limit = self.keyspace_config.max_write_buffer_size_in_bytes;
882
883        if initial_size > limit {
884            let p90_limit = (limit as f64) * 0.9;
885
886            loop {
887                let bytes = self.write_buffer_manager.get();
888
889                if bytes < limit {
890                    if bytes as f64 > p90_limit {
891                        log::info!(
892                            "partition: write stall because 90% write buffer threshold has been reached"
893                        );
894                        std::thread::sleep(std::time::Duration::from_millis(100));
895                    }
896                    break;
897                }
898
899                log::info!(
900                    "Write stall in partition {} because of write buffer saturation",
901                    self.name
902                );
903                std::thread::sleep(std::time::Duration::from_millis(10));
904            }
905        }
906    }
907
908    /// Number of disk segments (a.k.a. SST files) in the LSM-tree.
909    #[doc(hidden)]
910    #[must_use]
911    pub fn segment_count(&self) -> usize {
912        self.tree.segment_count()
913    }
914
915    /// Number of blob files in the LSM-tree.
916    #[doc(hidden)]
917    #[must_use]
918    pub fn blob_file_count(&self) -> usize {
919        self.tree.blob_file_count()
920    }
921
922    /// Number of completed memtable flushes in this partition.
923    #[must_use]
924    #[doc(hidden)]
925    pub fn flushes_completed(&self) -> usize {
926        self.flushes_completed
927            .load(std::sync::atomic::Ordering::Relaxed)
928    }
929
930    /// Opens a snapshot of this partition.
931    #[must_use]
932    pub fn snapshot(&self) -> crate::Snapshot {
933        self.snapshot_at(self.seqno.get())
934    }
935
936    /// Opens a snapshot of this partition with a given sequence number.
937    #[must_use]
938    pub fn snapshot_at(&self, seqno: crate::Instant) -> crate::Snapshot {
939        crate::Snapshot::new(
940            self.tree.snapshot(seqno),
941            SnapshotNonce::new(seqno, self.snapshot_tracker.clone()),
942        )
943    }
944
945    /// Performs major compaction, blocking the caller until it's done.
946    ///
947    /// # Errors
948    ///
949    /// Will return `Err` if an IO error occurs.
950    #[doc(hidden)]
951    pub fn major_compact(&self) -> crate::Result<()> {
952        match &self.config.compaction_strategy {
953            crate::compaction::Strategy::Leveled(x) => self.tree.major_compact(
954                u64::from(x.target_size),
955                self.snapshot_tracker.get_seqno_safe_to_gc(),
956            )?,
957            crate::compaction::Strategy::SizeTiered(_) => self
958                .tree
959                .major_compact(u64::MAX, self.snapshot_tracker.get_seqno_safe_to_gc())?,
960            crate::compaction::Strategy::Fifo(_) => {
961                log::warn!("Major compaction not supported for FIFO strategy");
962            }
963        }
964        Ok(())
965    }
966
967    /// Inserts a key-value pair into the partition.
968    ///
969    /// Keys may be up to 65536 bytes long, values up to 2^32 bytes.
970    /// Shorter keys and values result in better performance.
971    ///
972    /// If the key already exists, the item will be overwritten.
973    ///
974    /// # Examples
975    ///
976    /// ```
977    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
978    /// #
979    /// # let folder = tempfile::tempdir()?;
980    /// # let keyspace = Config::new(folder).open()?;
981    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
982    /// partition.insert("a", "abc")?;
983    ///
984    /// assert!(!partition.is_empty()?);
985    /// #
986    /// # Ok::<(), fjall::Error>(())
987    /// ```
988    ///
989    /// # Errors
990    ///
991    /// Will return `Err` if an IO error occurs.
992    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
993        &self,
994        key: K,
995        value: V,
996    ) -> crate::Result<()> {
997        use std::sync::atomic::Ordering;
998
999        if self.is_deleted.load(Ordering::Relaxed) {
1000            return Err(crate::Error::PartitionDeleted);
1001        }
1002
1003        let key = key.into();
1004        let value = value.into();
1005
1006        let mut journal_writer = self.journal.get_writer();
1007
1008        let seqno = self.seqno.next();
1009
1010        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
1011        if self.is_poisoned.load(Ordering::Relaxed) {
1012            return Err(crate::Error::Poisoned);
1013        }
1014
1015        journal_writer.write_raw(&self.name, &key, &value, lsm_tree::ValueType::Value, seqno)?;
1016
1017        if !self.config.manual_journal_persist {
1018            journal_writer
1019                .persist(crate::PersistMode::Buffer)
1020                .map_err(|e| {
1021                    log::error!("persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}");
1022                    self.is_poisoned.store(true, Ordering::Relaxed);
1023                    e
1024                })?;
1025        }
1026
1027        let (item_size, memtable_size) = self.tree.insert(key, value, seqno);
1028
1029        self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);
1030
1031        drop(journal_writer);
1032
1033        let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));
1034
1035        self.check_memtable_overflow(memtable_size)?;
1036
1037        self.check_write_buffer_size(write_buffer_size);
1038
1039        Ok(())
1040    }
1041
1042    /// Removes an item from the partition.
1043    ///
1044    /// The key may be up to 65536 bytes long.
1045    /// Shorter keys result in better performance.
1046    ///
1047    /// # Examples
1048    ///
1049    /// ```
1050    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
1051    /// #
1052    /// # let folder = tempfile::tempdir()?;
1053    /// # let keyspace = Config::new(folder).open()?;
1054    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
1055    /// partition.insert("a", "abc")?;
1056    ///
1057    /// let item = partition.get("a")?.expect("should have item");
1058    /// assert_eq!("abc".as_bytes(), &*item);
1059    ///
1060    /// partition.remove("a")?;
1061    ///
1062    /// let item = partition.get("a")?;
1063    /// assert_eq!(None, item);
1064    /// #
1065    /// # Ok::<(), fjall::Error>(())
1066    /// ```
1067    ///
1068    /// # Errors
1069    ///
1070    /// Will return `Err` if an IO error occurs.
1071    pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
1072        use std::sync::atomic::Ordering;
1073
1074        if self.is_deleted.load(Ordering::Relaxed) {
1075            return Err(crate::Error::PartitionDeleted);
1076        }
1077
1078        let key = key.into();
1079
1080        let mut journal_writer = self.journal.get_writer();
1081
1082        let seqno = self.seqno.next();
1083
1084        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
1085        if self.is_poisoned.load(Ordering::Relaxed) {
1086            return Err(crate::Error::Poisoned);
1087        }
1088
1089        journal_writer.write_raw(&self.name, &key, &[], lsm_tree::ValueType::Tombstone, seqno)?;
1090
1091        if !self.config.manual_journal_persist {
1092            journal_writer
1093                .persist(crate::PersistMode::Buffer)
1094                .map_err(|e| {
1095                    log::error!("persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}");
1096                    self.is_poisoned.store(true, Ordering::Relaxed);
1097                    e
1098                })?;
1099        }
1100
1101        let (item_size, memtable_size) = self.tree.remove(key, seqno);
1102
1103        self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);
1104
1105        drop(journal_writer);
1106
1107        let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));
1108
1109        self.check_memtable_overflow(memtable_size)?;
1110        self.check_write_buffer_size(write_buffer_size);
1111
1112        Ok(())
1113    }
1114
1115    /// Removes an item from the partition, leaving behind a weak tombstone.
1116    ///
1117    /// When a weak tombstone is matched with a single write in a compaction,
1118    /// the tombstone will be removed along with the value. If the key was
1119    /// overwritten the result of a `remove_weak` is undefined.
1120    ///
1121    /// Only use this remove if it is known that the key has only been written
1122    /// to once since its creation or last `remove_weak`.
1123    ///
1124    /// The key may be up to 65536 bytes long.
1125    /// Shorter keys result in better performance.
1126    ///
1127    /// # Experimental
1128    ///
1129    /// This function is currently experimental.
1130    ///
1131    /// # Examples
1132    ///
1133    /// ```
1134    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
1135    /// #
1136    /// # let folder = tempfile::tempdir()?;
1137    /// # let keyspace = Config::new(folder).open()?;
1138    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
1139    /// partition.insert("a", "abc")?;
1140    ///
1141    /// let item = partition.get("a")?.expect("should have item");
1142    /// assert_eq!("abc".as_bytes(), &*item);
1143    ///
1144    /// partition.remove_weak("a")?;
1145    ///
1146    /// let item = partition.get("a")?;
1147    /// assert_eq!(None, item);
1148    /// #
1149    /// # Ok::<(), fjall::Error>(())
1150    /// ```
1151    ///
1152    /// # Errors
1153    ///
1154    /// Will return `Err` if an IO error occurs.
1155    #[doc(hidden)]
1156    pub fn remove_weak<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
1157        use std::sync::atomic::Ordering;
1158
1159        if self.is_deleted.load(Ordering::Relaxed) {
1160            return Err(crate::Error::PartitionDeleted);
1161        }
1162
1163        let key = key.into();
1164
1165        let mut journal_writer = self.journal.get_writer();
1166
1167        let seqno = self.seqno.next();
1168
1169        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
1170        if self.is_poisoned.load(Ordering::Relaxed) {
1171            return Err(crate::Error::Poisoned);
1172        }
1173
1174        journal_writer.write_raw(
1175            &self.name,
1176            &key,
1177            &[],
1178            lsm_tree::ValueType::WeakTombstone,
1179            seqno,
1180        )?;
1181
1182        if !self.config.manual_journal_persist {
1183            journal_writer
1184                .persist(crate::PersistMode::Buffer)
1185                .map_err(|e| {
1186                    log::error!(
1187                        "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
1188                    );
1189                    self.is_poisoned.store(true, Ordering::Relaxed);
1190                    e
1191                })?;
1192        }
1193
1194        let (item_size, memtable_size) = self.tree.remove(key, seqno);
1195
1196        self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);
1197
1198        drop(journal_writer);
1199
1200        let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));
1201
1202        self.check_memtable_overflow(memtable_size)?;
1203        self.check_write_buffer_size(write_buffer_size);
1204
1205        Ok(())
1206    }
1207}