vecdb/variants/raw/
mod.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    marker::PhantomData,
4    mem,
5    path::PathBuf,
6    sync::{
7        Arc,
8        atomic::{AtomicUsize, Ordering},
9    },
10};
11
12use allocative::Allocative;
13use log::info;
14use rawdb::{Database, Reader, Region};
15use zerocopy::{FromBytes, IntoBytes};
16
17use crate::{
18    AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BUFFER_SIZE, BoxedVecIterator,
19    CollectableVec, Error, GenericStoredVec, Result, StoredIndex, StoredRaw, Version,
20};
21
22use super::Format;
23
24mod header;
25mod iterators;
26mod options;
27
28pub use header::*;
29pub use iterators::*;
30pub use options::*;
31
/// Version of this raw-vec implementation; `forced_import_with` adds it to the caller's version
/// before importing.
const VERSION: Version = Version::ONE;
33
/// Vec of fixed-size `T` values indexed by `I`, stored in a database region right after a
/// header: value `i` lives at byte `HEADER_OFFSET + i * SIZE_OF_T`.
///
/// Pending changes (`pushed`, `updated`, `holes`) are buffered in memory and written by `flush`.
#[derive(Debug, Allocative)]
pub struct RawVec<I, T> {
    /// Region holding the header followed by the values.
    #[allocative(skip)]
    region: Region,

    /// In-memory copy of the header; written back by `write_header_if_needed` when modified.
    header: Header,
    /// Leaked copy of the name given at import time.
    name: &'static str,
    /// NOTE(review): presumably the values pushed in the previous change set; serialized by
    /// `serialize_changes` — confirm against the `GenericStoredVec` trait.
    prev_pushed: Vec<T>,
    /// Values pushed since the last flush; appended at `stored_len` by `flush`.
    pushed: Vec<T>,
    /// Whether a holes region exists on disk (set at import, kept in sync by `flush`).
    has_stored_holes: bool,
    /// Indexes currently marked as holes; persisted to the holes region by `flush`.
    holes: BTreeSet<usize>,
    /// Holes set as of the previous change set; initialized from the holes region at import.
    prev_holes: BTreeSet<usize>,
    /// Pending in-place overwrites (index -> new value); written by `flush`.
    updated: BTreeMap<usize, T>,
    /// NOTE(review): presumably the overwrites of the previous change set; preferred over disk
    /// reads in `serialize_changes` — confirm.
    prev_updated: BTreeMap<usize, T>,
    /// Stored length of the previous change set; initialized to the on-disk length at import.
    prev_stored_len: usize,
    /// Logical stored length, shared through the `Arc` with every clone of this vec.
    stored_len: Arc<AtomicUsize>,
    /// Default is 0
    saved_stamped_changes: u16,

