fjall/
keyspace.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    background_worker::{Activity, BackgroundWorker},
7    batch::{Batch, PartitionKey},
8    compaction::manager::CompactionManager,
9    config::Config,
10    file::{
11        fsync_directory, FJALL_MARKER, JOURNALS_FOLDER, PARTITIONS_FOLDER, PARTITION_DELETED_MARKER,
12    },
13    flush::manager::FlushManager,
14    journal::{manager::JournalManager, writer::PersistMode, Journal},
15    monitor::Monitor,
16    partition::name::is_valid_partition_name,
17    poison_dart::PoisonDart,
18    recovery::{recover_partitions, recover_sealed_memtables},
19    snapshot_tracker::SnapshotTracker,
20    stats::Stats,
21    version::Version,
22    write_buffer_manager::WriteBufferManager,
23    HashMap, PartitionCreateOptions, PartitionHandle,
24};
25use lsm_tree::{AbstractTree, SequenceNumberCounter};
26use std::{
27    fs::{remove_dir_all, File},
28    path::Path,
29    sync::{
30        atomic::{AtomicBool, AtomicUsize},
31        Arc, RwLock,
32    },
33    time::Duration,
34};
35use std_semaphore::Semaphore;
36
/// Map of partition names to their open partition handles.
pub type Partitions = HashMap<PartitionKey, PartitionHandle>;
38
/// Shared state behind a [`Keyspace`] handle.
///
/// All clones of a `Keyspace` point at the same `KeyspaceInner` (through an `Arc`).
/// Its `Drop` impl stops the background workers and tears down the keyspace.
// NOTE: The type name intentionally repeats the module name (`keyspace::KeyspaceInner`).
#[allow(clippy::module_name_repetitions)]
pub struct KeyspaceInner {
    /// Dictionary of all partitions
    #[doc(hidden)]
    pub partitions: Arc<RwLock<Partitions>>,

    /// Journal (write-ahead-log/WAL)
    pub(crate) journal: Arc<Journal>,

    /// Keyspace configuration
    #[doc(hidden)]
    pub config: Config,

    /// Current sequence number
    pub(crate) seqno: SequenceNumberCounter,

    /// Current visible sequence number (what `Keyspace::instant` returns)
    pub(crate) visible_seqno: SequenceNumberCounter,

    /// Caps write buffer size by flushing
    /// memtables to disk segments
    pub(crate) flush_manager: Arc<RwLock<FlushManager>>,

    /// Checks on-disk journal size and flushes memtables
    /// if needed, to garbage collect sealed journals
    pub(crate) journal_manager: Arc<RwLock<JournalManager>>,

    /// Notifies flush threads
    pub(crate) flush_semaphore: Arc<Semaphore>,

    /// Keeps track of which partitions are most likely to be
    /// candidates for compaction
    pub(crate) compaction_manager: CompactionManager,

    /// Stop signal when keyspace is dropped to stop background threads
    pub(crate) stop_signal: lsm_tree::stop_signal::StopSignal,

    /// Counter of background threads
    pub(crate) active_background_threads: Arc<AtomicUsize>,

    /// Keeps track of write buffer size
    pub(crate) write_buffer_manager: WriteBufferManager,

    /// True if fsync failed
    ///
    /// Also handed to every background worker's `PoisonDart`.
    pub(crate) is_poisoned: Arc<AtomicBool>,

    /// Flush/compaction counters, read by `Keyspace`'s statistics getters
    /// (`flushes_completed`, `time_compacting`, `active_compactions`, ...)
    pub(crate) stats: Arc<Stats>,

