vecdb/variants/raw/
mod.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    marker::PhantomData,
4    mem,
5    path::PathBuf,
6    sync::{
7        Arc,
8        atomic::{AtomicUsize, Ordering},
9    },
10};
11
12use allocative::Allocative;
13use log::info;
14use rawdb::{Database, Reader, Region};
15use zerocopy::{FromBytes, IntoBytes};
16
17use crate::{
18    AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BUFFER_SIZE, BoxedVecIterator,
19    CollectableVec, Error, GenericStoredVec, Result, StoredIndex, StoredRaw, Version,
20};
21
22use super::Format;
23
24mod header;
25mod iterators;
26mod options;
27
28pub use header::*;
29pub use iterators::*;
30pub use options::*;
31
/// Version of the raw-vec layer itself. `forced_import_with` adds it to the
/// caller-supplied version before importing.
const VERSION: Version = Version::ONE;
33
/// Vector of fixed-size `T` values stored in a database [`Region`], addressed
/// by an index type `I`.
///
/// Values live in the region after the header: value `i` is read and written at
/// byte offset `HEADER_OFFSET + i * SIZE_OF_T`. Pending changes (pushed values,
/// in-place updates, holes) are kept in memory until `flush` writes them out.
#[derive(Debug, Allocative)]
pub struct RawVec<I, T> {
    /// Region holding the header followed by the stored values.
    region: Region,

    /// Header created on first import, or read back and verified on later imports.
    header: Header,
    /// Name given at import time (leaked there to obtain a `'static` lifetime).
    name: &'static str,
    /// Pushed values of the previous state; serialized by `serialize_changes`.
    /// NOTE(review): presumably a snapshot kept for rollback — confirm in `GenericStoredVec`.
    prev_pushed: Vec<T>,
    /// Values appended in memory and not yet written to the region by `flush`.
    pushed: Vec<T>,
    /// Whether a holes region is considered to exist on disk: set at import when
    /// the holes region is found, then kept up to date by `flush`.
    has_stored_holes: bool,
    /// Current set of hole indexes; loaded from the holes region at import and
    /// written back by `flush`.
    holes: BTreeSet<usize>,
    /// Set of hole indexes as loaded at import; serialized by `serialize_changes`.
    prev_holes: BTreeSet<usize>,
    /// Pending in-place overwrites (index -> new value), written to the region by `flush`.
    updated: BTreeMap<usize, T>,
    /// Updates of the previous state; `serialize_changes` prefers these values
    /// over the ones read from disk.
    prev_updated: BTreeMap<usize, T>,
    /// Stored length recorded for the previous state; initialized at import
    /// from the length measured on the region.
    prev_stored_len: usize,
    /// Logical stored length. The counter is shared (through the `Arc`) with
    /// every clone of this vec.
    stored_len: Arc<AtomicUsize>,
    /// Default is 0
    saved_stamped_changes: u16,

