ps_datalake/store/
mod.rs

1mod atomic;
2mod shared;
3
4use std::marker::PhantomData;
5use std::path::Path;
6
7use crate::error::DataStoreCorrupted;
8use crate::error::PsDataLakeError;
9use crate::error::Result;
10use crate::helpers::sieve;
11use atomic::DataStoreWriteGuard;
12use ps_base64::base64;
13use ps_datachunk::utils::round_up;
14use ps_datachunk::BorrowedDataChunk;
15use ps_datachunk::DataChunk;
16use ps_datachunk::MbufDataChunk;
17use ps_datachunk::OwnedDataChunk;
18use ps_hash::Hash;
19use ps_hkey::Hkey;
20use ps_hkey::LongHkeyExpanded;
21use ps_hkey::Resolved;
22use ps_hkey::Store;
23use ps_hkey::MAX_DECRYPTED_SIZE;
24use ps_hkey::MAX_ENCRYPTED_SIZE;
25use ps_hkey::MAX_SIZE_RAW;
26use ps_mbuf::Mbuf;
27use ps_mmap::MemoryMap;
28use ps_str::Utf8Encoder;
29use rayon::iter::IntoParallelRefIterator;
30use rayon::iter::ParallelIterator;
31use shared::DataStoreReadGuard;
32
33pub const MAGIC: [u8; 16] = *b"DataLake\0\0\0\0\0\0\0\0";
34pub const PTR_SIZE: usize = 4;
35pub const CHUNK_SIZE: usize = std::mem::size_of::<DataStorePage>();
36
/// Fixed-layout header found at byte 0 of a store file.
///
/// `#[repr(C)]` pins the field order so the struct can be read directly from
/// the memory mapping.
#[repr(C)]
pub struct DataStoreHeader {
    /// File signature; a store is only accepted when this equals [`MAGIC`]
    /// (`b"DataLake"` followed by eight NUL bytes).
    pub magic: [u8; 16],

    /// Modulus applied to the hash-derived number to pick the first index
    /// bucket to probe. On initialisation it is chosen by a prime sieve from
    /// 99% of the index length, so it stays below the number of index slots.
    pub index_modulo: u32,

    /// Page number at which the next stored chunk will be written; it is
    /// advanced by the number of pages each newly stored chunk occupies.
    pub free_chunk: u32,

    /// Byte offset, from the start of the file, of the index block.
    pub index_offset: u64,

