1use std::{
2 collections::{BTreeMap, BTreeSet},
3 marker::PhantomData,
4 mem,
5 path::PathBuf,
6 sync::{
7 Arc,
8 atomic::{AtomicUsize, Ordering},
9 },
10};
11
12use allocative::Allocative;
13use log::info;
14use rawdb::{Database, Reader, Region};
15use zerocopy::{FromBytes, IntoBytes};
16
17use crate::{
18 AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BUFFER_SIZE, BoxedVecIterator,
19 CollectableVec, Error, GenericStoredVec, Result, StoredIndex, StoredRaw, Version,
20};
21
22use super::Format;
23
24mod header;
25mod iterators;
26mod options;
27
28pub use header::*;
29pub use iterators::*;
30pub use options::*;
31
/// Version of the `RawVec` storage format itself.
///
/// `RawVec::forced_import_with` adds this to the caller-supplied version before importing, so
/// bumping it changes the version every stored vec is checked against (presumably surfacing as
/// `Error::DifferentVersion`, which triggers a reset — verify in `Header::import_and_verify`).
const VERSION: Version = Version::ONE;
33
/// A vector of fixed-size raw values, indexed by `I`, persisted in a `rawdb` [`Region`].
///
/// The region holds a [`Header`] followed by the values stored back to back at byte offset
/// `HEADER_OFFSET + index * SIZE_OF_T`. Changes not yet on disk are kept in memory
/// (`pushed`, `updated`, `holes`) until `flush` writes them out.
#[derive(Debug, Allocative)]
pub struct RawVec<I, T> {
    /// Backing region: header followed by the contiguous values.
    #[allocative(skip)]
    region: Region,

    /// In-memory copy of the region header; written back by `write_header_if_needed` when
    /// it reports being modified.
    header: Header,
    /// Vec name, leaked once at import time so it can be a `'static` str copied by clones.
    name: &'static str,
    /// Pushed values from the previous state, serialized by `serialize_changes`.
    /// NOTE(review): presumably the values pushed before the last stamped flush — confirm.
    prev_pushed: Vec<T>,
    /// Values appended past `stored_len` that have not been written to the region yet.
    pushed: Vec<T>,
    /// Whether a holes region exists on disk (set at import and kept in sync by `flush`).
    has_stored_holes: bool,
    /// Indices currently marked as holes; persisted to the holes region by `flush`.
    holes: BTreeSet<usize>,
    /// Holes from the previous state; initialised from the holes loaded at import.
    prev_holes: BTreeSet<usize>,
    /// Pending in-place overwrites of already-stored values, keyed by index.
    updated: BTreeMap<usize, T>,
    /// Overwrites from the previous state; `serialize_changes` prefers these over on-disk
    /// values when recording the old value of an index.
    prev_updated: BTreeMap<usize, T>,
    /// Stored length captured at import.
    /// NOTE(review): presumably refreshed elsewhere after stamped flushes — confirm.
    prev_stored_len: usize,
    /// Logical number of stored values; the `Arc` is shared with clones of this vec.
    stored_len: Arc<AtomicUsize>,
    /// Copied from `ImportOptions` and exposed through `saved_stamped_changes()`.
    /// NOTE(review): presumably the number of stamped change sets to keep — confirm.
    saved_stamped_changes: u16,

