ps-datalake 0.1.0-2

Encrypted flat storage
Documentation
pub mod config;
pub mod util;
use crate::error::PsDataLakeError;
use crate::error::Result;
use crate::store::DataStore;
use config::DataLakeConfig;
use ps_datachunk::DataChunk;
use ps_datachunk::MbufDataChunk;
use ps_hash::Hash;
use ps_hkey::Hkey;
use ps_hkey::Resolved;
use util::verify_magic;

#[derive(Clone, Default)]
pub struct DataLakeStores<'lt> {
    pub readable: Vec<DataStore<'lt>>,
    pub writable: Vec<DataStore<'lt>>,
}

pub struct DataLake<'lt> {
    pub config: DataLakeConfig,
    pub stores: DataLakeStores<'lt>,
}

impl<'lt> DataLake<'lt> {
    pub fn init(config: DataLakeConfig) -> Result<Self> {
        let mut stores = DataLakeStores::default();

        for entry in &config.store {
            if entry.readonly {
                let store = DataStore::load(&entry.filename, true)?;

                stores.readable.push(store);

                continue;
            }

            let store = if verify_magic(&entry.filename)? {
                DataStore::load(&entry.filename, false)
            } else {
                DataStore::init(&entry.filename)
            }?;

            stores.readable.push(store.clone());
            stores.writable.push(store);
        }

        let lake = Self { config, stores };

        Ok(lake)
    }

    pub fn get_encrypted_chunk(&'lt self, hash: &Hash) -> Result<MbufDataChunk> {
        let mut error = PsDataLakeError::NotFound;

        for store in &self.stores.readable {
            match store.get_chunk_by_hash(hash) {
                Ok(chunk) => return Ok(chunk),
                Err(err) => match err {
                    PsDataLakeError::NotFound => (),
                    _ => error = err,
                },
            }
        }

        Err(error)
    }

    pub fn get_chunk_by_hkey(&'lt self, hkey: &Hkey) -> Result<Resolved<MbufDataChunk>> {
        hkey.resolve(&|hash| match self.get_encrypted_chunk(hash) {
            Ok(chunk) => Ok(chunk),
            Err(err) => Err(err),
        })
    }

    pub fn put_encrypted_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
        for store in &self.stores.writable {
            match store.put_encrypted_chunk(chunk) {
                Ok(chunk) => return Ok(chunk),
                Err(err) => match err {
                    PsDataLakeError::DataStoreOutOfSpace => (),
                    PsDataLakeError::DataStoreNotRw => (),
                    err => Err(err)?,
                },
            }
        }

        Err(PsDataLakeError::DataLakeOutOfStores)
    }

    pub fn put_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
        for store in &self.stores.writable {
            match store.put_chunk(chunk) {
                Ok(chunk) => return Ok(chunk),
                Err(err) => match err {
                    PsDataLakeError::DataStoreOutOfSpace => (),
                    PsDataLakeError::DataStoreNotRw => (),
                    _ => Err(err)?,
                },
            }
        }

        Err(PsDataLakeError::DataLakeOutOfStores)
    }

    pub fn put_blob(&'lt self, blob: &[u8]) -> Result<Hkey> {
        for store in &self.stores.writable {
            match store.put_blob(blob) {
                Ok(chunk) => return Ok(chunk),
                Err(err) => match err {
                    PsDataLakeError::DataStoreOutOfSpace => (),
                    PsDataLakeError::DataStoreNotRw => (),
                    _ => Err(err)?,
                },
            }
        }

        Err(PsDataLakeError::DataLakeOutOfStores)
    }
}