    /// Snapshot tracking state; handed to the flush and compaction workers
    #[doc(hidden)]
    pub snapshot_tracker: SnapshotTracker,
}
90
91impl Drop for KeyspaceInner {
92    fn drop(&mut self) {
93        log::trace!("Dropping Keyspace");
94
95        self.stop_signal.send();
96
97        while self
98            .active_background_threads
99            .load(std::sync::atomic::Ordering::Relaxed)
100            > 0
101        {
102            std::thread::sleep(std::time::Duration::from_micros(100));
103
104            // NOTE: Trick threads into waking up
105            self.flush_semaphore.release();
106            self.compaction_manager.notify_empty();
107        }
108
109        // IMPORTANT: Break cyclic Arcs
110        self.flush_manager
111            .write()
112            .expect("lock is poisoned")
113            .clear();
114        self.compaction_manager.clear();
115        self.partitions.write().expect("lock is poisoned").clear();
116        self.journal_manager
117            .write()
118            .expect("lock is poisoned")
119            .clear();
120
121        self.config.descriptor_table.clear();
122
123        if self.config.clean_path_on_drop {
124            log::info!(
125                "Deleting keyspace because temporary=true: {:?}",
126                self.config.path,
127            );
128
129            if let Err(err) = remove_dir_all(&self.config.path) {
130                eprintln!("Failed to clean up path: {:?} - {err}", self.config.path);
131            }
132        }
133
134        #[cfg(feature = "__internal_whitebox")]
135        crate::drop::decrement_drop_counter();
136    }
137}
138
/// A keyspace is a single logical database
/// which can house multiple partitions
///
/// In your application, you should create a single keyspace
/// and keep it around for as long as needed
/// (as long as you are using its partitions).
///
/// `Keyspace` is a thin handle around an `Arc` of [`KeyspaceInner`]:
/// cloning it is cheap, and all clones share the same state.
#[derive(Clone)]
#[doc(alias = "database")]
#[doc(alias = "collection")]
pub struct Keyspace(pub(crate) Arc<KeyspaceInner>);
149
150impl std::ops::Deref for Keyspace {
151    type Target = KeyspaceInner;
152
153    fn deref(&self) -> &Self::Target {
154        &self.0
155    }
156}
157
158impl Keyspace {
159    /// Initializes a new atomic write batch.
160    ///
161    /// Items may be written to multiple partitions, which
162    /// will be be updated atomically when the batch is committed.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
168    /// #
169    /// # let folder = tempfile::tempdir()?;
170    /// # let keyspace = Config::new(folder).open()?;
171    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
172    /// let mut batch = keyspace.batch();
173    ///
174    /// assert_eq!(partition.len()?, 0);
175    /// batch.insert(&partition, "1", "abc");
176    /// batch.insert(&partition, "3", "abc");
177    /// batch.insert(&partition, "5", "abc");
178    ///
179    /// assert_eq!(partition.len()?, 0);
180    ///
181    /// batch.commit()?;
182    /// assert_eq!(partition.len()?, 3);
183    /// #
184    /// # Ok::<(), fjall::Error>(())
185    /// ```
186    #[must_use]
187    pub fn batch(&self) -> Batch {
188        let mut batch = Batch::new(self.clone());
189
190        if !self.config.manual_journal_persist {
191            batch = batch.durability(Some(PersistMode::Buffer));
192        }
193
194        batch
195    }
196
197    /// Returns the current write buffer size (active + sealed memtables).
198    #[must_use]
199    pub fn write_buffer_size(&self) -> u64 {
200        self.write_buffer_manager.get()
201    }
202
203    /// Returns the amount of completed memtable flushes.
204    #[doc(hidden)]
205    #[must_use]
206    pub fn flushes_completed(&self) -> usize {
207        self.stats
208            .flushes_completed
209            .load(std::sync::atomic::Ordering::Relaxed)
210    }
211
212    /// Returns the time all compactions took until now.
213    #[doc(hidden)]
214    #[must_use]
215    pub fn time_compacting(&self) -> std::time::Duration {
216        let us = self
217            .stats
218            .time_compacting
219            .load(std::sync::atomic::Ordering::Relaxed);
220
221        std::time::Duration::from_micros(us)
222    }
223
224    /// Returns the number of active compactions currently running.
225    #[doc(hidden)]
226    #[must_use]
227    pub fn active_compactions(&self) -> usize {
228        self.stats
229            .active_compaction_count
230            .load(std::sync::atomic::Ordering::Relaxed)
231    }
232
233    /// Returns the amount of completed compactions.
234    #[doc(hidden)]
235    #[must_use]
236    pub fn compactions_completed(&self) -> usize {
237        self.stats
238            .compactions_completed
239            .load(std::sync::atomic::Ordering::Relaxed)
240    }
241
242    /// Returns the amount of journals on disk.
243    ///
244    /// # Examples
245    ///
246    /// ```
247    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
248    /// #
249    /// # let folder = tempfile::tempdir()?;
250    /// # let keyspace = Config::new(folder).open()?;
251    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
252    /// assert_eq!(1, keyspace.journal_count());
253    /// #
254    /// # Ok::<(), fjall::Error>(())
255    /// ```
256    #[must_use]
257    pub fn journal_count(&self) -> usize {
258        self.journal_manager
259            .read()
260            .expect("lock is poisoned")
261            .journal_count()
262    }
263
264    /// Returns the disk space usage of the journal.
265    #[doc(hidden)]
266    #[must_use]
267    pub fn journal_disk_space(&self) -> u64 {
268        // TODO: 3.0.0 error is not handled because that would break the API...
269        self.journal.get_writer().len().unwrap_or_default()
270            + self
271                .journal_manager
272                .read()
273                .expect("lock is poisoned")
274                .disk_space_used()
275    }
276
277    /// Returns the disk space usage of the entire keyspace.
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
283    /// #
284    /// # let folder = tempfile::tempdir()?;
285    /// # let keyspace = Config::new(folder).open()?;
286    /// # let partition = keyspace.open_partition("default", PartitionCreateOptions::default())?;
287    /// assert!(keyspace.disk_space() > 0);
288    /// #
289    /// # Ok::<(), fjall::Error>(())
290    /// ```
291    pub fn disk_space(&self) -> u64 {
292        let partitions_size = self
293            .partitions
294            .read()
295            .expect("lock is poisoned")
296            .values()
297            .map(PartitionHandle::disk_space)
298            .sum::<u64>();
299
300        self.journal_disk_space() + partitions_size
301    }
302
303    /// Flushes the active journal. The durability depends on the [`PersistMode`]
304    /// used.
305    ///
306    /// Persisting only affects durability, NOT consistency! Even without flushing
307    /// data is crash-safe.
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// # use fjall::{Config, PersistMode, Keyspace, PartitionCreateOptions};
313    /// # let folder = tempfile::tempdir()?;
314    /// let keyspace = Config::new(folder).open()?;
315    /// let items = keyspace.open_partition("my_items", PartitionCreateOptions::default())?;
316    ///
317    /// items.insert("a", "hello")?;
318    ///
319    /// keyspace.persist(PersistMode::SyncAll)?;
320    /// #
321    /// # Ok::<_, fjall::Error>(())
322    /// ```
323    ///
324    /// # Errors
325    ///
326    /// Returns error, if an IO error occurred.
327    pub fn persist(&self, mode: PersistMode) -> crate::Result<()> {
328        if self.is_poisoned.load(std::sync::atomic::Ordering::Relaxed) {
329            return Err(crate::Error::Poisoned);
330        }
331
332        if let Err(e) = self.journal.persist(mode) {
333            self.is_poisoned
334                .store(true, std::sync::atomic::Ordering::Release);
335
336            log::error!(
337                "flush failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
338            );
339
340            return Err(crate::Error::Poisoned);
341        }
342
343        Ok(())
344    }
345
346    #[doc(hidden)]
347    #[must_use]
348    pub fn cache_capacity(&self) -> u64 {
349        self.config.cache.capacity()
350    }
351
352    /// Opens a keyspace in the given directory.
353    ///
354    /// # Errors
355    ///
356    /// Returns error, if an IO error occurred.
357    pub fn open(config: Config) -> crate::Result<Self> {
358        log::debug!(
359            "cache capacity={}MiB",
360            config.cache.capacity() / 1_024 / 1_024,
361        );
362
363        let keyspace = Self::create_or_recover(config)?;
364        keyspace.start_background_threads()?;
365
366        #[cfg(feature = "__internal_whitebox")]
367        crate::drop::increment_drop_counter();
368
369        Ok(keyspace)
370    }
371
372    /// Same as [`Keyspace::open`], but does not start background threads.
373    ///
374    /// Needed to open a keyspace without threads for testing.
375    ///
376    /// Should not be user-facing.
377    #[doc(hidden)]
378    pub fn create_or_recover(config: Config) -> crate::Result<Self> {
379        log::info!("Opening keyspace at {:?}", config.path);
380
381        if config.path.join(FJALL_MARKER).try_exists()? {
382            Self::recover(config)
383        } else {
384            Self::create_new(config)
385        }
386    }
387
388    /// Starts background threads that maintain the keyspace.
389    ///
390    /// Should not be called, unless in [`Keyspace::open`]
391    /// and should definitely not be user-facing.
392    pub(crate) fn start_background_threads(&self) -> crate::Result<()> {
393        if self.config.flush_workers_count > 0 {
394            self.spawn_flush_worker()?;
395
396            for _ in 0..self
397                .flush_manager
398                .read()
399                .expect("lock is poisoned")
400                .queue_count()
401            {
402                self.flush_semaphore.release();
403            }
404        }
405
406        log::debug!(
407            "Spawning {} compaction threads",
408            self.config.compaction_workers_count
409        );
410
411        for _ in 0..self.config.compaction_workers_count {
412            self.spawn_compaction_worker()?;
413        }
414
415        if let Some(ms) = self.config.fsync_ms {
416            self.spawn_fsync_thread(ms.into())?;
417        }
418
419        self.spawn_monitor_thread()
420    }
421
422    /// Destroys the partition, removing all data associated with it.
423    ///
424    /// # Errors
425    ///
426    /// Will return `Err` if an IO error occurs.
427    pub fn delete_partition(&self, handle: PartitionHandle) -> crate::Result<()> {
428        let partition_path = handle.path();
429
430        let file = File::create(partition_path.join(PARTITION_DELETED_MARKER))?;
431        file.sync_all()?;
432
433        // IMPORTANT: fsync folder on Unix
434        fsync_directory(partition_path)?;
435
436        handle
437            .is_deleted
438            .store(true, std::sync::atomic::Ordering::Release);
439
440        // IMPORTANT: Care, locks partitions map
441        self.compaction_manager.remove_partition(&handle.name);
442
443        self.flush_manager
444            .write()
445            .expect("lock is poisoned")
446            .remove_partition(&handle.name);
447
448        self.partitions
449            .write()
450            .expect("lock is poisoned")
451            .remove(&handle.name);
452
453        Ok(())
454    }
455
456    /// Creates or opens a keyspace partition.
457    ///
458    /// If the partition does not yet exist, it will be created configured with `create_options`.
459    /// Otherwise simply a handle to the existing partition will be returned.
460    ///
461    /// Partition names can be up to 255 characters long, can not be empty and
462    /// can only contain alphanumerics, underscore (`_`), dash (`-`), hash tag (`#`) and dollar (`$`).
463    ///
464    /// # Errors
465    ///
466    /// Returns error, if an IO error occurred.
467    ///
468    /// # Panics
469    ///
470    /// Panics if the partition name is invalid.
471    pub fn open_partition(
472        &self,
473        name: &str,
474        create_options: PartitionCreateOptions,
475    ) -> crate::Result<PartitionHandle> {
476        assert!(is_valid_partition_name(name));
477
478        let mut partitions = self.partitions.write().expect("lock is poisoned");
479
480        Ok(if let Some(partition) = partitions.get(name) {
481            partition.clone()
482        } else {
483            let name: PartitionKey = name.into();
484
485            let handle = PartitionHandle::create_new(self, name.clone(), create_options)?;
486            partitions.insert(name, handle.clone());
487
488            #[cfg(feature = "__internal_whitebox")]
489            crate::drop::increment_drop_counter();
490
491            handle
492        })
493    }
494
495    /// Returns the amount of partitions.
496    #[must_use]
497    pub fn partition_count(&self) -> usize {
498        self.partitions.read().expect("lock is poisoned").len()
499    }
500
501    /// Gets a list of all partition names in the keyspace.
502    #[must_use]
503    pub fn list_partitions(&self) -> Vec<PartitionKey> {
504        self.partitions
505            .read()
506            .expect("lock is poisoned")
507            .keys()
508            .cloned()
509            .collect()
510    }
511
512    /// Returns `true` if the partition with the given name exists.
513    ///
514    /// # Examples
515    ///
516    /// ```
517    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
518    /// #
519    /// # let folder = tempfile::tempdir()?;
520    /// # let keyspace = Config::new(folder).open()?;
521    /// assert!(!keyspace.partition_exists("default"));
522    /// keyspace.open_partition("default", PartitionCreateOptions::default())?;
523    /// assert!(keyspace.partition_exists("default"));
524    /// #
525    /// # Ok::<(), fjall::Error>(())
526    /// ```
527    #[must_use]
528    pub fn partition_exists(&self, name: &str) -> bool {
529        self.partitions
530            .read()
531            .expect("lock is poisoned")
532            .contains_key(name)
533    }
534
535    /// Gets the current sequence number.
536    ///
537    /// Can be used to start a cross-partition snapshot, using [`PartitionHandle::snapshot_at`].
538    ///
539    /// # Examples
540    ///
541    /// ```
542    /// # use fjall::{Config, Keyspace, PartitionCreateOptions};
543    /// #
544    /// # let folder = tempfile::tempdir()?;
545    /// # let keyspace = Config::new(folder).open()?;
546    /// let partition1 = keyspace.open_partition("default", PartitionCreateOptions::default())?;
547    /// let partition2 = keyspace.open_partition("another", PartitionCreateOptions::default())?;
548    ///
549    /// partition1.insert("abc1", "abc")?;
550    /// partition2.insert("abc2", "abc")?;
551    ///
552    /// let instant = keyspace.instant();
553    /// let snapshot1 = partition1.snapshot_at(instant);
554    /// let snapshot2 = partition2.snapshot_at(instant);
555    ///
556    /// assert!(partition1.contains_key("abc1")?);
557    /// assert!(partition2.contains_key("abc2")?);
558    ///
559    /// assert!(snapshot1.contains_key("abc1")?);
560    /// assert!(snapshot2.contains_key("abc2")?);
561    ///
562    /// partition1.insert("def1", "def")?;
563    /// partition2.insert("def2", "def")?;
564    ///
565    /// assert!(!snapshot1.contains_key("def1")?);
566    /// assert!(!snapshot2.contains_key("def2")?);
567    ///
568    /// assert!(partition1.contains_key("def1")?);
569    /// assert!(partition2.contains_key("def2")?);
570    /// #
571    /// # Ok::<(), fjall::Error>(())
572    /// ```
573    #[must_use]
574    pub fn instant(&self) -> crate::Instant {
575        self.visible_seqno.get()
576    }
577
578    fn check_version<P: AsRef<Path>>(path: P) -> crate::Result<()> {
579        let bytes = std::fs::read(path.as_ref().join(FJALL_MARKER))?;
580
581        if let Some(version) = Version::parse_file_header(&bytes) {
582            if version != Version::V2 {
583                return Err(crate::Error::InvalidVersion(Some(version)));
584            }
585        } else {
586            return Err(crate::Error::InvalidVersion(None));
587        }
588
589        Ok(())
590    }
591
    /// Recovers existing keyspace from directory.
    ///
    /// Steps, in order:
    /// 1. Check the version stored in the `FJALL_MARKER` file.
    /// 2. Recover the journals. The active journal is fsynced (`SyncAll`);
    ///    sealed journals are kept for step 5.
    /// 3. Build an empty keyspace around the active journal.
    /// 4. Recover partitions (`recover_partitions`).
    /// 5. Replay sealed journals into memtables (`recover_sealed_memtables`).
    /// 6. If the active journal existed before (was not just created), replay
    ///    it into the active memtables. Then add their sizes to the write
    ///    buffer and advance `seqno` past the highest recovered sequence number.
    /// 7. Set `visible_seqno` to `seqno`, making the recovered data visible.
    ///
    /// Does not start background threads.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidVersion` if the marker header cannot be parsed or
    /// is not `Version::V2`. Returns an error if any I/O or recovery step fails.
    #[allow(clippy::too_many_lines)]
    #[doc(hidden)]
    pub fn recover(config: Config) -> crate::Result<Self> {
        log::info!("Recovering keyspace at {:?}", config.path);

        // TODO:
        // let recovery_mode = config.journal_recovery_mode;

        // Check version
        Self::check_version(&config.path)?;

        // Reload active journal
        let journals_folder = config.path.join(JOURNALS_FOLDER);
        let journal_recovery = Journal::recover(journals_folder)?;
        log::debug!("journal recovery result: {journal_recovery:#?}");

        let active_journal = Arc::new(journal_recovery.active);
        // Make sure the active journal is fully synced before using it
        active_journal.get_writer().persist(PersistMode::SyncAll)?;

        let sealed_journals = journal_recovery.sealed;

        let journal_manager = JournalManager::from_active(active_journal.path());

        // Construct (empty) keyspace, then fill back with partition data
        let inner = KeyspaceInner {
            config,
            journal: active_journal,
            partitions: Arc::new(RwLock::new(Partitions::with_capacity_and_hasher(
                10,
                xxhash_rust::xxh3::Xxh3Builder::new(),
            ))),
            seqno: SequenceNumberCounter::default(),
            visible_seqno: SequenceNumberCounter::default(),
            flush_manager: Arc::new(RwLock::new(FlushManager::new())),
            journal_manager: Arc::new(RwLock::new(journal_manager)),
            flush_semaphore: Arc::new(Semaphore::new(0)),
            compaction_manager: CompactionManager::default(),
            stop_signal: lsm_tree::stop_signal::StopSignal::default(),
            active_background_threads: Arc::default(),
            write_buffer_manager: WriteBufferManager::default(),
            is_poisoned: Arc::default(),
            snapshot_tracker: SnapshotTracker::default(),
            stats: Arc::default(),
        };

        let keyspace = Self(Arc::new(inner));

        // Recover partitions
        recover_partitions(&keyspace)?;

        // Recover sealed memtables by walking through old journals
        // NOTE(review): `sealed` appears to hold (key, journal) pairs; only the
        // second element is passed on — confirm against `Journal::recover`
        recover_sealed_memtables(
            &keyspace,
            &sealed_journals
                .into_iter()
                .map(|(_, x)| x)
                .collect::<Vec<_>>(),
        )?;

        {
            let partitions = keyspace.partitions.read().expect("lock is poisoned");

            #[cfg(debug_assertions)]
            for partition in partitions.values() {
                // NOTE: If this triggers, the last sealed memtable
                // was not correctly rotated
                debug_assert!(
                    partition.tree.lock_active_memtable().is_empty(),
                    "active memtable is not empty - this is a bug"
                );
            }

            // NOTE: We only need to recover the active journal, if it actually existed before
            // nothing to recover, if we just created it
            if !journal_recovery.was_active_created {
                log::trace!("Recovering active memtables from active journal");

                let reader = keyspace.journal.get_reader()?;

                // Replay every batch; all items of a batch share the batch's seqno
                for batch in reader {
                    let batch = batch?;

                    for item in batch.items {
                        // Items of partitions that are not (or no longer) registered
                        // are skipped
                        if let Some(partition) = partitions.get(&item.partition) {
                            let tree = &partition.tree;

                            match item.value_type {
                                lsm_tree::ValueType::Value => {
                                    tree.insert(item.key, item.value, batch.seqno);
                                }
                                lsm_tree::ValueType::Tombstone => {
                                    tree.remove(item.key, batch.seqno);
                                }
                                lsm_tree::ValueType::WeakTombstone => {
                                    tree.remove_weak(item.key, batch.seqno);
                                }
                            }
                        }
                    }
                }

                for partition in partitions.values() {
                    let size = partition.tree.active_memtable_size().into();

                    log::trace!(
                        "Recovered active memtable of size {size}B for partition {:?} ({} items)",
                        partition.name,
                        partition.tree.lock_active_memtable().len(),
                    );

                    // IMPORTANT: Add active memtable size to current write buffer size
                    keyspace.write_buffer_manager.allocate(size);

                    // Recover seqno: the next seqno must be greater than anything stored;
                    // a partition without any seqno contributes 0
                    let maybe_next_seqno = partition
                        .tree
                        .get_highest_seqno()
                        .map(|x| x + 1)
                        .unwrap_or_default();

                    keyspace
                        .seqno
                        .fetch_max(maybe_next_seqno, std::sync::atomic::Ordering::AcqRel);

                    log::debug!("Keyspace seqno is now {}", keyspace.seqno.get());
                }
            }
        }

        // Make everything recovered so far visible to readers
        keyspace.visible_seqno.store(
            keyspace.seqno.load(std::sync::atomic::Ordering::Acquire),
            std::sync::atomic::Ordering::Release,
        );

        log::trace!("Recovery successful");

        Ok(keyspace)
    }
731
    /// Creates a new, empty keyspace at `config.path`.
    ///
    /// Creates the keyspace folder, its journals and partitions sub-folders,
    /// and an initial active journal named `0` in the journals folder. The
    /// version marker file (`FJALL_MARKER`) is written and fsynced last, so
    /// its presence signals a fully initialized keyspace. Does not start
    /// background threads.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    ///
    /// # Panics
    ///
    /// Panics if the marker file already exists at the path.
    #[doc(hidden)]
    pub fn create_new(config: Config) -> crate::Result<Self> {
        let path = config.path.clone();
        log::debug!("Creating keyspace at {path:?}");

        std::fs::create_dir_all(&path)?;

        let marker_path = path.join(FJALL_MARKER);
        assert!(!marker_path.try_exists()?);

        let journal_folder_path = path.join(JOURNALS_FOLDER);
        let partition_folder_path = path.join(PARTITIONS_FOLDER);

        std::fs::create_dir_all(&journal_folder_path)?;
        std::fs::create_dir_all(&partition_folder_path)?;

        let active_journal_path = journal_folder_path.join("0");
        let journal = Journal::create_new(&active_journal_path)?;
        let journal = Arc::new(journal);

        // NOTE(review): if any step below fails, `inner` is dropped, which runs
        // `KeyspaceInner::drop` (including `clean_path_on_drop` handling)
        let inner = KeyspaceInner {
            config,
            journal,
            partitions: Arc::new(RwLock::new(Partitions::with_capacity_and_hasher(
                10,
                xxhash_rust::xxh3::Xxh3Builder::new(),
            ))),
            seqno: SequenceNumberCounter::default(),
            visible_seqno: SequenceNumberCounter::default(),
            flush_manager: Arc::new(RwLock::new(FlushManager::new())),
            journal_manager: Arc::new(RwLock::new(JournalManager::from_active(
                active_journal_path,
            ))),
            flush_semaphore: Arc::new(Semaphore::new(0)),
            compaction_manager: CompactionManager::default(),
            stop_signal: lsm_tree::stop_signal::StopSignal::default(),
            active_background_threads: Arc::default(),
            write_buffer_manager: WriteBufferManager::default(),
            is_poisoned: Arc::default(),
            snapshot_tracker: SnapshotTracker::default(),
            stats: Arc::default(),
        };

        // NOTE: Lastly, fsync .fjall marker, which contains the version
        // -> the keyspace is fully initialized
        let mut file = std::fs::File::create(marker_path)?;
        Version::V2.write_file_header(&mut file)?;
        file.sync_all()?;

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&journal_folder_path)?;
        fsync_directory(&partition_folder_path)?;
        fsync_directory(&path)?;

        Ok(Self(Arc::new(inner)))
    }