    phantom: PhantomData<I>,
}
55
56impl<I, T> RawVec<I, T>
57where
58    I: StoredIndex,
59    T: StoredRaw,
60{
61    /// Same as import but will reset the vec under certain errors, so be careful !
62    pub fn forced_import(db: &Database, name: &str, version: Version) -> Result<Self> {
63        Self::forced_import_with((db, name, version).into())
64    }
65
66    /// Same as import but will reset the vec under certain errors, so be careful !
67    pub fn forced_import_with(mut options: ImportOptions) -> Result<Self> {
68        options.version = options.version + VERSION;
69        let res = Self::import_with(options);
70        match res {
71            Err(Error::DifferentCompressionMode)
72            | Err(Error::WrongEndian)
73            | Err(Error::WrongLength)
74            | Err(Error::DifferentVersion { .. }) => {
75                info!("Resetting {}...", options.name);
76                let _ = options
77                    .db
78                    .remove_region_with_id(&Self::vec_region_name_(options.name));
79                let _ = options
80                    .db
81                    .remove_region_with_id(&Self::holes_region_name_(options.name));
82                Self::import_with(options)
83            }
84            _ => res,
85        }
86    }
87
88    pub fn import(db: &Database, name: &str, version: Version) -> Result<Self> {
89        Self::import_with((db, name, version).into())
90    }
91
92    pub fn import_with(options: ImportOptions) -> Result<Self> {
93        Self::import_(options, Format::Raw)
94    }
95
96    #[doc(hidden)]
97    pub fn import_(
98        ImportOptions {
99            db,
100            name,
101            version,
102            saved_stamped_changes,
103        }: ImportOptions,
104        format: Format,
105    ) -> Result<Self> {
106        let region = db.create_region_if_needed(&Self::vec_region_name_(name))?;
107
108        let region_len = region.meta().read().len() as usize;
109        if region_len > 0
110            && (region_len < HEADER_OFFSET as usize
111                || (format.is_raw()
112                    && !(region_len - HEADER_OFFSET as usize).is_multiple_of(Self::SIZE_OF_T)))
113        {
114            dbg!(region_len, region_len, HEADER_OFFSET);
115            return Err(Error::Str("Region was saved incorrectly"));
116        }
117
118        let header = if region_len == 0 {
119            Header::create_and_write(&region, version, format)?
120        } else {
121            Header::import_and_verify(&region, version, format)?
122        };
123
124        let holes = if let Some(holes) = db.get_region(&Self::holes_region_name_(name)) {
125            Some(
126                holes
127                    .create_reader()
128                    .read_all()
129                    .chunks(size_of::<usize>())
130                    .map(|b| -> Result<usize> { usize::read_from_bytes(b).map_err(|e| e.into()) })
131                    .collect::<Result<BTreeSet<usize>>>()?,
132            )
133        } else {
134            None
135        };
136
137        let mut this = Self {
138            region: region.clone(),
139            header,
140            name: Box::leak(Box::new(name.to_string())),
141            prev_pushed: vec![],
142            pushed: vec![],
143            has_stored_holes: holes.is_some(),
144            holes: holes.clone().unwrap_or_default(),
145            prev_holes: holes.unwrap_or_default(),
146            updated: BTreeMap::new(),
147            prev_updated: BTreeMap::new(),
148            phantom: PhantomData,
149            prev_stored_len: 0,
150            stored_len: Arc::new(AtomicUsize::new(0)),
151            saved_stamped_changes,
152        };
153
154        let len = this.real_stored_len();
155        *this.mut_prev_stored_len() = len;
156        this.update_stored_len(len);
157
158        Ok(this)
159    }
160
161    #[inline]
162    pub fn iter(&self) -> Result<RawVecIterator<'_, I, T>> {
163        RawVecIterator::new(self)
164    }
165
166    #[inline]
167    pub fn clean_iter(&self) -> Result<CleanRawVecIterator<'_, I, T>> {
168        CleanRawVecIterator::new(self)
169    }
170
171    #[inline]
172    pub fn dirty_iter(&self) -> Result<DirtyRawVecIterator<'_, I, T>> {
173        DirtyRawVecIterator::new(self)
174    }
175
176    pub fn write_header_if_needed(&mut self) -> Result<()> {
177        if self.header.modified() {
178            self.header.write(&self.region)?;
179        }
180        Ok(())
181    }
182
183    #[inline]
184    pub fn prev_holes(&self) -> &BTreeSet<usize> {
185        &self.prev_holes
186    }
187
188    #[inline]
189    pub fn is_dirty(&self) -> bool {
190        !self.is_pushed_empty() || !self.holes.is_empty() || !self.updated.is_empty()
191    }
192
193    /// Calculate optimal buffer size aligned to SIZE_OF_T
194    #[inline]
195    const fn aligned_buffer_size() -> usize {
196        (BUFFER_SIZE / Self::SIZE_OF_T) * Self::SIZE_OF_T
197    }
198}
199
200impl<I, T> Clone for RawVec<I, T> {
201    fn clone(&self) -> Self {
202        Self {
203            region: self.region.clone(),
204            header: self.header.clone(),
205            name: self.name,
206            prev_pushed: vec![],
207            pushed: vec![],
208            updated: BTreeMap::new(),
209            prev_updated: BTreeMap::new(),
210            has_stored_holes: false,
211            holes: BTreeSet::new(),
212            prev_holes: BTreeSet::new(),
213            prev_stored_len: 0,
214            stored_len: self.stored_len.clone(),
215            saved_stamped_changes: self.saved_stamped_changes,
216            phantom: PhantomData,
217        }
218    }
219}
220
221impl<I, T> AnyVec for RawVec<I, T>
222where
223    I: StoredIndex,
224    T: StoredRaw,
225{
226    #[inline]
227    fn version(&self) -> Version {
228        self.header.vec_version()
229    }
230
231    #[inline]
232    fn name(&self) -> &str {
233        self.name
234    }
235
236    #[inline]
237    fn len(&self) -> usize {
238        self.dirty_len()
239    }
240
241    #[inline]
242    fn index_type_to_string(&self) -> &'static str {
243        I::to_string()
244    }
245
246    #[inline]
247    fn value_type_to_size_of(&self) -> usize {
248        size_of::<T>()
249    }
250
251    #[inline]
252    fn region_names(&self) -> Vec<String> {
253        vec![self.index_to_name()]
254    }
255}
256
257impl<I, T> AnyStoredVec for RawVec<I, T>
258where
259    I: StoredIndex,
260    T: StoredRaw,
261{
262    #[inline]
263    fn db_path(&self) -> PathBuf {
264        self.region.db().path().to_path_buf()
265    }
266
267    #[inline]
268    fn header(&self) -> &Header {
269        &self.header
270    }
271
272    #[inline]
273    fn mut_header(&mut self) -> &mut Header {
274        &mut self.header
275    }
276
277    #[inline]
278    fn saved_stamped_changes(&self) -> u16 {
279        self.saved_stamped_changes
280    }
281
282    #[inline]
283    fn real_stored_len(&self) -> usize {
284        (self.region.meta().read().len() as usize - HEADER_OFFSET as usize) / Self::SIZE_OF_T
285    }
286
287    #[inline]
288    fn stored_len(&self) -> usize {
289        self.stored_len.load(Ordering::Acquire)
290    }
291
292    fn flush(&mut self) -> Result<()> {
293        self.write_header_if_needed()?;
294
295        let stored_len = self.stored_len();
296        let pushed_len = self.pushed_len();
297        let real_stored_len = self.real_stored_len();
298        // After rollback, stored_len can be > real_stored_len (missing items are in updated map)
299        let truncated = stored_len < real_stored_len;
300        let expanded = stored_len > real_stored_len;
301        let has_new_data = pushed_len != 0;
302        let has_updated_data = !self.updated.is_empty();
303        let has_holes = !self.holes.is_empty();
304        let had_holes = self.has_stored_holes;
305
306        if !truncated && !expanded && !has_new_data && !has_updated_data && !has_holes && !had_holes
307        {
308            return Ok(());
309        }
310
311        let from = (stored_len * Self::SIZE_OF_T + HEADER_OFFSET as usize) as u64;
312
313        if has_new_data {
314            self.region
315                .truncate_write_all(from, mem::take(&mut self.pushed).as_bytes())?;
316            self.update_stored_len(stored_len + pushed_len);
317        } else if truncated {
318            self.region.truncate(from)?;
319        }
320
321        if has_updated_data {
322            let updated = mem::take(&mut self.updated);
323            updated.into_iter().try_for_each(|(i, v)| -> Result<()> {
324                let bytes = v.as_bytes();
325                let at = (i * Self::SIZE_OF_T) as u64 + HEADER_OFFSET;
326                self.region.write_all_at(bytes, at)?;
327                Ok(())
328            })?;
329        }
330
331        if has_holes {
332            self.has_stored_holes = true;
333            let holes = self
334                .region
335                .db()
336                .create_region_if_needed(&self.holes_region_name())?;
337            let bytes = self
338                .holes
339                .iter()
340                .flat_map(|i| i.to_ne_bytes())
341                .collect::<Vec<_>>();
342            holes.truncate_write_all(0, &bytes)?;
343        } else if had_holes {
344            self.has_stored_holes = false;
345            let _ = self
346                .region
347                .db()
348                .remove_region_with_id(&self.holes_region_name());
349        }
350
351        Ok(())
352    }
353
354    fn region(&self) -> &Region {
355        &self.region
356    }
357
358    fn serialize_changes(&self) -> Result<Vec<u8>> {
359        let mut bytes = vec![];
360        let reader = self.create_reader();
361
362        bytes.extend(self.stamp().as_bytes());
363
364        // let real_stored_len = self.real_stored_len();
365        let prev_stored_len = self.prev_stored_len();
366        let stored_len = self.stored_len();
367
368        bytes.extend(prev_stored_len.as_bytes());
369        bytes.extend(stored_len.as_bytes());
370
371        let truncated = prev_stored_len.checked_sub(stored_len).unwrap_or_default();
372        bytes.extend(truncated.as_bytes());
373        if truncated > 0 {
374            let truncated_vals = (stored_len..prev_stored_len)
375                .map(|i| {
376                    // Prefer prev_updated, then read from disk
377                    self.prev_updated
378                        .get(&i)
379                        .cloned()
380                        .unwrap_or_else(|| self.read_at_unwrap(i, &reader))
381                })
382                .collect::<Vec<_>>();
383            bytes.extend(truncated_vals.as_bytes());
384        }
385
386        bytes.extend(self.prev_pushed.len().as_bytes());
387        bytes.extend(self.prev_pushed.iter().flat_map(|v| v.as_bytes()));
388
389        bytes.extend(self.pushed.len().as_bytes());
390        bytes.extend(self.pushed.iter().flat_map(|v| v.as_bytes()));
391
392        let (prev_modified_indexes, prev_modified_values) = self
393            .prev_updated
394            .iter()
395            .map(|(&i, v)| (i, v.clone()))
396            .collect::<(Vec<_>, Vec<_>)>();
397        bytes.extend(prev_modified_indexes.len().as_bytes());
398        bytes.extend(prev_modified_indexes.as_bytes());
399        bytes.extend(prev_modified_values.as_bytes());
400
401        let (modified_indexes, modified_values) = self
402            .updated
403            .keys()
404            .map(|&i| {
405                // Prefer prev_updated values over disk values (for post-rollback state)
406                let val = self
407                    .prev_updated
408                    .get(&i)
409                    .cloned()
410                    .unwrap_or_else(|| self.read_at_unwrap(i, &reader));
411                (i, val)
412            })
413            .collect::<(Vec<_>, Vec<_>)>();
414        bytes.extend(modified_indexes.len().as_bytes());
415        bytes.extend(modified_indexes.as_bytes());
416        bytes.extend(modified_values.as_bytes());
417
418        let prev_holes = self.prev_holes.iter().copied().collect::<Vec<_>>();
419        bytes.extend(prev_holes.len().as_bytes());
420        bytes.extend(prev_holes.as_bytes());
421
422        let holes = self.holes.iter().copied().collect::<Vec<_>>();
423        bytes.extend(holes.len().as_bytes());
424        bytes.extend(holes.as_bytes());
425
426        Ok(bytes)
427    }
428}
429
430impl<I, T> GenericStoredVec<I, T> for RawVec<I, T>
431where
432    I: StoredIndex,
433    T: StoredRaw,
434{
435    #[inline(always)]
436    fn read_at(&self, index: usize, reader: &Reader) -> Result<T> {
437        T::read_from_prefix(reader.prefixed((index * Self::SIZE_OF_T) as u64 + HEADER_OFFSET))
438            .map(|(v, _)| v)
439            .map_err(Error::from)
440    }
441
442    #[inline]
443    fn pushed(&self) -> &[T] {
444        self.pushed.as_slice()
445    }
446    #[inline]
447    fn mut_pushed(&mut self) -> &mut Vec<T> {
448        &mut self.pushed
449    }
450    #[inline]
451    fn prev_pushed(&self) -> &[T] {
452        self.prev_pushed.as_slice()
453    }
454    #[inline]
455    fn mut_prev_pushed(&mut self) -> &mut Vec<T> {
456        &mut self.prev_pushed
457    }
458
459    #[inline(always)]
460    fn holes(&self) -> &BTreeSet<usize> {
461        &self.holes
462    }
463    #[inline]
464    fn mut_holes(&mut self) -> &mut BTreeSet<usize> {
465        &mut self.holes
466    }
467    #[inline]
468    fn prev_holes(&self) -> &BTreeSet<usize> {
469        &self.prev_holes
470    }
471    #[inline]
472    fn mut_prev_holes(&mut self) -> &mut BTreeSet<usize> {
473        &mut self.prev_holes
474    }
475
476    fn prev_stored_len(&self) -> usize {
477        self.prev_stored_len
478    }
479    fn mut_prev_stored_len(&mut self) -> &mut usize {
480        &mut self.prev_stored_len
481    }
482    fn update_stored_len(&self, val: usize) {
483        self.stored_len.store(val, Ordering::Release);
484    }
485
486    #[inline(always)]
487    fn updated(&self) -> &BTreeMap<usize, T> {
488        &self.updated
489    }
490    #[inline]
491    fn mut_updated(&mut self) -> &mut BTreeMap<usize, T> {
492        &mut self.updated
493    }
494    #[inline]
495    fn prev_updated(&self) -> &BTreeMap<usize, T> {
496        &self.prev_updated
497    }
498    #[inline]
499    fn mut_prev_updated(&mut self) -> &mut BTreeMap<usize, T> {
500        &mut self.prev_updated
501    }
502
503    fn reset(&mut self) -> Result<()> {
504        self.clear()
505    }
506}
507
508impl<'a, I, T> IntoIterator for &'a RawVec<I, T>
509where
510    I: StoredIndex,
511    T: StoredRaw,
512{
513    type Item = T;
514    type IntoIter = RawVecIterator<'a, I, T>;
515
516    fn into_iter(self) -> Self::IntoIter {
517        self.iter().expect("RawVecIter::new(self) to work")
518    }
519}
520
521impl<I, T> AnyIterableVec<I, T> for RawVec<I, T>
522where
523    I: StoredIndex,
524    T: StoredRaw,
525{
526    fn boxed_iter(&self) -> BoxedVecIterator<'_, I, T> {
527        Box::new(self.into_iter())
528    }
529}
530
531impl<I, T> AnyCollectableVec for RawVec<I, T>
532where
533    I: StoredIndex,
534    T: StoredRaw,
535{
536    fn collect_range_json_bytes(&self, from: Option<usize>, to: Option<usize>) -> Vec<u8> {
537        CollectableVec::collect_range_json_bytes(self, from, to)
538    }
539
540    fn collect_range_string(&self, from: Option<usize>, to: Option<usize>) -> Vec<String> {
541        CollectableVec::collect_range_string(self, from, to)
542    }
543}