pub mod block;
pub mod block_index;
pub mod file_offsets;
pub mod id;
pub mod inner;
pub mod level_reader;
pub mod meta;
pub mod multi_writer;
pub mod range;
pub mod reader;
pub mod trailer;
pub mod value_block;
pub mod value_block_consumer;
pub mod writer;
use crate::{
    block_cache::BlockCache,
    descriptor_table::FileDescriptorTable,
    segment::reader::Reader,
    tree::inner::TreeId,
    value::{InternalValue, SeqNo, UserKey},
};
use inner::Inner;
use range::Range;
use std::{ops::Bound, path::Path, sync::Arc};
#[cfg(feature = "bloom")]
use crate::bloom::{BloomFilter, CompositeHash};
pub type SegmentInner = Inner;
#[doc(alias("sstable", "sst", "sorted string table"))]
#[derive(Clone)]
pub struct Segment(Arc<Inner>);
impl From<Inner> for Segment {
    fn from(value: Inner) -> Self {
        Self(Arc::new(value))
    }
}
impl std::ops::Deref for Segment {
    type Target = Inner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl std::fmt::Debug for Segment {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Segment:{}({})",
            self.metadata.id, self.metadata.key_range
        )
    }
}
impl Segment {
    pub(crate) fn verify(&self) -> crate::Result<usize> {
        use block::checksum::Checksum;
        use block_index::IndexBlock;
        use value_block::ValueBlock;
        let mut data_block_count = 0;
        let mut broken_count = 0;
        let guard = self
            .descriptor_table
            .access(&(self.tree_id, self.metadata.id).into())?
            .expect("should have gotten file");
        let mut file = guard.file.lock().expect("lock is poisoned");
        #[allow(clippy::explicit_iter_loop)]
        for handle in self.block_index.top_level_index.iter() {
            let block = match IndexBlock::from_file(&mut *file, handle.offset) {
                Ok(v) => v,
                Err(e) => {
                    log::error!(
                        "index block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                    );
                    broken_count += 1;
                    continue;
                }
            };
            for handle in &*block.items {
                let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                    Ok(v) => v,
                    Err(e) => {
                        log::error!(
                            "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                        );
                        broken_count += 1;
                        data_block_count += 1;
                        continue;
                    }
                };
                let (_, data) = ValueBlock::to_bytes_compressed(
                    &value_block.items,
                    value_block.header.previous_block_offset,
                    value_block.header.compression,
                )?;
                let actual_checksum = Checksum::from_bytes(&data);
                if value_block.header.checksum != actual_checksum {
                    log::error!("{handle:?} is corrupted, invalid checksum value");
                    broken_count += 1;
                }
                data_block_count += 1;
                if data_block_count % 1_000 == 0 {
                    log::debug!("Checked {data_block_count} data blocks");
                }
            }
        }
        assert_eq!(
            data_block_count, self.metadata.data_block_count,
            "not all data blocks were visited"
        );
        Ok(broken_count)
    }
    #[cfg(feature = "bloom")]
    pub(crate) fn load_bloom<P: AsRef<Path>>(
        path: P,
        ptr: value_block::BlockOffset,
    ) -> crate::Result<Option<BloomFilter>> {
        Ok(if *ptr > 0 {
            use crate::coding::Decode;
            use std::{
                fs::File,
                io::{Seek, SeekFrom},
            };
            let mut reader = File::open(path)?;
            reader.seek(SeekFrom::Start(*ptr))?;
            Some(BloomFilter::decode_from(&mut reader)?)
        } else {
            None
        })
    }
    pub(crate) fn recover<P: AsRef<Path>>(
        file_path: P,
        tree_id: TreeId,
        block_cache: Arc<BlockCache>,
        descriptor_table: Arc<FileDescriptorTable>,
    ) -> crate::Result<Self> {
        use block_index::two_level_index::TwoLevelBlockIndex;
        use trailer::SegmentFileTrailer;
        let file_path = file_path.as_ref();
        log::debug!("Recovering segment from file {file_path:?}");
        let trailer = SegmentFileTrailer::from_file(file_path)?;
        assert_eq!(
            0, *trailer.offsets.range_tombstones_ptr,
            "Range tombstones not supported"
        );
        log::debug!(
            "Creating block index, with tli_ptr={}",
            trailer.offsets.tli_ptr
        );
        let block_index = TwoLevelBlockIndex::from_file(
            file_path,
            trailer.offsets.tli_ptr,
            (tree_id, trailer.metadata.id).into(),
            descriptor_table.clone(),
            block_cache.clone(),
        )?;
        #[cfg(feature = "bloom")]
        let bloom_ptr = trailer.offsets.bloom_ptr;
        Ok(Self(Arc::new(Inner {
            tree_id,
            descriptor_table,
            metadata: trailer.metadata,
            offsets: trailer.offsets,
            block_index: Arc::new(block_index),
            block_cache,
            #[cfg(feature = "bloom")]
            bloom_filter: Self::load_bloom(file_path, bloom_ptr)?,
        })))
    }
    #[cfg(feature = "bloom")]
    #[must_use]
    pub fn bloom_filter_size(&self) -> usize {
        self.bloom_filter
            .as_ref()
            .map(super::bloom::BloomFilter::len)
            .unwrap_or_default()
    }
    #[cfg(feature = "bloom")]
    pub fn get_with_hash<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
        hash: CompositeHash,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(seqno) = seqno {
            if self.metadata.seqnos.0 >= seqno {
                return Ok(None);
            }
        }
        if !self.metadata.key_range.contains_key(&key) {
            return Ok(None);
        }
        if let Some(bf) = &self.bloom_filter {
            if !bf.contains_hash(hash) {
                return Ok(None);
            }
        }
        self.point_read(key, seqno)
    }
    fn point_read<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        use crate::{mvcc_stream::MvccStream, ValueType};
        use value_block::{CachePolicy, ValueBlock};
        use value_block_consumer::ValueBlockConsumer;
        let key = key.as_ref();
        let Some(first_block_handle) = self
            .block_index
            .get_lowest_data_block_handle_containing_item(key.as_ref(), CachePolicy::Write)?
        else {
            return Ok(None);
        };
        let Some(block) = ValueBlock::load_by_block_handle(
            &self.descriptor_table,
            &self.block_cache,
            (self.tree_id, self.metadata.id).into(),
            first_block_handle.offset,
            CachePolicy::Write,
        )?
        else {
            return Ok(None);
        };
        if seqno.is_none() {
            let Some(latest) = block.get_latest(key.as_ref()).cloned() else {
                return Ok(None);
            };
            if latest.key.value_type == ValueType::WeakTombstone {
                } else {
                return Ok(Some(latest));
            }
        }
        let mut reader = Reader::new(
            self.offsets.index_block_ptr,
            self.descriptor_table.clone(),
            (self.tree_id, self.metadata.id).into(),
            self.block_cache.clone(),
            first_block_handle.offset,
            None,
        );
        reader.lo_block_size = block.header.data_length.into();
        reader.lo_block_items = Some(ValueBlockConsumer::with_bounds(
            block,
            &Some(key.into()),
            &None,
        ));
        reader.lo_initialized = true;
        let reader = reader.filter(|x| {
            match x {
                Ok(entry) => {
                    if let Some(seqno) = seqno {
                        entry.key.seqno < seqno
                    } else {
                        true
                    }
                }
                Err(_) => true,
            }
        });
        let Some(entry) = MvccStream::new(reader).next().transpose()? else {
            return Ok(None);
        };
        if &*entry.key.user_key > key {
            return Ok(None);
        }
        Ok(Some(entry))
    }
    pub fn is_key_in_key_range<K: AsRef<[u8]>>(&self, key: K) -> bool {
        self.metadata.key_range.contains_key(key)
    }
    #[allow(unused)]
    pub(crate) fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(seqno) = seqno {
            if self.metadata.seqnos.0 >= seqno {
                return Ok(None);
            }
        }
        if !self.is_key_in_key_range(&key) {
            return Ok(None);
        }
        let key = key.as_ref();
        #[cfg(feature = "bloom")]
        if let Some(bf) = &self.bloom_filter {
            debug_assert!(false, "Use Segment::get_with_hash instead");
            if !bf.contains(key) {
                return Ok(None);
            }
        }
        self.point_read(key, seqno)
    }
    #[must_use]
    #[allow(clippy::iter_without_into_iter)]
    #[doc(hidden)]
    pub fn iter(&self) -> Range {
        self.range((std::ops::Bound::Unbounded, std::ops::Bound::Unbounded))
    }
    #[must_use]
    pub(crate) fn range(&self, range: (Bound<UserKey>, Bound<UserKey>)) -> Range {
        Range::new(
            self.offsets.index_block_ptr,
            self.descriptor_table.clone(),
            (self.tree_id, self.metadata.id).into(),
            self.block_cache.clone(),
            self.block_index.clone(),
            range,
        )
    }
    #[must_use]
    pub fn get_highest_seqno(&self) -> SeqNo {
        self.metadata.seqnos.1
    }
    #[must_use]
    #[doc(hidden)]
    pub fn tombstone_count(&self) -> u64 {
        self.metadata.tombstone_count
    }
    pub(crate) fn check_key_range_overlap(
        &self,
        bounds: &(Bound<UserKey>, Bound<UserKey>),
    ) -> bool {
        self.metadata.key_range.overlaps_with_bounds(bounds)
    }
}