iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use std::collections::BTreeMap;
use std::path::Path;

use crate::core::reactor::Reactor;
use crate::features::storage::sstable::{self, Entry, SstableError};

#[derive(Debug)]
pub enum MemTableError {
    Sstable(SstableError),
}

pub type Result<T> = std::result::Result<T, MemTableError>;

impl From<SstableError> for MemTableError {
    fn from(err: SstableError) -> Self {
        MemTableError::Sstable(err)
    }
}

pub struct MemTable {
    entries: BTreeMap<u64, Vec<Entry>>,
}

impl Default for MemTable {
    fn default() -> Self {
        Self::new()
    }
}

impl MemTable {
    pub fn new() -> Self {
        Self {
            entries: BTreeMap::new(),
        }
    }

    pub fn put(&mut self, entry: Entry) {
        self.entries.entry(entry.key).or_default().push(entry);
    }

    pub fn entries_for_key(&self, key: u64) -> Vec<Entry> {
        self.entries.get(&key).cloned().unwrap_or_default()
    }

    pub fn flush_to_sstable(&self, path: &Path) -> Result<sstable::Sstable> {
        self.flush_to_sstable_with_reactor(path, &crate::core::reactor::SystemReactor)
    }

    pub fn flush_to_sstable_with_reactor(
        &self,
        path: &Path,
        reactor: &dyn Reactor,
    ) -> Result<sstable::Sstable> {
        let mut flattened = Vec::new();
        for bucket in self.entries.values() {
            for entry in bucket {
                flattened.push(entry.clone());
            }
        }
        let table = sstable::write_sstable_with_reactor(path, &flattened, reactor)?;
        Ok(table)
    }

    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    pub fn clear(&mut self) {
        self.entries.clear();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::features::storage::sstable::EntryKind;
    use std::fs;

    fn temp_path(name: &str) -> std::path::PathBuf {
        let mut dir = std::env::temp_dir();
        let stamp = format!(
            "{}_{}_{}",
            name,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        dir.push(format!("{}.sst", stamp));
        dir
    }

    #[test]
    fn memtable_flush_round_trip() {
        let path = temp_path("memtable_flush");
        let mut memtable = MemTable::new();

        memtable.put(Entry {
            key: 1,
            version: 1,
            kind: EntryKind::EdgeDelta,
            value: b"one".to_vec(),
        });
        memtable.put(Entry {
            key: 2,
            version: 1,
            kind: EntryKind::VectorDelta,
            value: b"two".to_vec(),
        });
        memtable.put(Entry {
            key: 1,
            version: 2,
            kind: EntryKind::FullNode,
            value: b"one-v2".to_vec(),
        });

        let table = memtable.flush_to_sstable(&path).unwrap();
        let loaded = sstable::read_sstable(&path).unwrap();

        assert_eq!(table.entries, loaded.entries);
        assert_eq!(table.entries.len(), 3);

        fs::remove_file(path).ok();
    }
}