    /// Ties the vec to its index type `I` without storing one.
    phantom: PhantomData<I>,
}
54
55impl<I, T> RawVec<I, T>
56where
57    I: StoredIndex,
58    T: StoredRaw,
59{
60    /// Same as import but will reset the vec under certain errors, so be careful !
61    pub fn forced_import(db: &Database, name: &str, version: Version) -> Result<Self> {
62        Self::forced_import_with((db, name, version).into())
63    }
64
65    /// Same as import but will reset the vec under certain errors, so be careful !
66    pub fn forced_import_with(mut options: ImportOptions) -> Result<Self> {
67        options.version = options.version + VERSION;
68        let res = Self::import_with(options);
69        match res {
70            Err(Error::DifferentCompressionMode)
71            | Err(Error::WrongEndian)
72            | Err(Error::WrongLength)
73            | Err(Error::DifferentVersion { .. }) => {
74                info!("Resetting {}...", options.name);
75                let _ = options
76                    .db
77                    .remove_region_with_id(&Self::vec_region_name_(options.name));
78                let _ = options
79                    .db
80                    .remove_region_with_id(&Self::holes_region_name_(options.name));
81                Self::import_with(options)
82            }
83            _ => res,
84        }
85    }
86
87    pub fn import(db: &Database, name: &str, version: Version) -> Result<Self> {
88        Self::import_with((db, name, version).into())
89    }
90
91    pub fn import_with(options: ImportOptions) -> Result<Self> {
92        Self::import_(options, Format::Raw)
93    }
94
95    #[doc(hidden)]
96    pub fn import_(
97        ImportOptions {
98            db,
99            name,
100            version,
101            saved_stamped_changes,
102        }: ImportOptions,
103        format: Format,
104    ) -> Result<Self> {
105        let region = db.create_region_if_needed(&Self::vec_region_name_(name))?;
106
107        let region_len = region.meta().read().len() as usize;
108        if region_len > 0
109            && (region_len < HEADER_OFFSET as usize
110                || (format.is_raw()
111                    && !(region_len - HEADER_OFFSET as usize).is_multiple_of(Self::SIZE_OF_T)))
112        {
113            dbg!(region_len, region_len, HEADER_OFFSET);
114            return Err(Error::Str("Region was saved incorrectly"));
115        }
116
117        let header = if region_len == 0 {
118            Header::create_and_write(&region, version, format)?
119        } else {
120            Header::import_and_verify(&region, version, format)?
121        };
122
123        let holes = if let Some(holes) = db.get_region(&Self::holes_region_name_(name)) {
124            Some(
125                holes
126                    .create_reader()
127                    .read_all()
128                    .chunks(size_of::<usize>())
129                    .map(|b| -> Result<usize> { usize::read_from_bytes(b).map_err(|e| e.into()) })
130                    .collect::<Result<BTreeSet<usize>>>()?,
131            )
132        } else {
133            None
134        };
135
136        let mut this = Self {
137            region: region.clone(),
138            header,
139            name: Box::leak(Box::new(name.to_string())),
140            prev_pushed: vec![],
141            pushed: vec![],
142            has_stored_holes: holes.is_some(),
143            holes: holes.clone().unwrap_or_default(),
144            prev_holes: holes.unwrap_or_default(),
145            updated: BTreeMap::new(),
146            prev_updated: BTreeMap::new(),
147            phantom: PhantomData,
148            prev_stored_len: 0,
149            stored_len: Arc::new(AtomicUsize::new(0)),
150            saved_stamped_changes,
151        };
152
153        let len = this.real_stored_len();
154        *this.mut_prev_stored_len() = len;
155        this.update_stored_len(len);
156
157        Ok(this)
158    }
159
160    #[inline]
161    pub fn iter(&self) -> Result<RawVecIterator<'_, I, T>> {
162        RawVecIterator::new(self)
163    }
164
165    #[inline]
166    pub fn clean_iter(&self) -> Result<CleanRawVecIterator<'_, I, T>> {
167        CleanRawVecIterator::new(self)
168    }
169
170    #[inline]
171    pub fn dirty_iter(&self) -> Result<DirtyRawVecIterator<'_, I, T>> {
172        DirtyRawVecIterator::new(self)
173    }
174
175    pub fn write_header_if_needed(&mut self) -> Result<()> {
176        if self.header.modified() {
177            self.header.write(&self.region)?;
178        }
179        Ok(())
180    }
181
182    #[inline]
183    pub fn prev_holes(&self) -> &BTreeSet<usize> {
184        &self.prev_holes
185    }
186
187    #[inline]
188    pub fn is_dirty(&self) -> bool {
189        !self.is_pushed_empty() || !self.holes.is_empty() || !self.updated.is_empty()
190    }
191
192    /// Calculate optimal buffer size aligned to SIZE_OF_T
193    #[inline]
194    const fn aligned_buffer_size() -> usize {
195        (BUFFER_SIZE / Self::SIZE_OF_T) * Self::SIZE_OF_T
196    }
197}
198
199impl<I, T> Clone for RawVec<I, T> {
200    fn clone(&self) -> Self {
201        Self {
202            region: self.region.clone(),
203            header: self.header.clone(),
204            name: self.name,
205            prev_pushed: vec![],
206            pushed: vec![],
207            updated: BTreeMap::new(),
208            prev_updated: BTreeMap::new(),
209            has_stored_holes: false,
210            holes: BTreeSet::new(),
211            prev_holes: BTreeSet::new(),
212            prev_stored_len: 0,
213            stored_len: self.stored_len.clone(),
214            saved_stamped_changes: self.saved_stamped_changes,
215            phantom: PhantomData,
216        }
217    }
218}
219
220impl<I, T> AnyVec for RawVec<I, T>
221where
222    I: StoredIndex,
223    T: StoredRaw,
224{
225    #[inline]
226    fn version(&self) -> Version {
227        self.header.vec_version()
228    }
229
230    #[inline]
231    fn name(&self) -> &str {
232        self.name
233    }
234
235    #[inline]
236    fn len(&self) -> usize {
237        self.len_()
238    }
239
240    #[inline]
241    fn index_type_to_string(&self) -> &'static str {
242        I::to_string()
243    }
244
245    #[inline]
246    fn value_type_to_size_of(&self) -> usize {
247        size_of::<T>()
248    }
249
250    #[inline]
251    fn region_names(&self) -> Vec<String> {
252        vec![self.index_to_name()]
253    }
254}
255
256impl<I, T> AnyStoredVec for RawVec<I, T>
257where
258    I: StoredIndex,
259    T: StoredRaw,
260{
261    #[inline]
262    fn db_path(&self) -> PathBuf {
263        self.region.db().path().to_path_buf()
264    }
265
266    #[inline]
267    fn header(&self) -> &Header {
268        &self.header
269    }
270
271    #[inline]
272    fn mut_header(&mut self) -> &mut Header {
273        &mut self.header
274    }
275
276    #[inline]
277    fn saved_stamped_changes(&self) -> u16 {
278        self.saved_stamped_changes
279    }
280
281    #[inline]
282    fn real_stored_len(&self) -> usize {
283        (self.region.meta().read().len() as usize - HEADER_OFFSET as usize) / Self::SIZE_OF_T
284    }
285
286    #[inline]
287    fn stored_len(&self) -> usize {
288        self.stored_len.load(Ordering::Acquire)
289    }
290
291    fn flush(&mut self) -> Result<()> {
292        self.write_header_if_needed()?;
293
294        let stored_len = self.stored_len();
295        let pushed_len = self.pushed_len();
296        let real_stored_len = self.real_stored_len();
297        // After rollback, stored_len can be > real_stored_len (missing items are in updated map)
298        let truncated = stored_len < real_stored_len;
299        let expanded = stored_len > real_stored_len;
300        let has_new_data = pushed_len != 0;
301        let has_updated_data = !self.updated.is_empty();
302        let has_holes = !self.holes.is_empty();
303        let had_holes = self.has_stored_holes;
304
305        if !truncated && !expanded && !has_new_data && !has_updated_data && !has_holes && !had_holes
306        {
307            return Ok(());
308        }
309
310        let from = (stored_len * Self::SIZE_OF_T + HEADER_OFFSET as usize) as u64;
311
312        if has_new_data {
313            self.region
314                .truncate_write_all(from, mem::take(&mut self.pushed).as_bytes())?;
315            self.update_stored_len(stored_len + pushed_len);
316        } else if truncated {
317            self.region.truncate(from)?;
318        }
319
320        if has_updated_data {
321            let updated = mem::take(&mut self.updated);
322            updated.into_iter().try_for_each(|(i, v)| -> Result<()> {
323                let bytes = v.as_bytes();
324                let at = (i * Self::SIZE_OF_T) as u64 + HEADER_OFFSET;
325                self.region.write_all_at(bytes, at)?;
326                Ok(())
327            })?;
328        }
329
330        if has_holes {
331            self.has_stored_holes = true;
332            let holes = self
333                .region
334                .db()
335                .create_region_if_needed(&self.holes_region_name())?;
336            let bytes = self
337                .holes
338                .iter()
339                .flat_map(|i| i.to_ne_bytes())
340                .collect::<Vec<_>>();
341            holes.truncate_write_all(0, &bytes)?;
342        } else if had_holes {
343            self.has_stored_holes = false;
344            let _ = self
345                .region
346                .db()
347                .remove_region_with_id(&self.holes_region_name());
348        }
349
350        Ok(())
351    }
352
353    fn region(&self) -> &Region {
354        &self.region
355    }
356
357    fn serialize_changes(&self) -> Result<Vec<u8>> {
358        let mut bytes = vec![];
359        let reader = self.create_reader();
360
361        bytes.extend(self.stamp().as_bytes());
362
363        // let real_stored_len = self.real_stored_len();
364        let prev_stored_len = self.prev_stored_len();
365        let stored_len = self.stored_len();
366
367        bytes.extend(prev_stored_len.as_bytes());
368        bytes.extend(stored_len.as_bytes());
369
370        let truncated = prev_stored_len.checked_sub(stored_len).unwrap_or_default();
371        bytes.extend(truncated.as_bytes());
372        if truncated > 0 {
373            let truncated_vals = (stored_len..prev_stored_len)
374                .map(|i| {
375                    // Prefer prev_updated, then read from disk
376                    self.prev_updated
377                        .get(&i)
378                        .cloned()
379                        .unwrap_or_else(|| self.unwrap_read_(i, &reader))
380                })
381                .collect::<Vec<_>>();
382            bytes.extend(truncated_vals.as_bytes());
383        }
384
385        bytes.extend(self.prev_pushed.len().as_bytes());
386        bytes.extend(self.prev_pushed.iter().flat_map(|v| v.as_bytes()));
387
388        bytes.extend(self.pushed.len().as_bytes());
389        bytes.extend(self.pushed.iter().flat_map(|v| v.as_bytes()));
390
391        let (prev_modified_indexes, prev_modified_values) = self
392            .prev_updated
393            .iter()
394            .map(|(&i, v)| (i, v.clone()))
395            .collect::<(Vec<_>, Vec<_>)>();
396        bytes.extend(prev_modified_indexes.len().as_bytes());
397        bytes.extend(prev_modified_indexes.as_bytes());
398        bytes.extend(prev_modified_values.as_bytes());
399
400        let (modified_indexes, modified_values) = self
401            .updated
402            .keys()
403            .map(|&i| {
404                // Prefer prev_updated values over disk values (for post-rollback state)
405                let val = self
406                    .prev_updated
407                    .get(&i)
408                    .cloned()
409                    .unwrap_or_else(|| self.unwrap_read_(i, &reader));
410                (i, val)
411            })
412            .collect::<(Vec<_>, Vec<_>)>();
413        bytes.extend(modified_indexes.len().as_bytes());
414        bytes.extend(modified_indexes.as_bytes());
415        bytes.extend(modified_values.as_bytes());
416
417        let prev_holes = self.prev_holes.iter().copied().collect::<Vec<_>>();
418        bytes.extend(prev_holes.len().as_bytes());
419        bytes.extend(prev_holes.as_bytes());
420
421        let holes = self.holes.iter().copied().collect::<Vec<_>>();
422        bytes.extend(holes.len().as_bytes());
423        bytes.extend(holes.as_bytes());
424
425        Ok(bytes)
426    }
427}
428
429impl<I, T> GenericStoredVec<I, T> for RawVec<I, T>
430where
431    I: StoredIndex,
432    T: StoredRaw,
433{
434    #[inline(always)]
435    fn read_(&self, index: usize, reader: &Reader) -> Result<T> {
436        T::read_from_prefix(reader.prefixed((index * Self::SIZE_OF_T) as u64 + HEADER_OFFSET))
437            .map(|(v, _)| v)
438            .map_err(Error::from)
439    }
440
441    #[inline]
442    fn pushed(&self) -> &[T] {
443        self.pushed.as_slice()
444    }
445    #[inline]
446    fn mut_pushed(&mut self) -> &mut Vec<T> {
447        &mut self.pushed
448    }
449    #[inline]
450    fn prev_pushed(&self) -> &[T] {
451        self.prev_pushed.as_slice()
452    }
453    #[inline]
454    fn mut_prev_pushed(&mut self) -> &mut Vec<T> {
455        &mut self.prev_pushed
456    }
457
458    #[inline(always)]
459    fn holes(&self) -> &BTreeSet<usize> {
460        &self.holes
461    }
462    #[inline]
463    fn mut_holes(&mut self) -> &mut BTreeSet<usize> {
464        &mut self.holes
465    }
466    #[inline]
467    fn prev_holes(&self) -> &BTreeSet<usize> {
468        &self.prev_holes
469    }
470    #[inline]
471    fn mut_prev_holes(&mut self) -> &mut BTreeSet<usize> {
472        &mut self.prev_holes
473    }
474
475    fn prev_stored_len(&self) -> usize {
476        self.prev_stored_len
477    }
478    fn mut_prev_stored_len(&mut self) -> &mut usize {
479        &mut self.prev_stored_len
480    }
481    fn update_stored_len(&self, val: usize) {
482        self.stored_len.store(val, Ordering::Release);
483    }
484
485    #[inline(always)]
486    fn updated(&self) -> &BTreeMap<usize, T> {
487        &self.updated
488    }
489    #[inline]
490    fn mut_updated(&mut self) -> &mut BTreeMap<usize, T> {
491        &mut self.updated
492    }
493    #[inline]
494    fn prev_updated(&self) -> &BTreeMap<usize, T> {
495        &self.prev_updated
496    }
497    #[inline]
498    fn mut_prev_updated(&mut self) -> &mut BTreeMap<usize, T> {
499        &mut self.prev_updated
500    }
501
502    fn reset(&mut self) -> Result<()> {
503        self.reset_()
504    }
505}
506
507impl<'a, I, T> IntoIterator for &'a RawVec<I, T>
508where
509    I: StoredIndex,
510    T: StoredRaw,
511{
512    type Item = T;
513    type IntoIter = RawVecIterator<'a, I, T>;
514
515    fn into_iter(self) -> Self::IntoIter {
516        self.iter().expect("RawVecIter::new(self) to work")
517    }
518}
519
520impl<I, T> AnyIterableVec<I, T> for RawVec<I, T>
521where
522    I: StoredIndex,
523    T: StoredRaw,
524{
525    fn boxed_iter(&self) -> BoxedVecIterator<'_, I, T> {
526        Box::new(self.into_iter())
527    }
528}
529
530impl<I, T> AnyCollectableVec for RawVec<I, T>
531where
532    I: StoredIndex,
533    T: StoredRaw,
534{
535    fn collect_range_json_bytes(&self, from: Option<usize>, to: Option<usize>) -> Vec<u8> {
536        CollectableVec::collect_range_json_bytes(self, from, to)
537    }
538
539    fn collect_range_string(&self, from: Option<usize>, to: Option<usize>) -> Vec<String> {
540        CollectableVec::collect_range_string(self, from, to)
541    }
542}