    /// Ties the index type `I` to the vec without storing any `I`.
    phantom: PhantomData<I>,
}
55
56impl<I, T> RawVec<I, T>
57where
58 I: StoredIndex,
59 T: StoredRaw,
60{
61 pub fn forced_import(db: &Database, name: &str, version: Version) -> Result<Self> {
63 Self::forced_import_with((db, name, version).into())
64 }
65
66 pub fn forced_import_with(mut options: ImportOptions) -> Result<Self> {
68 options.version = options.version + VERSION;
69 let res = Self::import_with(options);
70 match res {
71 Err(Error::DifferentCompressionMode)
72 | Err(Error::WrongEndian)
73 | Err(Error::WrongLength)
74 | Err(Error::DifferentVersion { .. }) => {
75 info!("Resetting {}...", options.name);
76 let _ = options
77 .db
78 .remove_region_with_id(&Self::vec_region_name_(options.name));
79 let _ = options
80 .db
81 .remove_region_with_id(&Self::holes_region_name_(options.name));
82 Self::import_with(options)
83 }
84 _ => res,
85 }
86 }
87
88 pub fn import(db: &Database, name: &str, version: Version) -> Result<Self> {
89 Self::import_with((db, name, version).into())
90 }
91
92 pub fn import_with(options: ImportOptions) -> Result<Self> {
93 Self::import_(options, Format::Raw)
94 }
95
96 #[doc(hidden)]
97 pub fn import_(
98 ImportOptions {
99 db,
100 name,
101 version,
102 saved_stamped_changes,
103 }: ImportOptions,
104 format: Format,
105 ) -> Result<Self> {
106 let region = db.create_region_if_needed(&Self::vec_region_name_(name))?;
107
108 let region_len = region.meta().read().len() as usize;
109 if region_len > 0
110 && (region_len < HEADER_OFFSET as usize
111 || (format.is_raw()
112 && !(region_len - HEADER_OFFSET as usize).is_multiple_of(Self::SIZE_OF_T)))
113 {
114 dbg!(region_len, region_len, HEADER_OFFSET);
115 return Err(Error::Str("Region was saved incorrectly"));
116 }
117
118 let header = if region_len == 0 {
119 Header::create_and_write(®ion, version, format)?
120 } else {
121 Header::import_and_verify(®ion, version, format)?
122 };
123
124 let holes = if let Some(holes) = db.get_region(&Self::holes_region_name_(name)) {
125 Some(
126 holes
127 .create_reader()
128 .read_all()
129 .chunks(size_of::<usize>())
130 .map(|b| -> Result<usize> { usize::read_from_bytes(b).map_err(|e| e.into()) })
131 .collect::<Result<BTreeSet<usize>>>()?,
132 )
133 } else {
134 None
135 };
136
137 let mut this = Self {
138 region: region.clone(),
139 header,
140 name: Box::leak(Box::new(name.to_string())),
141 prev_pushed: vec![],
142 pushed: vec![],
143 has_stored_holes: holes.is_some(),
144 holes: holes.clone().unwrap_or_default(),
145 prev_holes: holes.unwrap_or_default(),
146 updated: BTreeMap::new(),
147 prev_updated: BTreeMap::new(),
148 phantom: PhantomData,
149 prev_stored_len: 0,
150 stored_len: Arc::new(AtomicUsize::new(0)),
151 saved_stamped_changes,
152 };
153
154 let len = this.real_stored_len();
155 *this.mut_prev_stored_len() = len;
156 this.update_stored_len(len);
157
158 Ok(this)
159 }
160
161 #[inline]
162 pub fn iter(&self) -> Result<RawVecIterator<'_, I, T>> {
163 RawVecIterator::new(self)
164 }
165
166 #[inline]
167 pub fn clean_iter(&self) -> Result<CleanRawVecIterator<'_, I, T>> {
168 CleanRawVecIterator::new(self)
169 }
170
171 #[inline]
172 pub fn dirty_iter(&self) -> Result<DirtyRawVecIterator<'_, I, T>> {
173 DirtyRawVecIterator::new(self)
174 }
175
176 pub fn write_header_if_needed(&mut self) -> Result<()> {
177 if self.header.modified() {
178 self.header.write(&self.region)?;
179 }
180 Ok(())
181 }
182
183 #[inline]
184 pub fn prev_holes(&self) -> &BTreeSet<usize> {
185 &self.prev_holes
186 }
187
188 #[inline]
189 pub fn is_dirty(&self) -> bool {
190 !self.is_pushed_empty() || !self.holes.is_empty() || !self.updated.is_empty()
191 }
192
193 #[inline]
195 const fn aligned_buffer_size() -> usize {
196 (BUFFER_SIZE / Self::SIZE_OF_T) * Self::SIZE_OF_T
197 }
198}
199
200impl<I, T> Clone for RawVec<I, T> {
201 fn clone(&self) -> Self {
202 Self {
203 region: self.region.clone(),
204 header: self.header.clone(),
205 name: self.name,
206 prev_pushed: vec![],
207 pushed: vec![],
208 updated: BTreeMap::new(),
209 prev_updated: BTreeMap::new(),
210 has_stored_holes: false,
211 holes: BTreeSet::new(),
212 prev_holes: BTreeSet::new(),
213 prev_stored_len: 0,
214 stored_len: self.stored_len.clone(),
215 saved_stamped_changes: self.saved_stamped_changes,
216 phantom: PhantomData,
217 }
218 }
219}
220
221impl<I, T> AnyVec for RawVec<I, T>
222where
223 I: StoredIndex,
224 T: StoredRaw,
225{
226 #[inline]
227 fn version(&self) -> Version {
228 self.header.vec_version()
229 }
230
231 #[inline]
232 fn name(&self) -> &str {
233 self.name
234 }
235
236 #[inline]
237 fn len(&self) -> usize {
238 self.dirty_len()
239 }
240
241 #[inline]
242 fn index_type_to_string(&self) -> &'static str {
243 I::to_string()
244 }
245
246 #[inline]
247 fn value_type_to_size_of(&self) -> usize {
248 size_of::<T>()
249 }
250
251 #[inline]
252 fn region_names(&self) -> Vec<String> {
253 vec![self.index_to_name()]
254 }
255}
256
257impl<I, T> AnyStoredVec for RawVec<I, T>
258where
259 I: StoredIndex,
260 T: StoredRaw,
261{
262 #[inline]
263 fn db_path(&self) -> PathBuf {
264 self.region.db().path().to_path_buf()
265 }
266
267 #[inline]
268 fn header(&self) -> &Header {
269 &self.header
270 }
271
272 #[inline]
273 fn mut_header(&mut self) -> &mut Header {
274 &mut self.header
275 }
276
277 #[inline]
278 fn saved_stamped_changes(&self) -> u16 {
279 self.saved_stamped_changes
280 }
281
282 #[inline]
283 fn real_stored_len(&self) -> usize {
284 (self.region.meta().read().len() as usize - HEADER_OFFSET as usize) / Self::SIZE_OF_T
285 }
286
287 #[inline]
288 fn stored_len(&self) -> usize {
289 self.stored_len.load(Ordering::Acquire)
290 }
291
292 fn flush(&mut self) -> Result<()> {
293 self.write_header_if_needed()?;
294
295 let stored_len = self.stored_len();
296 let pushed_len = self.pushed_len();
297 let real_stored_len = self.real_stored_len();
298 let truncated = stored_len < real_stored_len;
300 let expanded = stored_len > real_stored_len;
301 let has_new_data = pushed_len != 0;
302 let has_updated_data = !self.updated.is_empty();
303 let has_holes = !self.holes.is_empty();
304 let had_holes = self.has_stored_holes;
305
306 if !truncated && !expanded && !has_new_data && !has_updated_data && !has_holes && !had_holes
307 {
308 return Ok(());
309 }
310
311 let from = (stored_len * Self::SIZE_OF_T + HEADER_OFFSET as usize) as u64;
312
313 if has_new_data {
314 self.region
315 .truncate_write_all(from, mem::take(&mut self.pushed).as_bytes())?;
316 self.update_stored_len(stored_len + pushed_len);
317 } else if truncated {
318 self.region.truncate(from)?;
319 }
320
321 if has_updated_data {
322 let updated = mem::take(&mut self.updated);
323 updated.into_iter().try_for_each(|(i, v)| -> Result<()> {
324 let bytes = v.as_bytes();
325 let at = (i * Self::SIZE_OF_T) as u64 + HEADER_OFFSET;
326 self.region.write_all_at(bytes, at)?;
327 Ok(())
328 })?;
329 }
330
331 if has_holes {
332 self.has_stored_holes = true;
333 let holes = self
334 .region
335 .db()
336 .create_region_if_needed(&self.holes_region_name())?;
337 let bytes = self
338 .holes
339 .iter()
340 .flat_map(|i| i.to_ne_bytes())
341 .collect::<Vec<_>>();
342 holes.truncate_write_all(0, &bytes)?;
343 } else if had_holes {
344 self.has_stored_holes = false;
345 let _ = self
346 .region
347 .db()
348 .remove_region_with_id(&self.holes_region_name());
349 }
350
351 Ok(())
352 }
353
354 fn region(&self) -> &Region {
355 &self.region
356 }
357
358 fn serialize_changes(&self) -> Result<Vec<u8>> {
359 let mut bytes = vec![];
360 let reader = self.create_reader();
361
362 bytes.extend(self.stamp().as_bytes());
363
364 let prev_stored_len = self.prev_stored_len();
366 let stored_len = self.stored_len();
367
368 bytes.extend(prev_stored_len.as_bytes());
369 bytes.extend(stored_len.as_bytes());
370
371 let truncated = prev_stored_len.checked_sub(stored_len).unwrap_or_default();
372 bytes.extend(truncated.as_bytes());
373 if truncated > 0 {
374 let truncated_vals = (stored_len..prev_stored_len)
375 .map(|i| {
376 self.prev_updated
378 .get(&i)
379 .cloned()
380 .unwrap_or_else(|| self.read_at_unwrap(i, &reader))
381 })
382 .collect::<Vec<_>>();
383 bytes.extend(truncated_vals.as_bytes());
384 }
385
386 bytes.extend(self.prev_pushed.len().as_bytes());
387 bytes.extend(self.prev_pushed.iter().flat_map(|v| v.as_bytes()));
388
389 bytes.extend(self.pushed.len().as_bytes());
390 bytes.extend(self.pushed.iter().flat_map(|v| v.as_bytes()));
391
392 let (prev_modified_indexes, prev_modified_values) = self
393 .prev_updated
394 .iter()
395 .map(|(&i, v)| (i, v.clone()))
396 .collect::<(Vec<_>, Vec<_>)>();
397 bytes.extend(prev_modified_indexes.len().as_bytes());
398 bytes.extend(prev_modified_indexes.as_bytes());
399 bytes.extend(prev_modified_values.as_bytes());
400
401 let (modified_indexes, modified_values) = self
402 .updated
403 .keys()
404 .map(|&i| {
405 let val = self
407 .prev_updated
408 .get(&i)
409 .cloned()
410 .unwrap_or_else(|| self.read_at_unwrap(i, &reader));
411 (i, val)
412 })
413 .collect::<(Vec<_>, Vec<_>)>();
414 bytes.extend(modified_indexes.len().as_bytes());
415 bytes.extend(modified_indexes.as_bytes());
416 bytes.extend(modified_values.as_bytes());
417
418 let prev_holes = self.prev_holes.iter().copied().collect::<Vec<_>>();
419 bytes.extend(prev_holes.len().as_bytes());
420 bytes.extend(prev_holes.as_bytes());
421
422 let holes = self.holes.iter().copied().collect::<Vec<_>>();
423 bytes.extend(holes.len().as_bytes());
424 bytes.extend(holes.as_bytes());
425
426 Ok(bytes)
427 }
428}
429
430impl<I, T> GenericStoredVec<I, T> for RawVec<I, T>
431where
432 I: StoredIndex,
433 T: StoredRaw,
434{
435 #[inline(always)]
436 fn read_at(&self, index: usize, reader: &Reader) -> Result<T> {
437 T::read_from_prefix(reader.prefixed((index * Self::SIZE_OF_T) as u64 + HEADER_OFFSET))
438 .map(|(v, _)| v)
439 .map_err(Error::from)
440 }
441
442 #[inline]
443 fn pushed(&self) -> &[T] {
444 self.pushed.as_slice()
445 }
446 #[inline]
447 fn mut_pushed(&mut self) -> &mut Vec<T> {
448 &mut self.pushed
449 }
450 #[inline]
451 fn prev_pushed(&self) -> &[T] {
452 self.prev_pushed.as_slice()
453 }
454 #[inline]
455 fn mut_prev_pushed(&mut self) -> &mut Vec<T> {
456 &mut self.prev_pushed
457 }
458
459 #[inline(always)]
460 fn holes(&self) -> &BTreeSet<usize> {
461 &self.holes
462 }
463 #[inline]
464 fn mut_holes(&mut self) -> &mut BTreeSet<usize> {
465 &mut self.holes
466 }
467 #[inline]
468 fn prev_holes(&self) -> &BTreeSet<usize> {
469 &self.prev_holes
470 }
471 #[inline]
472 fn mut_prev_holes(&mut self) -> &mut BTreeSet<usize> {
473 &mut self.prev_holes
474 }
475
476 fn prev_stored_len(&self) -> usize {
477 self.prev_stored_len
478 }
479 fn mut_prev_stored_len(&mut self) -> &mut usize {
480 &mut self.prev_stored_len
481 }
482 fn update_stored_len(&self, val: usize) {
483 self.stored_len.store(val, Ordering::Release);
484 }
485
486 #[inline(always)]
487 fn updated(&self) -> &BTreeMap<usize, T> {
488 &self.updated
489 }
490 #[inline]
491 fn mut_updated(&mut self) -> &mut BTreeMap<usize, T> {
492 &mut self.updated
493 }
494 #[inline]
495 fn prev_updated(&self) -> &BTreeMap<usize, T> {
496 &self.prev_updated
497 }
498 #[inline]
499 fn mut_prev_updated(&mut self) -> &mut BTreeMap<usize, T> {
500 &mut self.prev_updated
501 }
502
503 fn reset(&mut self) -> Result<()> {
504 self.clear()
505 }
506}
507
508impl<'a, I, T> IntoIterator for &'a RawVec<I, T>
509where
510 I: StoredIndex,
511 T: StoredRaw,
512{
513 type Item = T;
514 type IntoIter = RawVecIterator<'a, I, T>;
515
516 fn into_iter(self) -> Self::IntoIter {
517 self.iter().expect("RawVecIter::new(self) to work")
518 }
519}
520
521impl<I, T> AnyIterableVec<I, T> for RawVec<I, T>
522where
523 I: StoredIndex,
524 T: StoredRaw,
525{
526 fn boxed_iter(&self) -> BoxedVecIterator<'_, I, T> {
527 Box::new(self.into_iter())
528 }
529}
530
/// Type-erased collection entry points, forwarded unchanged to `CollectableVec`'s
/// implementations for this vec.
impl<I, T> AnyCollectableVec for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredRaw,
{
    // Fully-qualified calls: `AnyCollectableVec` and `CollectableVec` are both in scope and
    // both define these method names, so method-call syntax would be ambiguous.

    /// Collects values in the optional `from`/`to` range as JSON bytes.
    fn collect_range_json_bytes(&self, from: Option<usize>, to: Option<usize>) -> Vec<u8> {
        CollectableVec::collect_range_json_bytes(self, from, to)
    }

    /// Collects values in the optional `from`/`to` range as strings.
    fn collect_range_string(&self, from: Option<usize>, to: Option<usize>) -> Vec<String> {
        CollectableVec::collect_range_string(self, from, to)
    }
}