788
789    fn spawn_monitor_thread(&self) -> crate::Result<()> {
790        const NAME: &str = "monitor";
791
792        let monitor = Monitor::new(self);
793
794        let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
795        let thread_counter = self.active_background_threads.clone();
796        let stop_signal = self.stop_signal.clone();
797
798        let worker = BackgroundWorker::new(monitor, poison_dart, thread_counter, stop_signal);
799
800        std::thread::Builder::new()
801            .name(NAME.into())
802            .spawn(move || {
803                worker.start();
804            })
805            .map(|_| ())
806            .map_err(Into::into)
807    }
808
809    fn spawn_fsync_thread(&self, ms: u64) -> crate::Result<()> {
810        const NAME: &str = "syncer";
811
812        struct Syncer {
813            journal: Arc<Journal>,
814            sleep_dur: Duration,
815        }
816
817        impl Activity for Syncer {
818            fn name(&self) -> &'static str {
819                NAME
820            }
821
822            fn run(&mut self) -> crate::Result<()> {
823                std::thread::sleep(self.sleep_dur);
824                self.journal.persist(PersistMode::SyncAll)?;
825                Ok(())
826            }
827        }
828
829        let syncer = Syncer {
830            journal: self.journal.clone(),
831            sleep_dur: Duration::from_millis(ms),
832        };
833
834        let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
835        let thread_counter = self.active_background_threads.clone();
836        let stop_signal = self.stop_signal.clone();
837
838        let worker = BackgroundWorker::new(syncer, poison_dart, thread_counter, stop_signal);
839
840        std::thread::Builder::new()
841            .name(NAME.into())
842            .spawn(move || {
843                worker.start();
844            })
845            .map(|_| ())
846            .map_err(Into::into)
847    }
848
849    fn spawn_compaction_worker(&self) -> crate::Result<()> {
850        const NAME: &str = "compactor";
851
852        struct Compactor {
853            manager: CompactionManager,
854            snapshot_tracker: SnapshotTracker,
855            stats: Arc<Stats>,
856        }
857
858        impl Activity for Compactor {
859            fn name(&self) -> &'static str {
860                NAME
861            }
862
863            fn run(&mut self) -> crate::Result<()> {
864                log::trace!("{:?}: waiting for work", self.name());
865                self.manager.wait_for();
866                crate::compaction::worker::run(&self.manager, &self.snapshot_tracker, &self.stats)?;
867                Ok(())
868            }
869        }
870
871        let compactor = Compactor {
872            manager: self.compaction_manager.clone(),
873            snapshot_tracker: self.snapshot_tracker.clone(),
874            stats: self.stats.clone(),
875        };
876
877        let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
878        let thread_counter = self.active_background_threads.clone();
879        let stop_signal = self.stop_signal.clone();
880
881        let worker = BackgroundWorker::new(compactor, poison_dart, thread_counter, stop_signal);
882
883        std::thread::Builder::new()
884            .name(NAME.into())
885            .spawn(move || {
886                worker.start();
887            })
888            .map(|_| ())
889            .map_err(Into::into)
890    }
891
892    /// Only used for internal testing.
893    ///
894    /// Should NOT be called when there is a flush worker active already!!!
895    #[doc(hidden)]
896    pub fn force_flush(&self) -> crate::Result<()> {
897        let parallelism = self.config.flush_workers_count;
898
899        crate::flush::worker::run(
900            &self.flush_manager,
901            &self.journal_manager,
902            &self.compaction_manager,
903            &self.write_buffer_manager,
904            &self.snapshot_tracker,
905            parallelism,
906            &self.stats,
907        )
908    }
909
910    fn spawn_flush_worker(&self) -> crate::Result<()> {
911        const NAME: &str = "flusher";
912
913        struct Flusher {
914            parallelism: usize,
915            flush_semaphore: Arc<Semaphore>,
916            flush_manager: Arc<RwLock<FlushManager>>,
917            journal_manager: Arc<RwLock<JournalManager>>,
918            write_buffer_manager: WriteBufferManager,
919            compaction_manager: CompactionManager,
920            snapshot_tracker: SnapshotTracker,
921            stats: Arc<Stats>,
922        }
923
924        impl Activity for Flusher {
925            fn name(&self) -> &'static str {
926                NAME
927            }
928
929            fn run(&mut self) -> crate::Result<()> {
930                log::trace!("{:?}: waiting for work", self.name());
931
932                self.flush_semaphore.acquire();
933
934                crate::flush::worker::run(
935                    &self.flush_manager,
936                    &self.journal_manager,
937                    &self.compaction_manager,
938                    &self.write_buffer_manager,
939                    &self.snapshot_tracker,
940                    self.parallelism,
941                    &self.stats,
942                )?;
943
944                Ok(())
945            }
946        }
947
948        let flusher = Flusher {
949            flush_manager: self.flush_manager.clone(),
950            journal_manager: self.journal_manager.clone(),
951            compaction_manager: self.compaction_manager.clone(),
952            flush_semaphore: self.flush_semaphore.clone(),
953            write_buffer_manager: self.write_buffer_manager.clone(),
954            snapshot_tracker: self.snapshot_tracker.clone(),
955            stats: self.stats.clone(),
956            parallelism: self.config.flush_workers_count,
957        };
958
959        let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
960        let thread_counter = self.active_background_threads.clone();
961        let stop_signal = self.stop_signal.clone();
962
963        let worker = BackgroundWorker::new(flusher, poison_dart, thread_counter, stop_signal);
964
965        std::thread::Builder::new()
966            .name(NAME.into())
967            .spawn(move || {
968                worker.start();
969            })
970            .map(|_| ())
971            .map_err(Into::into)
972    }
973}
974
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;

    // TODO(3.0.0): if we stored each partition under a monotonic integer ID
    // and kept the partition's name inside the partition options/manifest,
    // we could allow all UTF-8 characters in partition names
    //
    // https://github.com/fjall-rs/fjall/issues/89
    //
    // Partition names containing `$`, `#`, `.` and `_` can be opened and written to.
    #[test]
    pub fn test_exotic_partition_names() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let config = Config::new(&folder);
        let keyspace = Keyspace::create_or_recover(config)?;

        for name in ["hello$world", "hello#world", "hello.world", "hello_world"] {
            let db = keyspace.open_partition(name, Default::default())?;
            db.insert("a", "a")?;
            assert_eq!(1, db.len()?);
        }

        Ok(())
    }

    // Sealed memtables of two partitions, spread over several journals,
    // survive a reopen. A single `force_flush` then persists all of them,
    // after which only one journal remains.
    #[test]
    pub fn recover_after_rotation_multiple_partitions() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        {
            let config = Config::new(&folder);
            let keyspace = Keyspace::create_or_recover(config)?;
            let db = keyspace.open_partition("default", Default::default())?;
            let db2 = keyspace.open_partition("default2", Default::default())?;

            db.insert("a", "a")?;
            db2.insert("a", "a")?;
            assert_eq!(1, db.len()?);
            assert_eq!(1, db2.len()?);

            assert_eq!(None, db.tree.get_highest_persisted_seqno());
            assert_eq!(None, db2.tree.get_highest_persisted_seqno());

            db.rotate_memtable()?;

            assert_eq!(1, db.len()?);
            assert_eq!(1, db.tree.sealed_memtable_count());

            assert_eq!(1, db2.len()?);
            assert_eq!(0, db2.tree.sealed_memtable_count());

            db2.insert("b", "b")?;
            db2.rotate_memtable()?;

            assert_eq!(1, db.len()?);
            assert_eq!(1, db.tree.sealed_memtable_count());

            assert_eq!(2, db2.len()?);
            assert_eq!(1, db2.tree.sealed_memtable_count());
        }

        {
            // IMPORTANT: set the flush worker count explicitly. The default
            // depends on the available cores, which can be too low on CI and
            // would make the assertions below fail
            let config = Config::new(&folder).flush_workers(16);
            let keyspace = Keyspace::create_or_recover(config)?;
            let db = keyspace.open_partition("default", Default::default())?;
            let db2 = keyspace.open_partition("default2", Default::default())?;

            assert_eq!(1, db.len()?);
            assert_eq!(1, db.tree.sealed_memtable_count());

            assert_eq!(2, db2.len()?);
            assert_eq!(2, db2.tree.sealed_memtable_count());

            assert_eq!(3, keyspace.journal_count());

            keyspace.force_flush()?;

            assert_eq!(1, db.len()?);
            assert_eq!(0, db.tree.sealed_memtable_count());

            assert_eq!(2, db2.len()?);
            assert_eq!(0, db2.tree.sealed_memtable_count());

            assert_eq!(1, keyspace.journal_count());

            assert_eq!(Some(0), db.tree.get_highest_persisted_seqno());
            assert_eq!(Some(2), db2.tree.get_highest_persisted_seqno());
        }

        Ok(())
    }

    // A sealed memtable of a single partition survives a reopen and is
    // persisted by `force_flush`, leaving one journal behind.
    #[test]
    pub fn recover_after_rotation() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        {
            let config = Config::new(&folder);
            let keyspace = Keyspace::create_or_recover(config)?;
            let db = keyspace.open_partition("default", Default::default())?;

            db.insert("a", "a")?;
            assert_eq!(1, db.len()?);

            db.rotate_memtable()?;

            assert_eq!(1, db.len()?);
            assert_eq!(1, db.tree.sealed_memtable_count());
        }

        {
            let config = Config::new(&folder);
            let keyspace = Keyspace::create_or_recover(config)?;
            let db = keyspace.open_partition("default", Default::default())?;

            assert_eq!(1, db.len()?);
            assert_eq!(1, db.tree.sealed_memtable_count());
            assert_eq!(2, keyspace.journal_count());

            keyspace.force_flush()?;

            assert_eq!(1, db.len()?);
            assert_eq!(0, db.tree.sealed_memtable_count());
            assert_eq!(1, keyspace.journal_count());
        }

        Ok(())
    }

    // Rotating the memtables of two partitions queues flush work and seals
    // journals. `force_flush` turns each sealed memtable into one segment,
    // after which no flush work and no sealed journals remain.
    #[test]
    pub fn force_flush_multiple_partitions() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        // Borrow the TempDir: moving it into `Config::new` would drop the
        // guard immediately and leave the recreated directory behind
        let config = Config::new(&folder);
        let keyspace = Keyspace::create_or_recover(config)?;
        let db = keyspace.open_partition("default", Default::default())?;
        let db2 = keyspace.open_partition("default2", Default::default())?;

        assert_eq!(0, keyspace.write_buffer_size());

        assert_eq!(0, db.segment_count());
        assert_eq!(0, db2.segment_count());

        assert_eq!(
            0,
            keyspace
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .sealed_journal_count()
        );

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .queued_size()
        );

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .len()
        );

        for _ in 0..100 {
            db.insert(nanoid::nanoid!(), "abc")?;
            db2.insert(nanoid::nanoid!(), "abc")?;
        }

        db.rotate_memtable()?;

        assert_eq!(
            1,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .len()
        );

        assert_eq!(
            1,
            keyspace
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .sealed_journal_count()
        );

        for _ in 0..100 {
            db2.insert(nanoid::nanoid!(), "abc")?;
        }

        db2.rotate_memtable()?;

        assert_eq!(
            2,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .len()
        );

        assert_eq!(
            2,
            keyspace
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .sealed_journal_count()
        );

        assert_eq!(0, db.segment_count());
        assert_eq!(0, db2.segment_count());

        keyspace.force_flush()?;

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .queued_size()
        );

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .len()
        );

        assert_eq!(
            0,
            keyspace
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .sealed_journal_count()
        );

        assert_eq!(0, keyspace.write_buffer_size());
        assert_eq!(1, db.segment_count());
        assert_eq!(1, db2.segment_count());

        Ok(())
    }

    // Single-partition variant: one rotation queues flush work and seals one
    // journal. `force_flush` produces one segment and clears both.
    #[test]
    pub fn force_flush() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        // Borrow the TempDir: moving it into `Config::new` would drop the
        // guard immediately and leave the recreated directory behind
        let config = Config::new(&folder);
        let keyspace = Keyspace::create_or_recover(config)?;
        let db = keyspace.open_partition("default", Default::default())?;

        assert_eq!(0, keyspace.write_buffer_size());

        assert_eq!(0, db.segment_count());

        assert_eq!(
            0,
            keyspace
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .sealed_journal_count()
        );

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .queued_size()
        );

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .len()
        );

        for _ in 0..100 {
            db.insert(nanoid::nanoid!(), "abc")?;
        }

        db.rotate_memtable()?;

        assert_eq!(
            1,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .len()
        );

        assert_eq!(
            1,
            keyspace
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .sealed_journal_count()
        );

        assert_eq!(0, db.segment_count());

        keyspace.force_flush()?;

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .queued_size()
        );

        assert_eq!(
            0,
            keyspace
                .flush_manager
                .read()
                .expect("lock is poisoned")
                .len()
        );

        assert_eq!(
            0,
            keyspace
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .sealed_journal_count()
        );

        assert_eq!(0, keyspace.write_buffer_size());
        assert_eq!(1, db.segment_count());

        Ok(())
    }
}