1use crate::{
6 background_worker::{Activity, BackgroundWorker},
7 batch::{Batch, PartitionKey},
8 compaction::manager::CompactionManager,
9 config::Config,
10 file::{
11 fsync_directory, FJALL_MARKER, JOURNALS_FOLDER, PARTITIONS_FOLDER, PARTITION_DELETED_MARKER,
12 },
13 flush::manager::FlushManager,
14 journal::{manager::JournalManager, writer::PersistMode, Journal},
15 monitor::Monitor,
16 partition::name::is_valid_partition_name,
17 poison_dart::PoisonDart,
18 recovery::{recover_partitions, recover_sealed_memtables},
19 snapshot_tracker::SnapshotTracker,
20 stats::Stats,
21 version::Version,
22 write_buffer_manager::WriteBufferManager,
23 HashMap, PartitionCreateOptions, PartitionHandle,
24};
25use lsm_tree::{AbstractTree, SequenceNumberCounter};
26use std::{
27 fs::{remove_dir_all, File},
28 path::Path,
29 sync::{
30 atomic::{AtomicBool, AtomicUsize},
31 Arc, RwLock,
32 },
33 time::Duration,
34};
35use std_semaphore::Semaphore;
36
/// Map from partition name to the handle of the corresponding open partition.
pub type Partitions = HashMap<PartitionKey, PartitionHandle>;
38
/// Shared state behind a [`Keyspace`] handle.
///
/// Dropping this value stops the background workers and clears the managers
/// (see the `Drop` implementation).
#[allow(clippy::module_name_repetitions)]
pub struct KeyspaceInner {
    /// Open partitions, keyed by partition name.
    #[doc(hidden)]
    pub partitions: Arc<RwLock<Partitions>>,

    /// The active journal.
    pub(crate) journal: Arc<Journal>,

    /// Configuration the keyspace was opened with.
    #[doc(hidden)]
    pub config: Config,

    /// Sequence number counter; during recovery it is raised to one past the
    /// highest sequence number found in the recovered partitions.
    pub(crate) seqno: SequenceNumberCounter,

    /// Sequence number reported by [`Keyspace::instant`]; recovery sets it to
    /// the value of `seqno`.
    pub(crate) visible_seqno: SequenceNumberCounter,

    /// Flush manager holding the queued flush work consumed by the flush worker.
    pub(crate) flush_manager: Arc<RwLock<FlushManager>>,

    /// Journal manager; reports the journal count and journal disk usage.
    pub(crate) journal_manager: Arc<RwLock<JournalManager>>,

    /// Semaphore the flush worker blocks on before running a flush.
    pub(crate) flush_semaphore: Arc<Semaphore>,

    /// Compaction manager shared with the compaction workers.
    pub(crate) compaction_manager: CompactionManager,

    /// Stop signal handed to every background worker; sent on drop.
    pub(crate) stop_signal: lsm_tree::stop_signal::StopSignal,

    /// Counter handed to every background worker; drop waits until it reads zero.
    pub(crate) active_background_threads: Arc<AtomicUsize>,

    /// Tracks the write buffer size reported by [`Keyspace::write_buffer_size`].
    pub(crate) write_buffer_manager: WriteBufferManager,

    /// Set once persisting the journal has failed; afterwards
    /// [`Keyspace::persist`] returns `Error::Poisoned`.
    pub(crate) is_poisoned: Arc<AtomicBool>,

    /// Runtime counters (flushes, compactions, time spent compacting).
    pub(crate) stats: Arc<Stats>,