    /// Byte offset, from the start of the file, of the pager (chunk data) block.
    pub data_offset: u64,
}
54
55pub type DataStorePageMbuf<'lt> = Mbuf<'lt, Hash, u8>;
56
57#[repr(C, align(256))]
58pub struct DataStorePage<'lt> {
59    mbuf: DataStorePageMbuf<'lt>,
60}
61
62impl<'lt> DataStorePage<'lt> {
63    #[must_use]
64    pub const fn mbuf(&'lt self) -> &'lt DataStorePageMbuf<'lt> {
65        &self.mbuf
66    }
67    #[must_use]
68    pub const fn bytes_to_pages(bytes: usize) -> usize {
69        (bytes + std::mem::size_of::<DataStorePageMbuf>()).div_ceil(CHUNK_SIZE)
70    }
71}
72
73pub type DataStoreIndex<'lt> = Mbuf<'lt, (), u32>;
74pub type DataStorePager<'lt> = Mbuf<'lt, (), DataStorePage<'lt>>;
75
76#[derive(Clone, Debug)]
77pub struct DataStore<'lt> {
78    mmap: MemoryMap,
79    readonly: bool,
80    _data: PhantomData<&'lt [u8]>,
81}
82
83impl<'lt> DataStore<'lt> {
84    pub fn load_mapping<P>(file_path: P, readonly: bool) -> Result<MemoryMap>
85    where
86        P: AsRef<Path>,
87    {
88        Ok(MemoryMap::map(file_path, readonly)?)
89    }
90
91    fn shared(&self) -> DataStoreReadGuard<'lt> {
92        self.into()
93    }
94
95    fn atomic(&self) -> Result<DataStoreWriteGuard> {
96        Ok(self.try_into()?)
97    }
98
99    #[must_use]
100    pub unsafe fn get_header(mapping: &MemoryMap) -> &'lt mut DataStoreHeader {
101        &mut *(mapping.as_ptr() as *mut DataStoreHeader)
102    }
103
104    #[must_use]
105    pub unsafe fn get_index(mapping: &MemoryMap) -> &'lt mut DataStoreIndex<'lt> {
106        DataStoreIndex::at_offset_mut(
107            mapping.as_ptr().cast_mut(),
108            Self::get_header(mapping).index_offset as usize,
109        )
110    }
111
112    #[must_use]
113    pub unsafe fn get_pager(mapping: &MemoryMap) -> &'lt mut DataStorePager<'lt> {
114        DataStorePager::at_offset_mut(
115            mapping.as_ptr().cast_mut(),
116            Self::get_header(mapping).data_offset as usize,
117        )
118    }
119
120    pub fn load<P>(file_path: P, readonly: bool) -> Result<Self>
121    where
122        P: AsRef<Path>,
123    {
124        use DataStoreCorrupted::{
125            DataEndsOutOfBounds, DataOffsetOutOfBounds, IndexDataOverlap, IndexEndsOutOfBounds,
126            IndexModuloTooSmall, IndexOffsetOutOfBounds, InvalidDataStoreSize, InvalidMagic,
127        };
128
129        let mmap = Self::load_mapping(file_path, readonly)?;
130        let size = mmap.len();
131
132        if size < std::mem::size_of::<DataStoreHeader>() {
133            Err(InvalidDataStoreSize(size))?;
134        }
135
136        let store = DataStore {
137            mmap,
138            readonly,
139            _data: PhantomData,
140        };
141
142        let shared = store.shared();
143        let header = shared.get_header();
144
145        if header.magic != MAGIC {
146            Err(InvalidMagic(header.magic.to_utf8_string()))?;
147        }
148
149        if header.index_offset > size as u64 {
150            Err(IndexOffsetOutOfBounds(header.index_offset, size))?;
151        }
152
153        if header.data_offset > size as u64 {
154            Err(DataOffsetOutOfBounds(header.data_offset, size))?;
155        }
156
157        let index = shared.get_index();
158        let pager = shared.get_pager();
159
160        let base_ptr = store.mmap.as_ptr();
161
162        let index_end_offset = index.as_ptr_range().end as usize - base_ptr as usize;
163        if index_end_offset > size {
164            Err(IndexEndsOutOfBounds(index_end_offset, size))?;
165        }
166
167        let data_end_offset = pager.as_ptr_range().end as usize - base_ptr as usize;
168        if data_end_offset > size {
169            Err(DataEndsOutOfBounds(data_end_offset, size))?;
170        }
171
172        let data_ptr = std::ptr::addr_of!(*pager.get_metadata());
173        let data_start_offset = data_ptr as usize;
174        if index_end_offset > data_start_offset {
175            Err(IndexDataOverlap(index_end_offset, data_start_offset))?;
176        }
177
178        if header.index_modulo > (index.len() as u32) {
179            Err(IndexModuloTooSmall(header.index_modulo, index.len() as u32))?;
180        }
181
182        Ok(store)
183    }
184
185    /// `store_length_in_bytes` -> (`index_offset`, `index_length`, `data_offset`)
186    /// - this should only be used when initializing a new [`DataStore`]
187    #[must_use]
188    pub const fn derive_index_bounds(total_len: usize) -> (usize, usize, usize) {
189        let offset = std::mem::size_of::<DataStoreHeader>();
190        let ihead = std::mem::size_of::<DataStoreIndex>();
191        let phead = std::mem::size_of::<DataStorePager>();
192        let base_items = 1 + (total_len >> 10);
193        let rup_items = round_up(base_items, 10);
194        let sub_bytes = offset + ihead + phead;
195        let sub_items = sub_bytes / std::mem::size_of::<u32>();
196        let index_length = rup_items - sub_items;
197        let data_offset = offset + ihead + index_length * std::mem::size_of::<u32>();
198
199        (offset, index_length, data_offset)
200    }
201
202    pub fn init<P>(file_path: &P) -> Result<Self>
203    where
204        P: AsRef<Path>,
205    {
206        let readonly = false;
207        let mapping = Self::load_mapping(file_path, readonly)?;
208
209        let total_length = mapping.len();
210        let (index_offset, index_length, data_offset) = Self::derive_index_bounds(total_length);
211        let index_modulo_max = index_length * 99 / 100;
212        let index_modulo = sieve::get_le_prime(index_modulo_max as u32);
213
214        if (data_offset + std::mem::size_of::<DataStorePager>()) > mapping.len() {
215            Err(PsDataLakeError::InitFailedNotEnoughSpace(mapping.len()))?;
216        }
217
218        let mut map = mapping.try_write()?;
219
220        unsafe {
221            DataStoreIndex::init_at_ptr(
222                map[index_offset..index_offset].as_mut_ptr(),
223                (),
224                index_length,
225            )
226        }
227        .fill(0);
228
229        unsafe {
230            DataStorePager::init_at_ptr(
231                map[data_offset..data_offset].as_mut_ptr(),
232                (),
233                (total_length - data_offset) / CHUNK_SIZE,
234            );
235        }
236
237        let mut guard = DataStoreWriteGuard::from(map);
238        let header = guard.get_header();
239
240        header.magic = MAGIC;
241        header.index_modulo = index_modulo;
242        header.free_chunk = 0;
243        header.index_offset = index_offset as u64;
244        header.data_offset = data_offset as u64;
245
246        drop(guard);
247
248        let store = Self::load(file_path, false)?;
249
250        store.put_opaque_chunk(&BorrowedDataChunk::from_data(
251            b"<< DATA SEGMENT BEGINS HERE >>",
252            // Chunk #0 cannot be accessed and is therefore reserved for metadata
253            // about this DataStore, the size of which shall not exceed 192 bytes
254        )?)?;
255
256        Ok(store)
257    }
258
259    pub fn get_chunk_by_index(&self, index: usize) -> Result<MbufDataChunk<'_>> {
260        match self.shared().get_pager().get(index) {
261            Some(page) => Ok(page.mbuf().into()),
262            None => Err(PsDataLakeError::RangeError),
263        }
264    }
265
266    #[must_use]
267    pub fn calculate_index_bucket(hash: &Hash, index_modulo: u32) -> u32 {
268        (u32::from_be_bytes(base64::sized_decode::<4>(&hash.as_bytes()[..6]))) % index_modulo
269    }
270
271    pub fn get_bucket_index_chunk_by_hash(
272        &self,
273        hash: &Hash,
274    ) -> Result<(u32, u32, Option<MbufDataChunk<'_>>)> {
275        let shared = self.shared();
276        let header = shared.get_header();
277        let index = shared.get_index();
278        let bucket = Self::calculate_index_bucket(hash, header.index_modulo);
279
280        drop(shared);
281
282        for bucket in bucket..index.len() as u32 {
283            let index = index
284                .get(bucket as usize)
285                .ok_or(PsDataLakeError::IndexBucketOverflow)?;
286
287            if *index == 0 {
288                return Ok((bucket, 0, None));
289            }
290
291            let chunk = self.get_chunk_by_index(*index as usize)?;
292
293            if chunk.hash_ref() == hash {
294                return Ok((bucket, *index, Some(chunk)));
295            }
296        }
297
298        Err(PsDataLakeError::IndexBucketOverflow)
299    }
300
301    pub fn get_bucket_by_hash(&'lt self, hash: &Hash) -> Result<u32> {
302        let (bucket, _, _) = self.get_bucket_index_chunk_by_hash(hash)?;
303
304        Ok(bucket)
305    }
306
307    pub fn get_index_by_hash(&'lt self, hash: &Hash) -> Result<u32> {
308        let (_, index, chunk) = self.get_bucket_index_chunk_by_hash(hash)?;
309
310        chunk.ok_or(PsDataLakeError::NotFound)?;
311
312        Ok(index)
313    }
314
315    pub fn get_chunk_by_hash(&'lt self, hash: &Hash) -> Result<MbufDataChunk<'lt>> {
316        let (_, _, chunk) = self.get_bucket_index_chunk_by_hash(hash)?;
317
318        chunk.ok_or(PsDataLakeError::NotFound)
319    }
320
321    pub fn get_chunk_by_hkey(&'lt self, key: &Hkey) -> Result<Resolved<MbufDataChunk<'lt>>> {
322        match key {
323            Hkey::Raw(raw) => Ok(Resolved::Data(raw.clone())),
324            Hkey::Base64(base64) => {
325                Ok(OwnedDataChunk::from_data(ps_base64::decode(base64.as_bytes()))?.into())
326            }
327            Hkey::Direct(hash) => Ok(Resolved::Custom(self.get_chunk_by_hash(hash)?)),
328            Hkey::Encrypted(hash, key) => {
329                let chunk = self.get_chunk_by_hash(hash)?;
330                let decrypted = chunk.decrypt(key.as_bytes())?;
331
332                Ok(decrypted.into())
333            }
334            Hkey::ListRef(hash, key) => {
335                let hkey = Hkey::Encrypted(hash.clone(), key.clone());
336                let long = Self::get_chunk_by_hkey(self, &hkey)?;
337                let hkey = Hkey::parse(long.data_ref());
338
339                self.get_chunk_by_hkey(&hkey)
340            }
341            Hkey::List(list) => {
342                // Parallel fetching of chunks
343                let chunks: Vec<Result<Resolved<MbufDataChunk>>> = list
344                    .par_iter()
345                    .map(|hkey| self.get_chunk_by_hkey(hkey))
346                    .collect();
347
348                // Collect and concatenate data from chunks
349                let buffer: Result<Vec<u8>> =
350                    chunks.into_iter().try_fold(vec![], |mut buffer, chunk| {
351                        buffer.extend_from_slice(chunk?.data_ref());
352                        Ok(buffer)
353                    });
354
355                // Convert the concatenated data into an OwnedDataChunk
356                let chunk = OwnedDataChunk::from_data(buffer?)?;
357
358                Ok(chunk.into())
359            }
360            _ => key.resolve(self),
361        }
362    }
363
364    /// Stores opaque data and returns a tuple containing the bucket, index, and [`DataChunk`].
365    ///
366    /// # Arguments
367    ///
368    /// * `opaque_chunk` - A [`DataChunk`] representing the opaque data to be stored.
369    ///
370    /// # Returns
371    ///
372    /// A `Result` containing:
373    /// - `Ok((bucket, index, datachunk))` on success:
374    ///   - `bucket` (u32): The hashmap index of the chunk.
375    ///   - `index` (u32): The chunk's index withing the storage file.
376    ///   - `datachunk` (MbufDataChunk): The chunk of data that was stored.
377    /// - `Err(PsDataLakeError)` on failure:
378    ///   - An error of type `PsDataLakeError` indicating the reason for failure.
379    pub fn put_opaque_chunk<C: DataChunk>(
380        &self,
381        opaque_chunk: &C,
382    ) -> Result<(u32, u32, MbufDataChunk<'_>)> {
383        let existing = self.get_bucket_index_chunk_by_hash(opaque_chunk.hash_ref())?;
384        let (bucket, index, existing) = existing;
385
386        if let Some(chunk) = existing {
387            return Ok((bucket, index, chunk));
388        }
389
390        if self.readonly {
391            Err(PsDataLakeError::DataStoreNotRw)?;
392        }
393
394        let mut atomic = self.atomic()?;
395
396        let next_free_chunk = atomic.get_header().free_chunk as usize;
397
398        let required_chunks = DataStorePage::bytes_to_pages(opaque_chunk.data_ref().len());
399        let available_chunks = atomic
400            .get_pager()
401            .len()
402            .checked_sub(next_free_chunk)
403            .ok_or(PsDataLakeError::DataStoreOutOfSpace)?;
404
405        if available_chunks < required_chunks {
406            Err(PsDataLakeError::DataStoreOutOfSpace)?;
407        }
408
409        let pointer = std::ptr::from_ref(
410            atomic
411                .get_pager()
412                .get(next_free_chunk)
413                .ok_or(PsDataLakeError::RangeError)?
414                .mbuf(),
415        ) as *mut u8;
416
417        unsafe {
418            DataStorePageMbuf::write_to_ptr(pointer, *opaque_chunk.hash(), opaque_chunk.data_ref())
419        };
420
421        atomic.get_header().free_chunk += required_chunks as u32;
422
423        atomic.get_index()[bucket as usize] = next_free_chunk as u32;
424
425        drop(atomic);
426
427        Ok((
428            bucket,
429            next_free_chunk as u32,
430            self.get_chunk_by_index(next_free_chunk)?,
431        ))
432    }
433
434    pub fn put_encrypted_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
435        let length = chunk.data_ref().len();
436
437        if length <= MAX_SIZE_RAW || length > MAX_ENCRYPTED_SIZE {
438            return self.put_chunk(chunk);
439        }
440
441        let encrypted = chunk.encrypt()?;
442
443        if encrypted.data_ref().len() > length {
444            Ok(self.put_opaque_chunk(chunk)?.2.hash().into())
445        } else {
446            let chunk = self.put_opaque_chunk(&encrypted)?.2;
447
448            if chunk.hash_ref() != encrypted.hash_ref() {
449                Err(PsDataLakeError::StorageFailure)?;
450            }
451
452            Ok((encrypted.hash(), encrypted.key()).into())
453        }
454    }
455
456    pub fn put_large_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
457        self.put_large_blob(chunk.data_ref())
458    }
459
460    pub fn put_chunk<C: DataChunk>(&'lt self, chunk: &C) -> Result<Hkey> {
461        if chunk.data_ref().len() <= MAX_SIZE_RAW {
462            return Ok(Hkey::from_raw(chunk.data_ref()));
463        }
464
465        if chunk.data_ref().len() > MAX_DECRYPTED_SIZE {
466            return self.put_large_chunk(chunk);
467        }
468
469        let encrypted = chunk.encrypt()?;
470
471        let stored_chunk = self.put_opaque_chunk(&encrypted)?.2;
472
473        if stored_chunk.hash_ref() != encrypted.hash_ref() {
474            Err(PsDataLakeError::StorageFailure)?;
475        }
476
477        Ok(Hkey::Encrypted(encrypted.hash(), encrypted.key()))
478    }
479
480    pub fn put_large_blob(&'lt self, blob: &[u8]) -> Result<Hkey> {
481        // sanity check
482        if blob.len() <= MAX_DECRYPTED_SIZE {
483            return self.put_blob(blob);
484        }
485
486        LongHkeyExpanded::from_blob(self, blob)?.shrink(self)
487    }
488
489    pub fn put_blob(&'lt self, blob: &[u8]) -> Result<Hkey> {
490        if blob.len() <= MAX_SIZE_RAW {
491            Ok(Hkey::from_raw(blob))
492        } else if blob.len() > MAX_DECRYPTED_SIZE {
493            self.put_large_blob(blob)
494        } else {
495            self.put_chunk(&BorrowedDataChunk::from_data(blob)?)
496        }
497    }
498}
499
500impl<'lt> Store for DataStore<'lt> {
501    type Chunk<'c>
502        = MbufDataChunk<'c>
503    where
504        'lt: 'c;
505    type Error = PsDataLakeError;
506
507    fn get<'a>(&'a self, hash: &Hash) -> std::result::Result<Self::Chunk<'a>, Self::Error> {
508        self.get_chunk_by_hash(hash)
509    }
510
511    fn put(&self, data: &[u8]) -> std::result::Result<Hkey, Self::Error> {
512        self.put_blob(data)
513    }
514
515    fn put_encrypted<C: DataChunk>(&self, chunk: C) -> std::result::Result<(), Self::Error> {
516        self.put_encrypted_chunk(&chunk)?;
517        Ok(())
518    }
519}