pub mod inner;
use crate::{
    coding::{Decode, Encode},
    compaction::{stream::CompactionStream, CompactionStrategy},
    config::Config,
    descriptor_table::FileDescriptorTable,
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    range::{prefix_to_range, MemtableLockGuard, TreeIter},
    segment::{block_index::two_level_index::TwoLevelBlockIndex, meta::TableType, Segment},
    stop_signal::StopSignal,
    value::InternalValue,
    version::Version,
    AbstractTree, BlockCache, KvPair, SegmentId, SeqNo, Snapshot, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
impl std::ops::Deref for Tree {
    type Target = TreeInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl AbstractTree for Tree {
    #[cfg(feature = "bloom")]
    fn bloom_filter_size(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|x| x.bloom_filter.len())
            .sum()
    }
    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }
    fn is_first_level_disjoint(&self) -> bool {
        self.levels
            .read()
            .expect("lock is poisoned")
            .levels
            .first()
            .expect("first level should exist")
            .is_disjoint
    }
    fn verify(&self) -> crate::Result<usize> {
        let _lock = self.lock_active_memtable();
        let mut sum = 0;
        let level_manifest = self.levels.read().expect("lock is poisoned");
        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }
        Ok(sum)
    }
    fn keys_with_seqno(
        &self,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(
            self.create_iter(Some(seqno), index)
                .map(|x| x.map(|(k, _)| k)),
        )
    }
    fn values_with_seqno(
        &self,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(
            self.create_iter(Some(seqno), index)
                .map(|x| x.map(|(_, v)| v)),
        )
    }
    fn keys(&self) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(None, None).map(|x| x.map(|(k, _)| k)))
    }
    fn values(&self) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(None, None).map(|x| x.map(|(_, v)| v)))
    }
    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Arc<Segment>>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer},
        };
        let folder = self.config.path.join(SEGMENTS_FOLDER);
        log::debug!("writing segment to {folder:?}");
        let mut segment_writer = Writer::new(Options {
            segment_id,
            folder,
            data_block_size: self.config.data_block_size,
            index_block_size: self.config.index_block_size,
        })?
        .use_compression(self.config.compression);
        #[cfg(feature = "bloom")]
        {
            if self.config.bloom_bits_per_key >= 0 {
                segment_writer = segment_writer.use_bloom_policy(
                    crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
                );
            }
        }
        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);
        for item in compaction_filter {
            segment_writer.write(item?)?;
        }
        self.consume_writer(segment_id, segment_writer)
    }
    fn register_segments(&self, segments: &[Arc<Segment>]) -> crate::Result<()> {
        log::trace!("flush: acquiring levels manifest write lock");
        let mut original_levels = self.levels.write().expect("lock is poisoned");
        log::trace!("flush: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");
        original_levels.atomic_swap(|recipe| {
            for segment in segments.iter().cloned() {
                recipe
                    .first_mut()
                    .expect("first level should exist")
                    .insert(segment);
            }
        })?;
        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.metadata.id);
            sealed_memtables.remove(segment.metadata.id);
        }
        Ok(())
    }
    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Memtable> {
        self.active_memtable.write().expect("lock is poisoned")
    }
    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = memtable;
    }
    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};
        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;
        do_compaction(&opts)?;
        log::debug!("lsm-tree: compaction run over");
        Ok(())
    }
    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }
    fn tree_config(&self) -> &Config {
        &self.config
    }
    fn active_memtable_size(&self) -> u32 {
        use std::sync::atomic::Ordering::Acquire;
        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }
    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();
        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();
        if active_memtable.is_empty() {
            return None;
        }
        let yanked_memtable = std::mem::take(&mut *active_memtable);
        let yanked_memtable = Arc::new(yanked_memtable);
        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());
        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");
        Some((tmp_memtable_id, yanked_memtable))
    }
    fn segment_count(&self) -> usize {
        self.levels.read().expect("lock is poisoned").len()
    }
    fn first_level_segment_count(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .first_level_segment_count()
    }
    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let levels = self.levels.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");
        let segments_item_count = levels.iter().map(|x| x.metadata.item_count).sum::<u64>();
        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;
        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }
    fn disk_space(&self) -> u64 {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|x| x.metadata.file_size).sum()
    }
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();
        let sealed = self
            .sealed_memtables
            .read()
            .expect("Lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();
        active.max(sealed)
    }
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|s| s.get_highest_seqno()).max()
    }
    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Standard;
        Snapshot::new(Standard(self.clone()), seqno)
    }
    fn get_with_seqno<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: SeqNo,
    ) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key, true, Some(seqno))?
            .map(|x| x.value))
    }
    fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<UserValue>> {
        Ok(self.get_internal_entry(key, true, None)?.map(|x| x.value))
    }
    fn iter_with_seqno(
        &self,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        self.range_with_seqno::<UserKey, _>(.., seqno, index)
    }
    fn range_with_seqno<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, Some(seqno), index))
    }
    fn prefix_with_seqno<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, Some(seqno), index))
    }
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, None, None))
    }
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, None, None))
    }
    fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V, seqno: SeqNo) -> (u32, u32) {
        let value =
            InternalValue::from_components(key.as_ref(), value.as_ref(), seqno, ValueType::Value);
        self.append_entry(value)
    }
    fn raw_insert_with_lock<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &self,
        lock: &RwLockWriteGuard<'_, Memtable>,
        key: K,
        value: V,
        seqno: SeqNo,
        r#type: ValueType,
    ) -> (u32, u32) {
        let value = InternalValue::from_components(key.as_ref(), value.as_ref(), seqno, r#type);
        lock.insert(value)
    }
    fn remove<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_tombstone(key.as_ref(), seqno);
        self.append_entry(value)
    }
    fn remove_weak<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_weak_tombstone(key.as_ref(), seqno);
        self.append_entry(value)
    }
}
impl Tree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;
        log::debug!("Opening LSM-tree at {:?}", config.path);
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(Version::V1));
        }
        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;
        Ok(tree)
    }
    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Memtable> {
        self.active_memtable.read().expect("lock is poisoned")
    }
    #[doc(hidden)]
    pub fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        log::info!("Starting major compaction");
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));
        self.compact(strategy, seqno_threshold)
    }
    pub(crate) fn consume_writer(
        &self,
        segment_id: SegmentId,
        mut writer: crate::segment::writer::Writer,
    ) -> crate::Result<Option<Arc<Segment>>> {
        #[cfg(feature = "bloom")]
        use crate::bloom::BloomFilter;
        let segment_folder = writer.opts.folder.clone();
        let segment_file_path = segment_folder.join(segment_id.to_string());
        let Some(trailer) = writer.finish()? else {
            return Ok(None);
        };
        log::debug!("Finalized segment write at {segment_folder:?}");
        let block_index = Arc::new(TwoLevelBlockIndex::from_file(
            &segment_file_path,
            trailer.offsets.tli_ptr,
            (self.id, segment_id).into(),
            self.config.descriptor_table.clone(),
            self.config.block_cache.clone(),
        )?);
        #[cfg(feature = "bloom")]
        let bloom_ptr = trailer.offsets.bloom_ptr;
        let created_segment: Arc<_> = Segment {
            tree_id: self.id,
            metadata: trailer.metadata,
            offsets: trailer.offsets,
            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            block_cache: self.config.block_cache.clone(),
            #[cfg(feature = "bloom")]
            bloom_filter: {
                use crate::coding::Decode;
                use std::io::Seek;
                assert!(*bloom_ptr > 0, "can not find bloom filter block");
                let mut reader = std::fs::File::open(&segment_file_path)?;
                reader.seek(std::io::SeekFrom::Start(*bloom_ptr))?;
                BloomFilter::decode_from(&mut reader)?
            },
        }
        .into();
        self.config.descriptor_table.insert(
            segment_file_path,
            (self.id, created_segment.metadata.id).into(),
        );
        log::debug!("Flushed segment to {segment_folder:?}");
        Ok(Some(created_segment))
    }
    #[doc(hidden)]
    pub fn flush_active_memtable(
        &self,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Arc<Segment>>> {
        log::debug!("flush: flushing active memtable");
        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };
        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;
        Ok(Some(segment))
    }
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.is_compacting()
    }
    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }
    pub(crate) fn get_internal_entry_with_lock<K: AsRef<[u8]>>(
        &self,
        memtable_lock: &RwLockWriteGuard<'_, Memtable>,
        key: K,
        evict_tombstone: bool,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(&key, seqno) {
            if evict_tombstone {
                return Ok(ignore_tombstone_value(entry));
            }
            return Ok(Some(entry));
        };
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(&key, seqno) {
            if evict_tombstone {
                return Ok(ignore_tombstone_value(entry));
            }
            return Ok(Some(entry));
        }
        self.get_internal_entry_from_segments(key, evict_tombstone, seqno)
    }
    fn get_internal_entry_from_sealed_memtables<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");
        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(&key, seqno) {
                return Some(entry);
            }
        }
        None
    }
    fn get_internal_entry_from_segments<K: AsRef<[u8]>>(
        &self,
        key: K,
        evict_tombstone: bool, seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        #[cfg(feature = "bloom")]
        let key_hash = crate::bloom::BloomFilter::get_hash(key.as_ref());
        let level_manifest = self.levels.read().expect("lock is poisoned");
        for level in &level_manifest.levels {
            if level.len() >= 5 {
                if let Some(level) = level.as_disjoint() {
                    if let Some(segment) = level.get_segment_containing_key(&key) {
                        #[cfg(not(feature = "bloom"))]
                        let maybe_item = segment.get(&key, seqno)?;
                        #[cfg(feature = "bloom")]
                        let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?;
                        if let Some(item) = maybe_item {
                            if evict_tombstone {
                                return Ok(ignore_tombstone_value(item));
                            }
                            return Ok(Some(item));
                        }
                    } else {
                        continue;
                    }
                }
            }
            for segment in &level.segments {
                #[cfg(not(feature = "bloom"))]
                let maybe_item = segment.get(&key, seqno)?;
                #[cfg(feature = "bloom")]
                let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?;
                if let Some(item) = maybe_item {
                    if evict_tombstone {
                        return Ok(ignore_tombstone_value(item));
                    }
                    return Ok(Some(item));
                }
            }
        }
        Ok(None)
    }
    #[doc(hidden)]
    pub fn get_internal_entry<K: AsRef<[u8]>>(
        &self,
        key: K,
        evict_tombstone: bool, seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        if let Some(entry) = memtable_lock.get(&key, seqno) {
            if evict_tombstone {
                return Ok(ignore_tombstone_value(entry));
            }
            return Ok(Some(entry));
        };
        drop(memtable_lock);
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(&key, seqno) {
            if evict_tombstone {
                return Ok(ignore_tombstone_value(entry));
            }
            return Ok(Some(entry));
        }
        self.get_internal_entry_from_segments(key, evict_tombstone, seqno)
    }
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use std::ops::Bound::{self, Excluded, Included, Unbounded};
        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };
        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };
        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
        let level_manifest_lock =
            guardian::ArcRwLockReadGuardian::take(self.levels.clone()).expect("lock is poisoned");
        let active = guardian::ArcRwLockReadGuardian::take(self.active_memtable.clone())
            .expect("lock is poisoned");
        let sealed = guardian::ArcRwLockReadGuardian::take(self.sealed_memtables.clone())
            .expect("lock is poisoned");
        TreeIter::create_range(
            MemtableLockGuard {
                active,
                sealed,
                ephemeral,
            },
            bounds,
            seqno,
            level_manifest_lock,
        )
    }
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &'a self,
        prefix: K,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u32, u32) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;
        use inner::get_next_tree_id;
        log::info!("Recovering LSM-tree at {:?}", config.path);
        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;
        if manifest.version != Version::V2 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }
        config.level_count = manifest.level_count;
        config.table_type = manifest.table_type;
        config.tree_type = manifest.tree_type;
        let tree_id = get_next_tree_id();
        let mut levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.block_cache,
            &config.descriptor_table,
        )?;
        levels.sort_levels();
        let highest_segment_id = levels
            .iter()
            .map(|x| x.metadata.id)
            .max()
            .unwrap_or_default();
        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            levels: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
        };
        Ok(Self(Arc::new(inner)))
    }
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};
        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {path:?}");
        create_dir_all(&path)?;
        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);
        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;
        let mut file = File::create(manifest_path)?;
        Manifest {
            version: Version::V2,
            level_count: config.level_count,
            tree_type: config.tree_type,
            table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;
        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;
        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        block_cache: &Arc<BlockCache>,
        descriptor_table: &Arc<FileDescriptorTable>,
    ) -> crate::Result<LevelManifest> {
        use crate::{
            file::fsync_directory,
            file::{LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
            SegmentId,
        };
        let tree_path = tree_path.as_ref();
        log::debug!("Recovering disk segments from {tree_path:?}");
        let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE);
        let segment_ids_to_recover = LevelManifest::recover_ids(&level_manifest_path)?;
        let mut segments = vec![];
        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);
        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }
        for dirent in std::fs::read_dir(&segment_base_folder)? {
            let dirent = dirent?;
            let file_name = dirent.file_name();
            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;
            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());
            if segment_file_name.starts_with("tmp_") {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}",);
                std::fs::remove_file(&segment_file_path)?;
                continue;
            }
            log::debug!("Recovering segment from {segment_file_path:?}");
            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;
            if segment_ids_to_recover.contains(&segment_id) {
                let segment = Segment::recover(
                    &segment_file_path,
                    tree_id,
                    block_cache.clone(),
                    descriptor_table.clone(),
                )?;
                descriptor_table.insert(&segment_file_path, (tree_id, segment.metadata.id).into());
                segments.push(Arc::new(segment));
                log::debug!("Recovered segment from {segment_file_path:?}");
            } else {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}",);
                std::fs::remove_file(&segment_file_path)?;
            }
        }
        if segments.len() < segment_ids_to_recover.len() {
            log::error!("Expected segments: {segment_ids_to_recover:?}");
            return Err(crate::Error::Unrecoverable);
        }
        log::debug!("Recovered {} segments", segments.len());
        LevelManifest::recover(&level_manifest_path, segments)
    }
}