    /// Snapshot tracker shared with the flush and compaction workers.
    #[doc(hidden)]
    pub snapshot_tracker: SnapshotTracker,
}
90
91impl Drop for KeyspaceInner {
92 fn drop(&mut self) {
93 log::trace!("Dropping Keyspace");
94
95 self.stop_signal.send();
96
97 while self
98 .active_background_threads
99 .load(std::sync::atomic::Ordering::Relaxed)
100 > 0
101 {
102 std::thread::sleep(std::time::Duration::from_micros(100));
103
104 self.flush_semaphore.release();
106 self.compaction_manager.notify_empty();
107 }
108
109 self.flush_manager
111 .write()
112 .expect("lock is poisoned")
113 .clear();
114 self.compaction_manager.clear();
115 self.partitions.write().expect("lock is poisoned").clear();
116 self.journal_manager
117 .write()
118 .expect("lock is poisoned")
119 .clear();
120
121 self.config.descriptor_table.clear();
122
123 if self.config.clean_path_on_drop {
124 log::info!(
125 "Deleting keyspace because temporary=true: {:?}",
126 self.config.path,
127 );
128
129 if let Err(err) = remove_dir_all(&self.config.path) {
130 eprintln!("Failed to clean up path: {:?} - {err}", self.config.path);
131 }
132 }
133
134 #[cfg(feature = "__internal_whitebox")]
135 crate::drop::decrement_drop_counter();
136 }
137}
138
/// Cloneable handle to a keyspace.
///
/// Wraps the shared [`KeyspaceInner`] in an [`Arc`], so every clone refers to
/// the same underlying keyspace.
#[derive(Clone)]
#[doc(alias = "database")]
#[doc(alias = "collection")]
pub struct Keyspace(pub(crate) Arc<KeyspaceInner>);
149
150impl std::ops::Deref for Keyspace {
151 type Target = KeyspaceInner;
152
153 fn deref(&self) -> &Self::Target {
154 &self.0
155 }
156}
157
158impl Keyspace {
159 #[must_use]
187 pub fn batch(&self) -> Batch {
188 let mut batch = Batch::new(self.clone());
189
190 if !self.config.manual_journal_persist {
191 batch = batch.durability(Some(PersistMode::Buffer));
192 }
193
194 batch
195 }
196
197 #[must_use]
199 pub fn write_buffer_size(&self) -> u64 {
200 self.write_buffer_manager.get()
201 }
202
203 #[doc(hidden)]
205 #[must_use]
206 pub fn flushes_completed(&self) -> usize {
207 self.stats
208 .flushes_completed
209 .load(std::sync::atomic::Ordering::Relaxed)
210 }
211
212 #[doc(hidden)]
214 #[must_use]
215 pub fn time_compacting(&self) -> std::time::Duration {
216 let us = self
217 .stats
218 .time_compacting
219 .load(std::sync::atomic::Ordering::Relaxed);
220
221 std::time::Duration::from_micros(us)
222 }
223
224 #[doc(hidden)]
226 #[must_use]
227 pub fn active_compactions(&self) -> usize {
228 self.stats
229 .active_compaction_count
230 .load(std::sync::atomic::Ordering::Relaxed)
231 }
232
233 #[doc(hidden)]
235 #[must_use]
236 pub fn compactions_completed(&self) -> usize {
237 self.stats
238 .compactions_completed
239 .load(std::sync::atomic::Ordering::Relaxed)
240 }
241
242 #[must_use]
257 pub fn journal_count(&self) -> usize {
258 self.journal_manager
259 .read()
260 .expect("lock is poisoned")
261 .journal_count()
262 }
263
    /// Returns the disk space used by the journals: the length reported by the
    /// active journal's writer plus the usage reported by the journal manager.
    ///
    /// If the writer's length cannot be determined, it is counted as the
    /// default value (presumably 0 — confirm against the writer's return type).
    ///
    /// # Panics
    ///
    /// Panics if the journal manager lock is poisoned.
    #[doc(hidden)]
    #[must_use]
    pub fn journal_disk_space(&self) -> u64 {
        self.journal.get_writer().len().unwrap_or_default()
            + self
                .journal_manager
                .read()
                .expect("lock is poisoned")
                .disk_space_used()
    }
276
277 pub fn disk_space(&self) -> u64 {
292 let partitions_size = self
293 .partitions
294 .read()
295 .expect("lock is poisoned")
296 .values()
297 .map(PartitionHandle::disk_space)
298 .sum::<u64>();
299
300 self.journal_disk_space() + partitions_size
301 }
302
303 pub fn persist(&self, mode: PersistMode) -> crate::Result<()> {
328 if self.is_poisoned.load(std::sync::atomic::Ordering::Relaxed) {
329 return Err(crate::Error::Poisoned);
330 }
331
332 if let Err(e) = self.journal.persist(mode) {
333 self.is_poisoned
334 .store(true, std::sync::atomic::Ordering::Release);
335
336 log::error!(
337 "flush failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
338 );
339
340 return Err(crate::Error::Poisoned);
341 }
342
343 Ok(())
344 }
345
346 #[doc(hidden)]
347 #[must_use]
348 pub fn cache_capacity(&self) -> u64 {
349 self.config.cache.capacity()
350 }
351
352 pub fn open(config: Config) -> crate::Result<Self> {
358 log::debug!(
359 "cache capacity={}MiB",
360 config.cache.capacity() / 1_024 / 1_024,
361 );
362
363 let keyspace = Self::create_or_recover(config)?;
364 keyspace.start_background_threads()?;
365
366 #[cfg(feature = "__internal_whitebox")]
367 crate::drop::increment_drop_counter();
368
369 Ok(keyspace)
370 }
371
372 #[doc(hidden)]
378 pub fn create_or_recover(config: Config) -> crate::Result<Self> {
379 log::info!("Opening keyspace at {:?}", config.path);
380
381 if config.path.join(FJALL_MARKER).try_exists()? {
382 Self::recover(config)
383 } else {
384 Self::create_new(config)
385 }
386 }
387
388 pub(crate) fn start_background_threads(&self) -> crate::Result<()> {
393 if self.config.flush_workers_count > 0 {
394 self.spawn_flush_worker()?;
395
396 for _ in 0..self
397 .flush_manager
398 .read()
399 .expect("lock is poisoned")
400 .queue_count()
401 {
402 self.flush_semaphore.release();
403 }
404 }
405
406 log::debug!(
407 "Spawning {} compaction threads",
408 self.config.compaction_workers_count
409 );
410
411 for _ in 0..self.config.compaction_workers_count {
412 self.spawn_compaction_worker()?;
413 }
414
415 if let Some(ms) = self.config.fsync_ms {
416 self.spawn_fsync_thread(ms.into())?;
417 }
418
419 self.spawn_monitor_thread()
420 }
421
422 pub fn delete_partition(&self, handle: PartitionHandle) -> crate::Result<()> {
428 let partition_path = handle.path();
429
430 let file = File::create(partition_path.join(PARTITION_DELETED_MARKER))?;
431 file.sync_all()?;
432
433 fsync_directory(partition_path)?;
435
436 handle
437 .is_deleted
438 .store(true, std::sync::atomic::Ordering::Release);
439
440 self.compaction_manager.remove_partition(&handle.name);
442
443 self.flush_manager
444 .write()
445 .expect("lock is poisoned")
446 .remove_partition(&handle.name);
447
448 self.partitions
449 .write()
450 .expect("lock is poisoned")
451 .remove(&handle.name);
452
453 Ok(())
454 }
455
456 pub fn open_partition(
472 &self,
473 name: &str,
474 create_options: PartitionCreateOptions,
475 ) -> crate::Result<PartitionHandle> {
476 assert!(is_valid_partition_name(name));
477
478 let mut partitions = self.partitions.write().expect("lock is poisoned");
479
480 Ok(if let Some(partition) = partitions.get(name) {
481 partition.clone()
482 } else {
483 let name: PartitionKey = name.into();
484
485 let handle = PartitionHandle::create_new(self, name.clone(), create_options)?;
486 partitions.insert(name, handle.clone());
487
488 #[cfg(feature = "__internal_whitebox")]
489 crate::drop::increment_drop_counter();
490
491 handle
492 })
493 }
494
495 #[must_use]
497 pub fn partition_count(&self) -> usize {
498 self.partitions.read().expect("lock is poisoned").len()
499 }
500
501 #[must_use]
503 pub fn list_partitions(&self) -> Vec<PartitionKey> {
504 self.partitions
505 .read()
506 .expect("lock is poisoned")
507 .keys()
508 .cloned()
509 .collect()
510 }
511
512 #[must_use]
528 pub fn partition_exists(&self, name: &str) -> bool {
529 self.partitions
530 .read()
531 .expect("lock is poisoned")
532 .contains_key(name)
533 }
534
535 #[must_use]
574 pub fn instant(&self) -> crate::Instant {
575 self.visible_seqno.get()
576 }
577
578 fn check_version<P: AsRef<Path>>(path: P) -> crate::Result<()> {
579 let bytes = std::fs::read(path.as_ref().join(FJALL_MARKER))?;
580
581 if let Some(version) = Version::parse_file_header(&bytes) {
582 if version != Version::V2 {
583 return Err(crate::Error::InvalidVersion(Some(version)));
584 }
585 } else {
586 return Err(crate::Error::InvalidVersion(None));
587 }
588
589 Ok(())
590 }
591
    /// Recovers an existing keyspace from the directory at `config.path`.
    ///
    /// Steps, in order:
    /// 1. Verify the version marker (only `Version::V2` is accepted).
    /// 2. Recover the journals and sync the active journal to disk.
    /// 3. Build the keyspace with default counters and managers.
    /// 4. Recover the partitions, then the sealed memtables from the sealed journals.
    /// 5. Unless the active journal was newly created, replay it into the
    ///    partitions' trees, account for the recovered memtable sizes and raise
    ///    the sequence number counter.
    /// 6. Copy the sequence number into the visible sequence number.
    ///
    /// No background threads are started here.
    ///
    /// # Errors
    ///
    /// Returns an error if the version check fails or any recovery step fails.
    ///
    /// # Panics
    ///
    /// Panics if the partitions lock is poisoned.
    #[allow(clippy::too_many_lines)]
    #[doc(hidden)]
    pub fn recover(config: Config) -> crate::Result<Self> {
        log::info!("Recovering keyspace at {:?}", config.path);

        Self::check_version(&config.path)?;

        let journals_folder = config.path.join(JOURNALS_FOLDER);
        let journal_recovery = Journal::recover(journals_folder)?;
        log::debug!("journal recovery result: {journal_recovery:#?}");

        // Sync the active journal to disk before anything else relies on it.
        let active_journal = Arc::new(journal_recovery.active);
        active_journal.get_writer().persist(PersistMode::SyncAll)?;

        let sealed_journals = journal_recovery.sealed;

        let journal_manager = JournalManager::from_active(active_journal.path());

        let inner = KeyspaceInner {
            config,
            journal: active_journal,
            partitions: Arc::new(RwLock::new(Partitions::with_capacity_and_hasher(
                10,
                xxhash_rust::xxh3::Xxh3Builder::new(),
            ))),
            seqno: SequenceNumberCounter::default(),
            visible_seqno: SequenceNumberCounter::default(),
            flush_manager: Arc::new(RwLock::new(FlushManager::new())),
            journal_manager: Arc::new(RwLock::new(journal_manager)),
            flush_semaphore: Arc::new(Semaphore::new(0)),
            compaction_manager: CompactionManager::default(),
            stop_signal: lsm_tree::stop_signal::StopSignal::default(),
            active_background_threads: Arc::default(),
            write_buffer_manager: WriteBufferManager::default(),
            is_poisoned: Arc::default(),
            snapshot_tracker: SnapshotTracker::default(),
            stats: Arc::default(),
        };

        let keyspace = Self(Arc::new(inner));

        recover_partitions(&keyspace)?;

        // Only the second element of each sealed journal entry is passed on.
        recover_sealed_memtables(
            &keyspace,
            &sealed_journals
                .into_iter()
                .map(|(_, x)| x)
                .collect::<Vec<_>>(),
        )?;

        {
            let partitions = keyspace.partitions.read().expect("lock is poisoned");

            // Debug builds only: every active memtable is expected to still be
            // empty at this point.
            #[cfg(debug_assertions)]
            for partition in partitions.values() {
                debug_assert!(
                    partition.tree.lock_active_memtable().is_empty(),
                    "active memtable is not empty - this is a bug"
                );
            }

            // Replay is skipped when the active journal was newly created
            // (presumably there is nothing in it to replay — confirm against
            // `Journal::recover`).
            if !journal_recovery.was_active_created {
                log::trace!("Recovering active memtables from active journal");

                let reader = keyspace.journal.get_reader()?;

                for batch in reader {
                    let batch = batch?;

                    for item in batch.items {
                        // Items whose partition is not in the map are skipped.
                        if let Some(partition) = partitions.get(&item.partition) {
                            let tree = &partition.tree;

                            match item.value_type {
                                lsm_tree::ValueType::Value => {
                                    tree.insert(item.key, item.value, batch.seqno);
                                }
                                lsm_tree::ValueType::Tombstone => {
                                    tree.remove(item.key, batch.seqno);
                                }
                                lsm_tree::ValueType::WeakTombstone => {
                                    tree.remove_weak(item.key, batch.seqno);
                                }
                            }
                        }
                    }
                }

                for partition in partitions.values() {
                    let size = partition.tree.active_memtable_size().into();

                    log::trace!(
                        "Recovered active memtable of size {size}B for partition {:?} ({} items)",
                        partition.name,
                        partition.tree.lock_active_memtable().len(),
                    );

                    // Account for the recovered active memtable in the write buffer size.
                    keyspace.write_buffer_manager.allocate(size);

                    // One past the highest sequence number in this partition,
                    // or the default value when the partition has none.
                    let maybe_next_seqno = partition
                        .tree
                        .get_highest_seqno()
                        .map(|x| x + 1)
                        .unwrap_or_default();

                    // `fetch_max` keeps the largest candidate across all partitions.
                    keyspace
                        .seqno
                        .fetch_max(maybe_next_seqno, std::sync::atomic::Ordering::AcqRel);

                    log::debug!("Keyspace seqno is now {}", keyspace.seqno.get());
                }
            }
        }

        // Make everything that was recovered visible.
        keyspace.visible_seqno.store(
            keyspace.seqno.load(std::sync::atomic::Ordering::Acquire),
            std::sync::atomic::Ordering::Release,
        );

        log::trace!("Recovery successful");

        Ok(keyspace)
    }
731
    /// Creates a new keyspace in the directory at `config.path`.
    ///
    /// Creates the keyspace folder with its journals and partitions subfolders,
    /// creates the first journal (named `0`), and only then writes and syncs
    /// the version marker, followed by syncing the three directories.
    /// No background threads are started here.
    ///
    /// # Errors
    ///
    /// Returns an error if any directory, the journal or the marker file
    /// cannot be created or synced.
    ///
    /// # Panics
    ///
    /// Panics if the version marker file already exists.
    #[doc(hidden)]
    pub fn create_new(config: Config) -> crate::Result<Self> {
        let path = config.path.clone();
        log::debug!("Creating keyspace at {path:?}");

        std::fs::create_dir_all(&path)?;

        // A marker here would mean an existing keyspace is about to be overwritten.
        let marker_path = path.join(FJALL_MARKER);
        assert!(!marker_path.try_exists()?);

        let journal_folder_path = path.join(JOURNALS_FOLDER);
        let partition_folder_path = path.join(PARTITIONS_FOLDER);

        std::fs::create_dir_all(&journal_folder_path)?;
        std::fs::create_dir_all(&partition_folder_path)?;

        let active_journal_path = journal_folder_path.join("0");
        let journal = Journal::create_new(&active_journal_path)?;
        let journal = Arc::new(journal);

        let inner = KeyspaceInner {
            config,
            journal,
            partitions: Arc::new(RwLock::new(Partitions::with_capacity_and_hasher(
                10,
                xxhash_rust::xxh3::Xxh3Builder::new(),
            ))),
            seqno: SequenceNumberCounter::default(),
            visible_seqno: SequenceNumberCounter::default(),
            flush_manager: Arc::new(RwLock::new(FlushManager::new())),
            journal_manager: Arc::new(RwLock::new(JournalManager::from_active(
                active_journal_path,
            ))),
            flush_semaphore: Arc::new(Semaphore::new(0)),
            compaction_manager: CompactionManager::default(),
            stop_signal: lsm_tree::stop_signal::StopSignal::default(),
            active_background_threads: Arc::default(),
            write_buffer_manager: WriteBufferManager::default(),
            is_poisoned: Arc::default(),
            snapshot_tracker: SnapshotTracker::default(),
            stats: Arc::default(),
        };

        // The marker is written last: `create_or_recover` treats its presence
        // as "a keyspace exists here" (presumably so a crash mid-creation does
        // not leave a half-built keyspace that looks recoverable — confirm).
        let mut file = std::fs::File::create(marker_path)?;
        Version::V2.write_file_header(&mut file)?;
        file.sync_all()?;

        // Sync the directories so the newly created entries are durable.
        fsync_directory(&journal_folder_path)?;
        fsync_directory(&partition_folder_path)?;
        fsync_directory(&path)?;

        Ok(Self(Arc::new(inner)))
    }
788
789 fn spawn_monitor_thread(&self) -> crate::Result<()> {
790 const NAME: &str = "monitor";
791
792 let monitor = Monitor::new(self);
793
794 let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
795 let thread_counter = self.active_background_threads.clone();
796 let stop_signal = self.stop_signal.clone();
797
798 let worker = BackgroundWorker::new(monitor, poison_dart, thread_counter, stop_signal);
799
800 std::thread::Builder::new()
801 .name(NAME.into())
802 .spawn(move || {
803 worker.start();
804 })
805 .map(|_| ())
806 .map_err(Into::into)
807 }
808
809 fn spawn_fsync_thread(&self, ms: u64) -> crate::Result<()> {
810 const NAME: &str = "syncer";
811
812 struct Syncer {
813 journal: Arc<Journal>,
814 sleep_dur: Duration,
815 }
816
817 impl Activity for Syncer {
818 fn name(&self) -> &'static str {
819 NAME
820 }
821
822 fn run(&mut self) -> crate::Result<()> {
823 std::thread::sleep(self.sleep_dur);
824 self.journal.persist(PersistMode::SyncAll)?;
825 Ok(())
826 }
827 }
828
829 let syncer = Syncer {
830 journal: self.journal.clone(),
831 sleep_dur: Duration::from_millis(ms),
832 };
833
834 let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
835 let thread_counter = self.active_background_threads.clone();
836 let stop_signal = self.stop_signal.clone();
837
838 let worker = BackgroundWorker::new(syncer, poison_dart, thread_counter, stop_signal);
839
840 std::thread::Builder::new()
841 .name(NAME.into())
842 .spawn(move || {
843 worker.start();
844 })
845 .map(|_| ())
846 .map_err(Into::into)
847 }
848
849 fn spawn_compaction_worker(&self) -> crate::Result<()> {
850 const NAME: &str = "compactor";
851
852 struct Compactor {
853 manager: CompactionManager,
854 snapshot_tracker: SnapshotTracker,
855 stats: Arc<Stats>,
856 }
857
858 impl Activity for Compactor {
859 fn name(&self) -> &'static str {
860 NAME
861 }
862
863 fn run(&mut self) -> crate::Result<()> {
864 log::trace!("{:?}: waiting for work", self.name());
865 self.manager.wait_for();
866 crate::compaction::worker::run(&self.manager, &self.snapshot_tracker, &self.stats)?;
867 Ok(())
868 }
869 }
870
871 let compactor = Compactor {
872 manager: self.compaction_manager.clone(),
873 snapshot_tracker: self.snapshot_tracker.clone(),
874 stats: self.stats.clone(),
875 };
876
877 let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
878 let thread_counter = self.active_background_threads.clone();
879 let stop_signal = self.stop_signal.clone();
880
881 let worker = BackgroundWorker::new(compactor, poison_dart, thread_counter, stop_signal);
882
883 std::thread::Builder::new()
884 .name(NAME.into())
885 .spawn(move || {
886 worker.start();
887 })
888 .map(|_| ())
889 .map_err(Into::into)
890 }
891
892 #[doc(hidden)]
896 pub fn force_flush(&self) -> crate::Result<()> {
897 let parallelism = self.config.flush_workers_count;
898
899 crate::flush::worker::run(
900 &self.flush_manager,
901 &self.journal_manager,
902 &self.compaction_manager,
903 &self.write_buffer_manager,
904 &self.snapshot_tracker,
905 parallelism,
906 &self.stats,
907 )
908 }
909
910 fn spawn_flush_worker(&self) -> crate::Result<()> {
911 const NAME: &str = "flusher";
912
913 struct Flusher {
914 parallelism: usize,
915 flush_semaphore: Arc<Semaphore>,
916 flush_manager: Arc<RwLock<FlushManager>>,
917 journal_manager: Arc<RwLock<JournalManager>>,
918 write_buffer_manager: WriteBufferManager,
919 compaction_manager: CompactionManager,
920 snapshot_tracker: SnapshotTracker,
921 stats: Arc<Stats>,
922 }
923
924 impl Activity for Flusher {
925 fn name(&self) -> &'static str {
926 NAME
927 }
928
929 fn run(&mut self) -> crate::Result<()> {
930 log::trace!("{:?}: waiting for work", self.name());
931
932 self.flush_semaphore.acquire();
933
934 crate::flush::worker::run(
935 &self.flush_manager,
936 &self.journal_manager,
937 &self.compaction_manager,
938 &self.write_buffer_manager,
939 &self.snapshot_tracker,
940 self.parallelism,
941 &self.stats,
942 )?;
943
944 Ok(())
945 }
946 }
947
948 let flusher = Flusher {
949 flush_manager: self.flush_manager.clone(),
950 journal_manager: self.journal_manager.clone(),
951 compaction_manager: self.compaction_manager.clone(),
952 flush_semaphore: self.flush_semaphore.clone(),
953 write_buffer_manager: self.write_buffer_manager.clone(),
954 snapshot_tracker: self.snapshot_tracker.clone(),
955 stats: self.stats.clone(),
956 parallelism: self.config.flush_workers_count,
957 };
958
959 let poison_dart = PoisonDart::new(NAME, self.is_poisoned.clone());
960 let thread_counter = self.active_background_threads.clone();
961 let stop_signal = self.stop_signal.clone();
962
963 let worker = BackgroundWorker::new(flusher, poison_dart, thread_counter, stop_signal);
964
965 std::thread::Builder::new()
966 .name(NAME.into())
967 .spawn(move || {
968 worker.start();
969 })
970 .map(|_| ())
971 .map_err(Into::into)
972 }
973}
974
975#[cfg(test)]
976mod tests {
977 use super::*;
978 use test_log::test;
979
980 #[test]
986 pub fn test_exotic_partition_names() -> crate::Result<()> {
987 let folder = tempfile::tempdir()?;
988 let config = Config::new(&folder);
989 let keyspace = Keyspace::create_or_recover(config)?;
990
991 for name in ["hello$world", "hello#world", "hello.world", "hello_world"] {
992 let db = keyspace.open_partition(name, Default::default())?;
993 db.insert("a", "a")?;
994 assert_eq!(1, db.len()?);
995 }
996
997 Ok(())
998 }
999
1000 #[test]
1001 pub fn recover_after_rotation_multiple_partitions() -> crate::Result<()> {
1002 let folder = tempfile::tempdir()?;
1003
1004 {
1005 let config = Config::new(&folder);
1006 let keyspace = Keyspace::create_or_recover(config)?;
1007 let db = keyspace.open_partition("default", Default::default())?;
1008 let db2 = keyspace.open_partition("default2", Default::default())?;
1009
1010 db.insert("a", "a")?;
1011 db2.insert("a", "a")?;
1012 assert_eq!(1, db.len()?);
1013 assert_eq!(1, db2.len()?);
1014
1015 assert_eq!(None, db.tree.get_highest_persisted_seqno());
1016 assert_eq!(None, db2.tree.get_highest_persisted_seqno());
1017
1018 db.rotate_memtable()?;
1019
1020 assert_eq!(1, db.len()?);
1021 assert_eq!(1, db.tree.sealed_memtable_count());
1022
1023 assert_eq!(1, db2.len()?);
1024 assert_eq!(0, db2.tree.sealed_memtable_count());
1025
1026 db2.insert("b", "b")?;
1027 db2.rotate_memtable()?;
1028
1029 assert_eq!(1, db.len()?);
1030 assert_eq!(1, db.tree.sealed_memtable_count());
1031
1032 assert_eq!(2, db2.len()?);
1033 assert_eq!(1, db2.tree.sealed_memtable_count());
1034 }
1035
1036 {
1037 let config = Config::new(&folder).flush_workers(16);
1041 let keyspace = Keyspace::create_or_recover(config)?;
1042 let db = keyspace.open_partition("default", Default::default())?;
1043 let db2 = keyspace.open_partition("default2", Default::default())?;
1044
1045 assert_eq!(1, db.len()?);
1046 assert_eq!(1, db.tree.sealed_memtable_count());
1047
1048 assert_eq!(2, db2.len()?);
1049 assert_eq!(2, db2.tree.sealed_memtable_count());
1050
1051 assert_eq!(3, keyspace.journal_count());
1052
1053 keyspace.force_flush()?;
1054
1055 assert_eq!(1, db.len()?);
1056 assert_eq!(0, db.tree.sealed_memtable_count());
1057
1058 assert_eq!(2, db2.len()?);
1059 assert_eq!(0, db2.tree.sealed_memtable_count());
1060
1061 assert_eq!(1, keyspace.journal_count());
1062
1063 assert_eq!(Some(0), db.tree.get_highest_persisted_seqno());
1064 assert_eq!(Some(2), db2.tree.get_highest_persisted_seqno());
1065 }
1066
1067 Ok(())
1068 }
1069
1070 #[test]
1071 pub fn recover_after_rotation() -> crate::Result<()> {
1072 let folder = tempfile::tempdir()?;
1073
1074 {
1075 let config = Config::new(&folder);
1076 let keyspace = Keyspace::create_or_recover(config)?;
1077 let db = keyspace.open_partition("default", Default::default())?;
1078
1079 db.insert("a", "a")?;
1080 assert_eq!(1, db.len()?);
1081
1082 db.rotate_memtable()?;
1083
1084 assert_eq!(1, db.len()?);
1085 assert_eq!(1, db.tree.sealed_memtable_count());
1086 }
1087
1088 {
1089 let config = Config::new(&folder);
1090 let keyspace = Keyspace::create_or_recover(config)?;
1091 let db = keyspace.open_partition("default", Default::default())?;
1092
1093 assert_eq!(1, db.len()?);
1094 assert_eq!(1, db.tree.sealed_memtable_count());
1095 assert_eq!(2, keyspace.journal_count());
1096
1097 keyspace.force_flush()?;
1098
1099 assert_eq!(1, db.len()?);
1100 assert_eq!(0, db.tree.sealed_memtable_count());
1101 assert_eq!(1, keyspace.journal_count());
1102 }
1103
1104 Ok(())
1105 }
1106
1107 #[test]
1108 pub fn force_flush_multiple_partitions() -> crate::Result<()> {
1109 let folder = tempfile::tempdir()?;
1110
1111 let config = Config::new(folder);
1112 let keyspace = Keyspace::create_or_recover(config)?;
1113 let db = keyspace.open_partition("default", Default::default())?;
1114 let db2 = keyspace.open_partition("default2", Default::default())?;
1115
1116 assert_eq!(0, keyspace.write_buffer_size());
1117
1118 assert_eq!(0, db.segment_count());
1119 assert_eq!(0, db2.segment_count());
1120
1121 assert_eq!(
1122 0,
1123 keyspace
1124 .journal_manager
1125 .read()
1126 .expect("lock is poisoned")
1127 .sealed_journal_count()
1128 );
1129
1130 assert_eq!(
1131 0,
1132 keyspace
1133 .flush_manager
1134 .read()
1135 .expect("lock is poisoned")
1136 .queued_size()
1137 );
1138
1139 assert_eq!(
1140 0,
1141 keyspace
1142 .flush_manager
1143 .read()
1144 .expect("lock is poisoned")
1145 .len()
1146 );
1147
1148 for _ in 0..100 {
1149 db.insert(nanoid::nanoid!(), "abc")?;
1150 db2.insert(nanoid::nanoid!(), "abc")?;
1151 }
1152
1153 db.rotate_memtable()?;
1154
1155 assert_eq!(
1156 1,
1157 keyspace
1158 .flush_manager
1159 .read()
1160 .expect("lock is poisoned")
1161 .len()
1162 );
1163
1164 assert_eq!(
1165 1,
1166 keyspace
1167 .journal_manager
1168 .read()
1169 .expect("lock is poisoned")
1170 .sealed_journal_count()
1171 );
1172
1173 for _ in 0..100 {
1174 db2.insert(nanoid::nanoid!(), "abc")?;
1175 }
1176
1177 db2.rotate_memtable()?;
1178
1179 assert_eq!(
1180 2,
1181 keyspace
1182 .flush_manager
1183 .read()
1184 .expect("lock is poisoned")
1185 .len()
1186 );
1187
1188 assert_eq!(
1189 2,
1190 keyspace
1191 .journal_manager
1192 .read()
1193 .expect("lock is poisoned")
1194 .sealed_journal_count()
1195 );
1196
1197 assert_eq!(0, db.segment_count());
1198 assert_eq!(0, db2.segment_count());
1199
1200 keyspace.force_flush()?;
1201
1202 assert_eq!(
1203 0,
1204 keyspace
1205 .flush_manager
1206 .read()
1207 .expect("lock is poisoned")
1208 .queued_size()
1209 );
1210
1211 assert_eq!(
1212 0,
1213 keyspace
1214 .flush_manager
1215 .read()
1216 .expect("lock is poisoned")
1217 .len()
1218 );
1219
1220 assert_eq!(
1221 0,
1222 keyspace
1223 .journal_manager
1224 .read()
1225 .expect("lock is poisoned")
1226 .sealed_journal_count()
1227 );
1228
1229 assert_eq!(0, keyspace.write_buffer_size());
1230 assert_eq!(1, db.segment_count());
1231 assert_eq!(1, db2.segment_count());
1232
1233 Ok(())
1234 }
1235
1236 #[test]
1237 pub fn force_flush() -> crate::Result<()> {
1238 let folder = tempfile::tempdir()?;
1239
1240 let config = Config::new(folder);
1241 let keyspace = Keyspace::create_or_recover(config)?;
1242 let db = keyspace.open_partition("default", Default::default())?;
1243
1244 assert_eq!(0, keyspace.write_buffer_size());
1245
1246 assert_eq!(0, db.segment_count());
1247
1248 assert_eq!(
1249 0,
1250 keyspace
1251 .journal_manager
1252 .read()
1253 .expect("lock is poisoned")
1254 .sealed_journal_count()
1255 );
1256
1257 assert_eq!(
1258 0,
1259 keyspace
1260 .flush_manager
1261 .read()
1262 .expect("lock is poisoned")
1263 .queued_size()
1264 );
1265
1266 assert_eq!(
1267 0,
1268 keyspace
1269 .flush_manager
1270 .read()
1271 .expect("lock is poisoned")
1272 .len()
1273 );
1274
1275 for _ in 0..100 {
1276 db.insert(nanoid::nanoid!(), "abc")?;
1277 }
1278
1279 db.rotate_memtable()?;
1280
1281 assert_eq!(
1282 1,
1283 keyspace
1284 .flush_manager
1285 .read()
1286 .expect("lock is poisoned")
1287 .len()
1288 );
1289
1290 assert_eq!(
1291 1,
1292 keyspace
1293 .journal_manager
1294 .read()
1295 .expect("lock is poisoned")
1296 .sealed_journal_count()
1297 );
1298
1299 assert_eq!(0, db.segment_count());
1300
1301 keyspace.force_flush()?;
1302
1303 assert_eq!(
1304 0,
1305 keyspace
1306 .flush_manager
1307 .read()
1308 .expect("lock is poisoned")
1309 .queued_size()
1310 );
1311
1312 assert_eq!(
1313 0,
1314 keyspace
1315 .flush_manager
1316 .read()
1317 .expect("lock is poisoned")
1318 .len()
1319 );
1320
1321 assert_eq!(
1322 0,
1323 keyspace
1324 .journal_manager
1325 .read()
1326 .expect("lock is poisoned")
1327 .sealed_journal_count()
1328 );
1329
1330 assert_eq!(0, keyspace.write_buffer_size());
1331 assert_eq!(1, db.segment_count());
1332
1333 Ok(())
1334 }
1335}