1pub mod config;
2pub mod util;
3use crate::error::DataLakeError;
4use crate::error::Result;
5use crate::store::DataStore;
6use config::DataLakeConfig;
7use ps_datachunk::DataChunk;
8use ps_datachunk::MbufDataChunk;
9use ps_hash::Hash;
10use ps_hkey::Hkey;
11use ps_hkey::Store;
12use util::verify_magic;
13
14#[derive(Clone, Debug, Default)]
15pub struct DataLakeStores<'lt> {
16 pub readable: Vec<DataStore<'lt>>,
17 pub writable: Vec<DataStore<'lt>>,
18}
19
20#[derive(Clone, Debug, Default)]
21pub struct DataLake<'lt> {
22 pub config: DataLakeConfig,
23 pub stores: DataLakeStores<'lt>,
24}
25
26impl<'lt> DataLake<'lt> {
27 pub fn init(config: DataLakeConfig) -> Result<Self> {
28 let mut stores = DataLakeStores::default();
29
30 for entry in &config.stores {
31 if entry.readonly {
32 let store = DataStore::load(&entry.filename, true)?;
33
34 stores.readable.push(store);
35
36 continue;
37 }
38
39 let store = if verify_magic(&entry.filename)? {
40 DataStore::load(&entry.filename, false)
41 } else {
42 DataStore::init(&entry.filename)
43 }?;
44
45 stores.readable.push(store.clone());
46 stores.writable.push(store);
47 }
48
49 let lake = Self { config, stores };
50
51 Ok(lake)
52 }
53
54 pub fn get_encrypted_chunk(&'lt self, hash: &Hash) -> Result<MbufDataChunk<'lt>> {
55 let mut error = DataLakeError::NotFound;
56
57 for store in &self.stores.readable {
58 match store.get_chunk_by_hash(hash) {
59 Ok(chunk) => return Ok(chunk),
60 Err(err) => match err {
61 DataLakeError::NotFound => (),
62 _ => error = err,
63 },
64 }
65 }
66
67 Err(error)
68 }
69
70 pub fn put_encrypted_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
71 use DataLakeError::{DataStoreNotRw, DataStoreOutOfSpace};
72
73 for store in &self.stores.writable {
74 match store.put_encrypted_chunk(chunk) {
75 Ok(chunk) => return Ok(chunk),
76 Err(err) => match err {
77 DataStoreOutOfSpace | DataStoreNotRw => (),
78 err => Err(err)?,
79 },
80 }
81 }
82
83 Err(DataLakeError::DataLakeOutOfStores)
84 }
85
86 pub fn put_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
87 for store in &self.stores.writable {
88 match store.put_chunk(chunk) {
89 Ok(chunk) => return Ok(chunk),
90 Err(err) => match err {
91 DataLakeError::DataStoreOutOfSpace | DataLakeError::DataStoreNotRw => (),
92 _ => Err(err)?,
93 },
94 }
95 }
96
97 Err(DataLakeError::DataLakeOutOfStores)
98 }
99
100 pub fn put_blob(&'lt self, blob: &[u8]) -> Result<Hkey> {
101 for store in &self.stores.writable {
102 match store.put_blob(blob) {
103 Ok(chunk) => return Ok(chunk),
104 Err(err) => match err {
105 DataLakeError::DataStoreOutOfSpace | DataLakeError::DataStoreNotRw => (),
106 _ => Err(err)?,
107 },
108 }
109 }
110
111 Err(DataLakeError::DataLakeOutOfStores)
112 }
113}
114
115impl<'lt> Store for DataLake<'lt> {
116 type Chunk<'c>
117 = MbufDataChunk<'c>
118 where
119 'lt: 'c;
120 type Error = DataLakeError;
121
122 fn get<'a>(&'a self, hash: &Hash) -> std::result::Result<Self::Chunk<'a>, Self::Error> {
123 self.get_encrypted_chunk(hash)
124 }
125
126 fn put(&self, data: &[u8]) -> std::result::Result<Hkey, Self::Error> {
127 self.put_blob(data)
128 }
129
130 fn put_encrypted<C: DataChunk>(&self, chunk: C) -> std::result::Result<(), Self::Error> {
131 self.put_encrypted_chunk(&chunk)?;
132 Ok(())
133 }
134}