Skip to main content

ps_datalake/store/
mod.rs

1mod atomic;
2mod shared;
3
4use std::marker::PhantomData;
5use std::path::Path;
6
7use crate::error::DataLakeError;
8use crate::error::DataStoreCorrupted;
9use crate::error::Result;
10use crate::helpers::sieve;
11use atomic::DataStoreWriteGuard;
12use ps_datachunk::utils::round_up;
13use ps_datachunk::BorrowedDataChunk;
14use ps_datachunk::DataChunk;
15use ps_datachunk::MbufDataChunk;
16use ps_hash::Hash;
17use ps_hkey::Hkey;
18use ps_hkey::LongHkeyExpanded;
19use ps_hkey::Store;
20use ps_hkey::MAX_DECRYPTED_SIZE;
21use ps_hkey::MAX_ENCRYPTED_SIZE;
22use ps_hkey::MAX_SIZE_RAW;
23use ps_mbuf::Mbuf;
24use ps_mbuf::MbufValue;
25use ps_mmap::MemoryMap;
26use ps_str::Utf8Encoder;
27use ps_util::Array;
28use shared::DataStoreReadGuard;
29
30pub const MAGIC: [u8; 16] = *b"DataLake\0\0\0\0\0\0\0\0";
31pub const PTR_SIZE: usize = 4;
32pub const CHUNK_SIZE: usize = std::mem::size_of::<DataStorePage>();
33
/// Fixed-layout header read from the start of the memory-mapped store.
///
/// The struct is `#[repr(C)]`, so the field order below is the on-disk order
/// and must not be changed.
#[repr(C)]
pub struct DataStoreHeader {
    /// File signature; a valid store holds [`MAGIC`] here
    /// (`b"DataLake"` followed by eight NUL bytes).
    pub magic: [u8; 16],

    /// Modulus applied to the hash prefix to choose the first index bucket
    /// to probe. `DataStore::init` sets it to a prime derived from 99% of
    /// the index length.
    pub index_modulo: u32,

    /// Index of the next unused page in the pager; advanced every time a new
    /// chunk is written.
    pub free_chunk: u32,

    /// Byte offset of the index, relative to the start of the mapping.
    pub index_offset: u64,

