slatedb 0.10.0

A cloud native embedded storage engine built on object storage.
Documentation
use std::collections::HashMap;

use crate::db_state::SsTableId;
use crate::db_state::SsTableId::{Compacted, Wal};
use crate::error::SlateDBError;
use object_store::path::Path;
use ulid::Ulid;

/// Name of the directory, directly under a root path, that holds WAL SSTs.
const WAL_PATH: &str = "wal";
/// Name of the directory, directly under a root path, that holds compacted SSTs.
const COMPACTED_PATH: &str = "compacted";

/// Translates between [`SsTableId`]s and object-store [`Path`]s.
///
/// Tables are laid out as `<root>/wal/<id>.sst` and
/// `<root>/compacted/<ulid>.sst`.
#[derive(Clone, Debug)]
pub(crate) struct PathResolver {
    /// Root under which tables are placed unless overridden per table.
    root_path: Path,
    /// Per-table root overrides: a table listed here is resolved under the
    /// mapped path instead of `root_path`.
    external_ssts: HashMap<SsTableId, Path>,
}

impl PathResolver {
    pub(crate) fn new<P: Into<Path>>(root_path: P) -> Self {
        Self {
            root_path: root_path.into(),
            external_ssts: HashMap::new(),
        }
    }

    pub(crate) fn new_with_external_ssts<P: Into<Path>>(
        root_path: P,
        external_ssts: HashMap<SsTableId, P>,
    ) -> Self {
        Self {
            root_path: root_path.into(),
            external_ssts: external_ssts
                .into_iter()
                .map(|(k, v)| (k, v.into()))
                .collect(),
        }
    }

    pub(crate) fn wal_path(&self) -> Path {
        Path::from(format!("{}/{}/", &self.root_path, WAL_PATH))
    }

    pub(crate) fn compacted_path(&self) -> Path {
        Path::from(format!("{}/{}/", &self.root_path, COMPACTED_PATH))
    }

    pub(crate) fn parse_table_id(&self, path: &Path) -> Result<Option<SsTableId>, SlateDBError> {
        if let Some(mut suffix_iter) = path.prefix_match(&self.root_path) {
            match suffix_iter.next() {
                Some(a) if a.as_ref() == WAL_PATH => suffix_iter
                    .next()
                    .and_then(|s| s.as_ref().split('.').next().map(|s| s.parse::<u64>()))
                    .transpose()
                    .map(|r| r.map(SsTableId::Wal))
                    .map_err(|_| SlateDBError::InvalidDBState),
                Some(a) if a.as_ref() == COMPACTED_PATH => suffix_iter
                    .next()
                    .and_then(|s| s.as_ref().split('.').next().map(Ulid::from_string))
                    .transpose()
                    .map(|r| r.map(SsTableId::Compacted))
                    .map_err(|_| SlateDBError::InvalidDBState),
                _ => Ok(None),
            }
        } else {
            Ok(None)
        }
    }

    pub(crate) fn table_path(&self, table_id: &SsTableId) -> Path {
        let root_path = match self.external_ssts.get(table_id) {
            Some(external_path) => external_path,
            None => &self.root_path,
        };
        match table_id {
            Wal(id) => Path::from(format!("{}/{}/{:020}.sst", root_path, WAL_PATH, id)),
            Compacted(ulid) => Path::from(format!(
                "{}/{}/{}.sst",
                root_path,
                COMPACTED_PATH,
                ulid.to_string()
            )),
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::db_state::SsTableId;
    use crate::paths::PathResolver;
    use object_store::path::Path;
    use proptest::arbitrary::any;
    use proptest::proptest;
    use ulid::Ulid;

    const ROOT: &str = "/root";

    // Resolver shared by every test in this module.
    fn resolver() -> PathResolver {
        PathResolver::new(Path::from(ROOT))
    }

    // Renders `expected` to a path and checks that parsing the path yields the
    // same id back.
    fn assert_round_trip(expected: SsTableId) {
        let resolver = resolver();
        let rendered = resolver.table_path(&expected);
        let recovered = resolver.parse_table_id(&rendered).unwrap();
        assert_eq!(Some(expected), recovered);
    }

    proptest! {
        #[test]
        fn should_serialize_and_deserialize_wal_paths(
            wal_id in any::<u64>(),
        ) {
            assert_round_trip(SsTableId::Wal(wal_id));
        }

        #[test]
        fn should_serialize_and_deserialize_compacted_paths(
            compacted_id in any::<u128>(),
        ) {
            assert_round_trip(SsTableId::Compacted(Ulid::from(compacted_id)));
        }
    }

    #[test]
    fn test_parse_id() {
        let resolver = resolver();
        // (raw path, id expected from parsing it)
        let cases = [
            (
                "/root/wal/00000000000000000003.sst",
                Some(SsTableId::Wal(3)),
            ),
            (
                "/root/compacted/01J79C21YKR31J2BS1EFXJZ7MR.sst",
                Some(SsTableId::Compacted(
                    Ulid::from_string("01J79C21YKR31J2BS1EFXJZ7MR").unwrap(),
                )),
            ),
            // An unknown directory below the root is not an error, just no id.
            ("/root/invalid/00000000000000000001.sst", None),
        ];

        for (raw, expected) in cases {
            let parsed = resolver.parse_table_id(&Path::from(raw)).unwrap();
            assert_eq!(parsed, expected);
        }
    }
}