Skip to main content

ps_datalake/lake/
mod.rs

1pub mod config;
2pub mod util;
3use crate::error::DataLakeError;
4use crate::error::Result;
5use crate::store::DataStore;
6use config::DataLakeConfig;
7use ps_datachunk::DataChunk;
8use ps_datachunk::MbufDataChunk;
9use ps_hash::Hash;
10use ps_hkey::Hkey;
11use ps_hkey::Store;
12use util::verify_magic;
13
14#[derive(Clone, Debug, Default)]
15pub struct DataLakeStores<'lt> {
16    pub readable: Vec<DataStore<'lt>>,
17    pub writable: Vec<DataStore<'lt>>,
18}
19
20#[derive(Clone, Debug, Default)]
21pub struct DataLake<'lt> {
22    pub config: DataLakeConfig,
23    pub stores: DataLakeStores<'lt>,
24}
25
26impl<'lt> DataLake<'lt> {
27    pub fn init(config: DataLakeConfig) -> Result<Self> {
28        let mut stores = DataLakeStores::default();
29
30        for entry in &config.stores {
31            if entry.readonly {
32                let store = DataStore::load(&entry.filename, true)?;
33
34                stores.readable.push(store);
35
36                continue;
37            }
38
39            let store = if verify_magic(&entry.filename)? {
40                DataStore::load(&entry.filename, false)
41            } else {
42                DataStore::init(&entry.filename)
43            }?;
44
45            stores.readable.push(store.clone());
46            stores.writable.push(store);
47        }
48
49        let lake = Self { config, stores };
50
51        Ok(lake)
52    }
53
54    pub fn get_encrypted_chunk(&'lt self, hash: &Hash) -> Result<MbufDataChunk<'lt>> {
55        let mut error = DataLakeError::NotFound;
56
57        for store in &self.stores.readable {
58            match store.get_chunk_by_hash(hash) {
59                Ok(chunk) => return Ok(chunk),
60                Err(err) => match err {
61                    DataLakeError::NotFound => (),
62                    _ => error = err,
63                },
64            }
65        }
66
67        Err(error)
68    }
69
70    pub fn put_encrypted_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
71        use DataLakeError::{DataStoreNotRw, DataStoreOutOfSpace};
72
73        for store in &self.stores.writable {
74            match store.put_encrypted_chunk(chunk) {
75                Ok(chunk) => return Ok(chunk),
76                Err(err) => match err {
77                    DataStoreOutOfSpace | DataStoreNotRw => (),
78                    err => Err(err)?,
79                },
80            }
81        }
82
83        Err(DataLakeError::DataLakeOutOfStores)
84    }
85
86    pub fn put_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
87        for store in &self.stores.writable {
88            match store.put_chunk(chunk) {
89                Ok(chunk) => return Ok(chunk),
90                Err(err) => match err {
91                    DataLakeError::DataStoreOutOfSpace | DataLakeError::DataStoreNotRw => (),
92                    _ => Err(err)?,
93                },
94            }
95        }
96
97        Err(DataLakeError::DataLakeOutOfStores)
98    }
99
100    pub fn put_blob(&'lt self, blob: &[u8]) -> Result<Hkey> {
101        for store in &self.stores.writable {
102            match store.put_blob(blob) {
103                Ok(chunk) => return Ok(chunk),
104                Err(err) => match err {
105                    DataLakeError::DataStoreOutOfSpace | DataLakeError::DataStoreNotRw => (),
106                    _ => Err(err)?,
107                },
108            }
109        }
110
111        Err(DataLakeError::DataLakeOutOfStores)
112    }
113}
114
115impl<'lt> Store for DataLake<'lt> {
116    type Chunk<'c>
117        = MbufDataChunk<'c>
118    where
119        'lt: 'c;
120    type Error = DataLakeError;
121
122    fn get<'a>(&'a self, hash: &Hash) -> std::result::Result<Self::Chunk<'a>, Self::Error> {
123        self.get_encrypted_chunk(hash)
124    }
125
126    fn put(&self, data: &[u8]) -> std::result::Result<Hkey, Self::Error> {
127        self.put_blob(data)
128    }
129
130    fn put_encrypted<C: DataChunk>(&self, chunk: C) -> std::result::Result<(), Self::Error> {
131        self.put_encrypted_chunk(&chunk)?;
132        Ok(())
133    }
134}