    /// Byte offset of the pager (chunk storage), relative to the start of
    /// the mapping.
    pub data_offset: u64,
}
51
52pub type DataStorePageMbuf<'lt> = Mbuf<'lt, Hash, u8>;
53
54#[repr(C, align(256))]
55pub struct DataStorePage<'lt> {
56    mbuf: DataStorePageMbuf<'lt>,
57}
58
59impl MbufValue for DataStorePage<'_> {}
60
61impl<'lt> DataStorePage<'lt> {
62    #[must_use]
63    pub const fn mbuf(&'lt self) -> &'lt DataStorePageMbuf<'lt> {
64        &self.mbuf
65    }
66    #[must_use]
67    pub const fn bytes_to_pages(bytes: usize) -> usize {
68        (bytes + std::mem::size_of::<DataStorePageMbuf>()).div_ceil(CHUNK_SIZE)
69    }
70}
71
72pub type DataStoreIndex<'lt> = Mbuf<'lt, (), u32>;
73pub type DataStorePager<'lt> = Mbuf<'lt, (), DataStorePage<'lt>>;
74
75#[derive(Clone, Debug)]
76pub struct DataStore<'lt> {
77    mmap: MemoryMap,
78    readonly: bool,
79    _data: PhantomData<&'lt [u8]>,
80}
81
82impl<'lt> DataStore<'lt> {
83    pub fn load_mapping<P>(file_path: P, readonly: bool) -> Result<MemoryMap>
84    where
85        P: AsRef<Path>,
86    {
87        Ok(MemoryMap::map(file_path, readonly)?)
88    }
89
90    fn shared(&self) -> DataStoreReadGuard<'lt> {
91        self.into()
92    }
93
94    fn atomic(&self) -> Result<DataStoreWriteGuard> {
95        Ok(self.try_into()?)
96    }
97
98    pub fn load<P>(file_path: P, readonly: bool) -> Result<Self>
99    where
100        P: AsRef<Path>,
101    {
102        let mmap = Self::load_mapping(file_path, readonly)?;
103
104        Self::load_with_mmap(mmap, readonly)
105    }
106
107    fn load_with_mmap(mmap: MemoryMap, readonly: bool) -> Result<Self> {
108        use DataStoreCorrupted::{
109            DataEndsOutOfBounds, DataOffsetOutOfBounds, IndexDataOverlap, IndexEndsOutOfBounds,
110            IndexModuloTooSmall, IndexOffsetOutOfBounds, InvalidDataStoreSize, InvalidMagic,
111        };
112
113        let size = mmap.len();
114
115        if size < std::mem::size_of::<DataStoreHeader>() {
116            Err(InvalidDataStoreSize(size))?;
117        }
118
119        let store = DataStore {
120            mmap,
121            readonly,
122            _data: PhantomData,
123        };
124
125        let shared = store.shared();
126        let header = shared.get_header()?;
127
128        if header.magic != MAGIC {
129            Err(InvalidMagic(header.magic.to_utf8_string()))?;
130        }
131
132        if header.index_offset > size as u64 {
133            Err(IndexOffsetOutOfBounds(header.index_offset, size))?;
134        }
135
136        if header.data_offset > size as u64 {
137            Err(DataOffsetOutOfBounds(header.data_offset, size))?;
138        }
139
140        let index = shared.get_index()?;
141        let pager = shared.get_pager()?;
142
143        let base_ptr = store.mmap.as_ptr();
144
145        let index_end_offset = index.as_ptr_range().end as usize - base_ptr as usize;
146        if index_end_offset > size {
147            Err(IndexEndsOutOfBounds(index_end_offset, size))?;
148        }
149
150        let data_end_offset = pager.as_ptr_range().end as usize - base_ptr as usize;
151        if data_end_offset > size {
152            Err(DataEndsOutOfBounds(data_end_offset, size))?;
153        }
154
155        let data_ptr = std::ptr::addr_of!(*pager.get_metadata());
156        let data_start_offset = data_ptr as usize;
157        if index_end_offset > data_start_offset {
158            Err(IndexDataOverlap(index_end_offset, data_start_offset))?;
159        }
160
161        let index_len_u32 = u32::try_from(index.len())?;
162
163        if header.index_modulo > (index_len_u32) {
164            Err(IndexModuloTooSmall(header.index_modulo, index_len_u32))?;
165        }
166
167        drop(shared);
168
169        Ok(store)
170    }
171
172    /// `store_length_in_bytes` -> (`index_offset`, `index_length`, `data_offset`)
173    /// - this should only be used when initializing a new [`DataStore`]
174    #[must_use]
175    pub const fn derive_index_bounds(total_len: usize) -> (usize, usize, usize) {
176        let offset = std::mem::size_of::<DataStoreHeader>();
177        let ihead = std::mem::size_of::<DataStoreIndex>();
178        let phead = std::mem::size_of::<DataStorePager>();
179        let base_items = 1 + (total_len >> 10);
180        let rup_items = round_up(base_items, 10);
181        let sub_bytes = offset + ihead + phead;
182        let sub_items = sub_bytes / std::mem::size_of::<u32>();
183        let index_length = rup_items - sub_items;
184        let data_offset = offset + ihead + index_length * std::mem::size_of::<u32>();
185
186        (offset, index_length, data_offset)
187    }
188
189    pub fn init<P>(file_path: &P) -> Result<Self>
190    where
191        P: AsRef<Path>,
192    {
193        let readonly = false;
194        let mapping = Self::load_mapping(file_path, readonly)?;
195
196        let total_length = mapping.len();
197        let (index_offset, index_length, data_offset) = Self::derive_index_bounds(total_length);
198        let index_modulo_max = index_length * 99 / 100;
199        let index_modulo = sieve::get_le_prime(u32::try_from(index_modulo_max)?);
200
201        if (data_offset + std::mem::size_of::<DataStorePager>()) > mapping.len() {
202            Err(DataLakeError::InitFailedNotEnoughSpace(mapping.len()))?;
203        }
204
205        let map_ptr = mapping.try_as_mut_ptr()?;
206
207        unsafe { DataStoreIndex::init_at_offset(map_ptr, index_offset, (), index_length).fill(0) };
208
209        unsafe {
210            DataStorePager::init_at_offset(
211                map_ptr,
212                data_offset,
213                (),
214                (total_length - data_offset) / CHUNK_SIZE,
215            );
216        }
217
218        let mut guard = DataStoreWriteGuard::from(mapping.try_write()?);
219        let header = guard.get_header()?;
220
221        header.magic = MAGIC;
222        header.index_modulo = index_modulo;
223        header.free_chunk = 0;
224        header.index_offset = index_offset as u64;
225        header.data_offset = data_offset as u64;
226
227        drop(guard);
228
229        let store = Self::load_with_mmap(mapping, false)?;
230
231        store.put_opaque_chunk(&BorrowedDataChunk::from_data(
232            b"<< DATA SEGMENT BEGINS HERE >>",
233            // Chunk #0 cannot be accessed and is therefore reserved for metadata
234            // about this DataStore, the size of which shall not exceed 192 bytes
235        )?)?;
236
237        Ok(store)
238    }
239
240    pub fn get_chunk_by_index(&self, index: usize) -> Result<MbufDataChunk<'_>> {
241        self.shared()
242            .get_pager()?
243            .get(index)
244            .map_or(Err(DataLakeError::Range), |page| Ok(page.mbuf().into()))
245    }
246
247    #[must_use]
248    pub fn calculate_index_bucket(hash: &Hash, index_modulo: u32) -> u32 {
249        (u32::from_be_bytes(*hash.digest().subarray(0))) % index_modulo
250    }
251
252    pub fn get_bucket_index_chunk_by_hash(
253        &self,
254        hash: &Hash,
255    ) -> Result<(u32, u32, Option<MbufDataChunk<'_>>)> {
256        let shared = self.shared();
257        let header = shared.get_header()?;
258        let index = shared.get_index()?;
259        let bucket = Self::calculate_index_bucket(hash, header.index_modulo);
260
261        drop(shared);
262
263        for bucket in bucket..u32::try_from(index.len())? {
264            let index = index
265                .get(bucket as usize)
266                .ok_or(DataLakeError::IndexBucketOverflow)?;
267
268            if *index == 0 {
269                return Ok((bucket, 0, None));
270            }
271
272            let chunk = self.get_chunk_by_index(*index as usize)?;
273
274            if chunk.hash_ref() == hash {
275                return Ok((bucket, *index, Some(chunk)));
276            }
277        }
278
279        Err(DataLakeError::IndexBucketOverflow)
280    }
281
282    pub fn get_bucket_by_hash(&'lt self, hash: &Hash) -> Result<u32> {
283        let (bucket, _, _) = self.get_bucket_index_chunk_by_hash(hash)?;
284
285        Ok(bucket)
286    }
287
288    pub fn get_index_by_hash(&'lt self, hash: &Hash) -> Result<u32> {
289        let (_, index, chunk) = self.get_bucket_index_chunk_by_hash(hash)?;
290
291        chunk.ok_or(DataLakeError::NotFound)?;
292
293        Ok(index)
294    }
295
296    pub fn get_chunk_by_hash(&'lt self, hash: &Hash) -> Result<MbufDataChunk<'lt>> {
297        let (_, _, chunk) = self.get_bucket_index_chunk_by_hash(hash)?;
298
299        chunk.ok_or(DataLakeError::NotFound)
300    }
301
302    /// Stores opaque data and returns a tuple containing the bucket, index, and [`DataChunk`].
303    ///
304    /// # Arguments
305    ///
306    /// * `opaque_chunk` - A [`DataChunk`] representing the opaque data to be stored.
307    ///
308    /// # Returns
309    ///
310    /// A `Result` containing:
311    /// - `Ok((bucket, index, datachunk))` on success:
312    ///   - `bucket` (u32): The hashmap index of the chunk.
313    ///   - `index` (u32): The chunk's index withing the storage file.
314    ///   - `datachunk` (MbufDataChunk): The chunk of data that was stored.
315    /// - `Err(PsDataLakeError)` on failure:
316    ///   - An error of type `PsDataLakeError` indicating the reason for failure.
317    pub fn put_opaque_chunk<C: DataChunk>(
318        &self,
319        opaque_chunk: &C,
320    ) -> Result<(u32, u32, MbufDataChunk<'_>)> {
321        let existing = self.get_bucket_index_chunk_by_hash(opaque_chunk.hash_ref())?;
322        let (bucket, index, existing) = existing;
323
324        if let Some(chunk) = existing {
325            return Ok((bucket, index, chunk));
326        }
327
328        if self.readonly {
329            Err(DataLakeError::DataStoreNotRw)?;
330        }
331
332        let mut atomic = self.atomic()?;
333
334        let next_free_chunk_u32 = atomic.get_header()?.free_chunk;
335        let next_free_chunk = usize::try_from(next_free_chunk_u32)?;
336
337        let required_chunks = DataStorePage::bytes_to_pages(opaque_chunk.data_ref().len());
338        let available_chunks = atomic
339            .get_pager()?
340            .len()
341            .checked_sub(next_free_chunk)
342            .ok_or(DataLakeError::DataStoreOutOfSpace)?;
343
344        if available_chunks < required_chunks {
345            Err(DataLakeError::DataStoreOutOfSpace)?;
346        }
347
348        let pointer = std::ptr::from_ref(
349            atomic
350                .get_pager()?
351                .get(next_free_chunk)
352                .ok_or(DataLakeError::Range)?
353                .mbuf(),
354        ) as *mut u8;
355
356        unsafe {
357            DataStorePageMbuf::write_to_ptr(pointer, opaque_chunk.hash(), opaque_chunk.data_ref())
358        };
359
360        atomic.get_header()?.free_chunk += u32::try_from(required_chunks)?;
361
362        atomic.get_index()?[bucket as usize] = next_free_chunk_u32;
363
364        drop(atomic);
365
366        Ok((
367            bucket,
368            next_free_chunk_u32,
369            self.get_chunk_by_index(next_free_chunk)?,
370        ))
371    }
372
373    pub fn put_encrypted_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
374        let length = chunk.data_ref().len();
375
376        if length <= MAX_SIZE_RAW || length > MAX_ENCRYPTED_SIZE {
377            return self.put_chunk(chunk);
378        }
379
380        let encrypted = chunk.encrypt()?;
381
382        if encrypted.data_ref().len() > length {
383            Ok(self.put_opaque_chunk(chunk)?.2.hash().into())
384        } else {
385            let chunk = self.put_opaque_chunk(&encrypted)?.2;
386
387            if chunk.hash_ref() != encrypted.hash_ref() {
388                Err(DataLakeError::StorageFailure)?;
389            }
390
391            Ok((encrypted.hash(), encrypted.key()).into())
392        }
393    }
394
395    pub fn put_large_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
396        self.put_large_blob(chunk.data_ref())
397    }
398
399    pub fn put_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
400        if chunk.data_ref().len() <= MAX_SIZE_RAW {
401            return Ok(Hkey::from_raw(chunk.data_ref())?);
402        }
403
404        if chunk.data_ref().len() > MAX_DECRYPTED_SIZE {
405            return self.put_large_chunk(chunk);
406        }
407
408        let encrypted = chunk.encrypt()?;
409
410        let stored_chunk = self.put_opaque_chunk(&encrypted)?.2;
411
412        if stored_chunk.hash_ref() != encrypted.hash_ref() {
413            Err(DataLakeError::StorageFailure)?;
414        }
415
416        Ok(Hkey::Encrypted(encrypted.hash(), encrypted.key()))
417    }
418
419    pub fn put_large_blob(&'lt self, blob: &[u8]) -> Result<Hkey> {
420        // sanity check
421        if blob.len() <= MAX_DECRYPTED_SIZE {
422            return self.put_blob(blob);
423        }
424
425        LongHkeyExpanded::from_blob(self, blob)?.shrink(self)
426    }
427
428    pub fn put_blob(&'lt self, blob: &[u8]) -> Result<Hkey> {
429        if blob.len() <= MAX_SIZE_RAW {
430            Ok(Hkey::from_raw(blob)?)
431        } else if blob.len() > MAX_DECRYPTED_SIZE {
432            self.put_large_blob(blob)
433        } else {
434            self.put_chunk(&BorrowedDataChunk::from_data(blob)?)
435        }
436    }
437}
438
439impl<'lt> Store for DataStore<'lt> {
440    type Chunk<'c>
441        = MbufDataChunk<'c>
442    where
443        'lt: 'c;
444    type Error = DataLakeError;
445
446    fn get<'a>(&'a self, hash: &Hash) -> std::result::Result<Self::Chunk<'a>, Self::Error> {
447        self.get_chunk_by_hash(hash)
448    }
449
450    fn put(&self, data: &[u8]) -> std::result::Result<Hkey, Self::Error> {
451        self.put_blob(data)
452    }
453
454    fn put_encrypted<C: DataChunk>(&self, chunk: C) -> std::result::Result<(), Self::Error> {
455        self.put_encrypted_chunk(&chunk)?;
456        Ok(())
457    }
458}