use crate::key::InternalKey;
use crate::segment::block::ItemSize;
use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
use crossbeam_skiplist::SkipMap;
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicU32, AtomicU64};
/// In-memory table of versioned key-value items, stored in a skip map.
///
/// Besides the items themselves, the table tracks a running size estimate and
/// the highest sequence number that has been inserted.
#[derive(Default)]
pub struct Memtable {
    /// The stored items: internal key (user key, seqno, value type) mapped to the user value.
    #[doc(hidden)]
    pub items: SkipMap<InternalKey, UserValue>,
    /// Running sum of the sizes of all inserted items; reset to 0 by `clear`.
    pub(crate) approximate_size: AtomicU32,
    /// Largest sequence number passed to `insert` so far; reset to 0 by `clear`.
    pub(crate) highest_seqno: AtomicU64,
}
impl Memtable {
    /// Removes every item and resets the size estimate and highest seqno to zero.
    pub fn clear(&mut self) {
        use std::sync::atomic::Ordering;

        self.items.clear();
        // Exclusive access, so the counter can be reset in place.
        *self.highest_seqno.get_mut() = 0;
        self.approximate_size.store(0, Ordering::Release);
    }

    /// Creates an iterator over all items, in the order of the underlying map.
    ///
    /// Each yielded item is an owned clone of the stored key and value.
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
        self.items.iter().map(|entry| {
            let key = entry.key().clone();
            let value = entry.value().clone();
            InternalValue { key, value }
        })
    }

    /// Creates an iterator over the items whose internal key lies within `range`.
    ///
    /// Each yielded item is an owned clone of the stored key and value.
    pub(crate) fn range<'a, R>(
        &'a self,
        range: R,
    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a
    where
        R: RangeBounds<InternalKey> + 'a,
    {
        self.items.range(range).map(|entry| {
            let key = entry.key().clone();
            let value = entry.value().clone();
            InternalValue { key, value }
        })
    }

    /// Looks up the item for `key` that is visible at the given snapshot `seqno`.
    ///
    /// With `seqno == None` the search starts at `SeqNo::MAX`; with `Some(n)` it
    /// starts at `n - 1`, so only versions below `n` can be returned. A snapshot
    /// of `Some(0)` never yields anything.
    ///
    /// Returns `None` if the first candidate found belongs to a different user key.
    #[doc(hidden)]
    pub fn get<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> Option<InternalValue> {
        let start_seqno = match seqno {
            // Nothing can be older than seqno 0 (this also keeps the subtraction below safe)
            Some(0) => return None,
            Some(snapshot) => snapshot - 1,
            None => SeqNo::MAX,
        };

        let user_key = key.as_ref();

        // NOTE(review): this presumably relies on `InternalKey` sorting by user key
        // first and then by descending seqno, so that the first entry at or after
        // the bound is the newest visible version - confirm against `InternalKey`'s `Ord`.
        let start = InternalKey::new(user_key, start_seqno, ValueType::Value);

        let first = self.items.range(start..).next()?;

        // The range is open-ended, so the first hit may already be a different key
        if &*first.key().user_key != user_key {
            return None;
        }

        Some(InternalValue {
            key: first.key().clone(),
            value: first.value().clone(),
        })
    }

    /// Returns the current value of the running size estimate.
    pub fn size(&self) -> u32 {
        use std::sync::atomic::Ordering;

        self.approximate_size.load(Ordering::Acquire)
    }

    /// Returns the number of items stored in the memtable.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Returns `true` if the memtable holds no items.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Inserts an item and updates the size estimate and the highest seqno.
    ///
    /// Returns `(size of the inserted item, size estimate after the insert)`.
    #[doc(hidden)]
    pub fn insert(&self, item: InternalValue) -> (u32, u32) {
        use std::sync::atomic::Ordering;

        // NOTE(review): the cast truncates sizes above `u32::MAX`; presumably item
        // sizes are bounded well below that - confirm.
        #[allow(clippy::cast_possible_truncation)]
        let added = item.size() as u32;

        // The size is accounted for before the item becomes visible in the map
        let previous_total = self.approximate_size.fetch_add(added, Ordering::AcqRel);

        let InternalValue { key, value } = item;
        let seqno = key.seqno;

        self.items
            .insert(InternalKey::new(key.user_key, seqno, key.value_type), value);

        self.highest_seqno.fetch_max(seqno, Ordering::AcqRel);

        (added, previous_total + added)
    }

    /// Returns the highest sequence number inserted so far, or `None` if the
    /// memtable is empty.
    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
        use std::sync::atomic::Ordering;

        if self.is_empty() {
            return None;
        }

        Some(self.highest_seqno.load(Ordering::Acquire))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::ValueType;
    use test_log::test;

    /// Builds a `Value`-typed item for `key` at `seqno`; the payload is always `abc`.
    fn item(key: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), b"abc".to_vec(), seqno, ValueType::Value)
    }

    /// Point reads must respect the snapshot seqno and never match a key prefix.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let present = "hello-key-999991";
        // A strict prefix of `present`; it must never produce a hit
        let absent = "hello-key-99999";

        let old_value = *b"hello-value-999991";
        let new_value = *b"hello-value-999991-2";

        let memtable = Memtable::default();

        // Only the version at seqno 0 exists
        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            old_value,
            0,
            ValueType::Value,
        ));

        assert_eq!(None, memtable.get(absent, None));
        assert_eq!(old_value, &*memtable.get(present, None).unwrap().value);

        // A newer version at seqno 1 is added
        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            new_value,
            1,
            ValueType::Value,
        ));

        assert_eq!(None, memtable.get(absent, None));
        assert_eq!(new_value, &*memtable.get(present, None).unwrap().value);

        // Snapshot 1 only sees seqno 0
        assert_eq!(None, memtable.get(absent, Some(1)));
        assert_eq!(old_value, &*memtable.get(present, Some(1)).unwrap().value);

        // Snapshot 2 sees seqno 1
        assert_eq!(None, memtable.get(absent, Some(2)));
        assert_eq!(new_value, &*memtable.get(present, Some(2)).unwrap().value);
    }

    /// A single inserted item can be read back unchanged.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::default();

        let expected = item(b"abc", 0);
        memtable.insert(expected.clone());

        assert_eq!(Some(expected), memtable.get("abc", None));
    }

    /// Without a snapshot, the version with the highest seqno wins.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::default();

        for seqno in 0..=4 {
            memtable.insert(item(b"abc", seqno));
        }

        assert_eq!(Some(item(b"abc", 4)), memtable.get("abc", None));
    }

    /// A key that is a prefix of another key must not shadow it, and vice versa.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::default();

        memtable.insert(item(b"abc0", 0));
        memtable.insert(item(b"abc", 255));

        assert_eq!(Some(item(b"abc", 255)), memtable.get("abc", None));
        assert_eq!(Some(item(b"abc0", 0)), memtable.get("abc0", None));
    }

    /// Snapshot reads return the newest version below the snapshot seqno.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::default();

        for &seqno in &[0, 99, 255] {
            memtable.insert(item(b"abc", seqno));
        }

        // (snapshot seqno, seqno of the version that must be returned)
        for &(snapshot, visible) in &[(None, 255), (Some(100), 99), (Some(50), 0)] {
            assert_eq!(Some(item(b"abc", visible)), memtable.get("abc", snapshot));
        }
    }
}