1use std::{
2 collections::{BTreeMap, BTreeSet},
3 marker::PhantomData,
4 mem,
5 path::PathBuf,
6 sync::{
7 Arc,
8 atomic::{AtomicUsize, Ordering},
9 },
10};
11
12use allocative::Allocative;
13use log::info;
14use rawdb::{Database, Reader, Region};
15use zerocopy::{FromBytes, IntoBytes};
16
17use crate::{
18 AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BUFFER_SIZE, BoxedVecIterator,
19 CollectableVec, Error, GenericStoredVec, Result, StoredIndex, StoredRaw, Version,
20};
21
22use super::Format;
23
24mod header;
25mod iterators;
26mod options;
27
28pub use header::*;
29pub use iterators::*;
30pub use options::*;
31
/// Version constant that `RawVec::forced_import_with` adds to the caller-supplied
/// version before importing.
// NOTE(review): presumably bumped whenever the on-disk layout of `RawVec` changes — confirm.
const VERSION: Version = Version::ONE;
33
/// Vector of fixed-size values stored uncompressed in a database region, with
/// in-memory staging for pushes, in-place updates and holes until `flush` is called.
#[derive(Debug, Allocative)]
pub struct RawVec<I, T> {
    /// Region holding the header followed by the raw values; element `i` lives at
    /// byte offset `HEADER_OFFSET + i * SIZE_OF_T`.
    region: Region,

    /// Header created for an empty region or verified against an existing one at import.
    header: Header,
    /// Name the vec was imported under; leaked at import so it can be kept as `&'static str`.
    name: &'static str,
    /// Counterpart of `pushed` written out by `serialize_changes`.
    // NOTE(review): never filled in this file — confirm where it is populated.
    prev_pushed: Vec<T>,
    /// Values not yet stored in the region; `flush` appends them after the stored values.
    pushed: Vec<T>,
    /// Whether a holes region exists for this vec (found at import or written by `flush`).
    has_stored_holes: bool,
    /// Hole indexes; `flush` persists them to the holes region.
    // NOTE(review): presumably indexes of deleted slots — confirm.
    holes: BTreeSet<usize>,
    /// Copy of the holes loaded at import; serialized next to `holes` by `serialize_changes`.
    prev_holes: BTreeSet<usize>,
    /// Pending in-place overwrites (index -> new value) that `flush` writes into the region.
    updated: BTreeMap<usize, T>,
    /// Earlier values by index; `serialize_changes` prefers these over reading the region.
    prev_updated: BTreeMap<usize, T>,
    /// Stored length recorded at import; `serialize_changes` compares it with
    /// `stored_len` to work out how many values were truncated.
    prev_stored_len: usize,
    /// Current stored length, shared with clones of this vec through the `Arc`.
    stored_len: Arc<AtomicUsize>,
    /// Value taken from the `saved_stamped_changes` field of `ImportOptions`.
    // NOTE(review): presumably the number of stamped change sets to keep — confirm.
    saved_stamped_changes: u16,

    /// Ties the index type `I` to the vec without storing a value of it.
    phantom: PhantomData<I>,
}
54
55impl<I, T> RawVec<I, T>
56where
57 I: StoredIndex,
58 T: StoredRaw,
59{
60 pub fn forced_import(db: &Database, name: &str, version: Version) -> Result<Self> {
62 Self::forced_import_with((db, name, version).into())
63 }
64
65 pub fn forced_import_with(mut options: ImportOptions) -> Result<Self> {
67 options.version = options.version + VERSION;
68 let res = Self::import_with(options);
69 match res {
70 Err(Error::DifferentCompressionMode)
71 | Err(Error::WrongEndian)
72 | Err(Error::WrongLength)
73 | Err(Error::DifferentVersion { .. }) => {
74 info!("Resetting {}...", options.name);
75 let _ = options
76 .db
77 .remove_region_with_id(&Self::vec_region_name_(options.name));
78 let _ = options
79 .db
80 .remove_region_with_id(&Self::holes_region_name_(options.name));
81 Self::import_with(options)
82 }
83 _ => res,
84 }
85 }
86
87 pub fn import(db: &Database, name: &str, version: Version) -> Result<Self> {
88 Self::import_with((db, name, version).into())
89 }
90
91 pub fn import_with(options: ImportOptions) -> Result<Self> {
92 Self::import_(options, Format::Raw)
93 }
94
95 #[doc(hidden)]
96 pub fn import_(
97 ImportOptions {
98 db,
99 name,
100 version,
101 saved_stamped_changes,
102 }: ImportOptions,
103 format: Format,
104 ) -> Result<Self> {
105 let region = db.create_region_if_needed(&Self::vec_region_name_(name))?;
106
107 let region_len = region.meta().read().len() as usize;
108 if region_len > 0
109 && (region_len < HEADER_OFFSET as usize
110 || (format.is_raw()
111 && !(region_len - HEADER_OFFSET as usize).is_multiple_of(Self::SIZE_OF_T)))
112 {
113 dbg!(region_len, region_len, HEADER_OFFSET);
114 return Err(Error::Str("Region was saved incorrectly"));
115 }
116
117 let header = if region_len == 0 {
118 Header::create_and_write(®ion, version, format)?
119 } else {
120 Header::import_and_verify(®ion, version, format)?
121 };
122
123 let holes = if let Some(holes) = db.get_region(&Self::holes_region_name_(name)) {
124 Some(
125 holes
126 .create_reader()
127 .read_all()
128 .chunks(size_of::<usize>())
129 .map(|b| -> Result<usize> { usize::read_from_bytes(b).map_err(|e| e.into()) })
130 .collect::<Result<BTreeSet<usize>>>()?,
131 )
132 } else {
133 None
134 };
135
136 let mut this = Self {
137 region: region.clone(),
138 header,
139 name: Box::leak(Box::new(name.to_string())),
140 prev_pushed: vec![],
141 pushed: vec![],
142 has_stored_holes: holes.is_some(),
143 holes: holes.clone().unwrap_or_default(),
144 prev_holes: holes.unwrap_or_default(),
145 updated: BTreeMap::new(),
146 prev_updated: BTreeMap::new(),
147 phantom: PhantomData,
148 prev_stored_len: 0,
149 stored_len: Arc::new(AtomicUsize::new(0)),
150 saved_stamped_changes,
151 };
152
153 let len = this.real_stored_len();
154 *this.mut_prev_stored_len() = len;
155 this.update_stored_len(len);
156
157 Ok(this)
158 }
159
160 #[inline]
161 pub fn iter(&self) -> Result<RawVecIterator<'_, I, T>> {
162 RawVecIterator::new(self)
163 }
164
165 #[inline]
166 pub fn clean_iter(&self) -> Result<CleanRawVecIterator<'_, I, T>> {
167 CleanRawVecIterator::new(self)
168 }
169
170 #[inline]
171 pub fn dirty_iter(&self) -> Result<DirtyRawVecIterator<'_, I, T>> {
172 DirtyRawVecIterator::new(self)
173 }
174
175 pub fn write_header_if_needed(&mut self) -> Result<()> {
176 if self.header.modified() {
177 self.header.write(&self.region)?;
178 }
179 Ok(())
180 }
181
182 #[inline]
183 pub fn prev_holes(&self) -> &BTreeSet<usize> {
184 &self.prev_holes
185 }
186
187 #[inline]
188 pub fn is_dirty(&self) -> bool {
189 !self.is_pushed_empty() || !self.holes.is_empty() || !self.updated.is_empty()
190 }
191
192 #[inline]
194 const fn aligned_buffer_size() -> usize {
195 (BUFFER_SIZE / Self::SIZE_OF_T) * Self::SIZE_OF_T
196 }
197}
198
199impl<I, T> Clone for RawVec<I, T> {
200 fn clone(&self) -> Self {
201 Self {
202 region: self.region.clone(),
203 header: self.header.clone(),
204 name: self.name,
205 prev_pushed: vec![],
206 pushed: vec![],
207 updated: BTreeMap::new(),
208 prev_updated: BTreeMap::new(),
209 has_stored_holes: false,
210 holes: BTreeSet::new(),
211 prev_holes: BTreeSet::new(),
212 prev_stored_len: 0,
213 stored_len: self.stored_len.clone(),
214 saved_stamped_changes: self.saved_stamped_changes,
215 phantom: PhantomData,
216 }
217 }
218}
219
220impl<I, T> AnyVec for RawVec<I, T>
221where
222 I: StoredIndex,
223 T: StoredRaw,
224{
225 #[inline]
226 fn version(&self) -> Version {
227 self.header.vec_version()
228 }
229
230 #[inline]
231 fn name(&self) -> &str {
232 self.name
233 }
234
235 #[inline]
236 fn len(&self) -> usize {
237 self.len_()
238 }
239
240 #[inline]
241 fn index_type_to_string(&self) -> &'static str {
242 I::to_string()
243 }
244
245 #[inline]
246 fn value_type_to_size_of(&self) -> usize {
247 size_of::<T>()
248 }
249
250 #[inline]
251 fn region_names(&self) -> Vec<String> {
252 vec![self.index_to_name()]
253 }
254}
255
256impl<I, T> AnyStoredVec for RawVec<I, T>
257where
258 I: StoredIndex,
259 T: StoredRaw,
260{
261 #[inline]
262 fn db_path(&self) -> PathBuf {
263 self.region.db().path().to_path_buf()
264 }
265
266 #[inline]
267 fn header(&self) -> &Header {
268 &self.header
269 }
270
271 #[inline]
272 fn mut_header(&mut self) -> &mut Header {
273 &mut self.header
274 }
275
276 #[inline]
277 fn saved_stamped_changes(&self) -> u16 {
278 self.saved_stamped_changes
279 }
280
281 #[inline]
282 fn real_stored_len(&self) -> usize {
283 (self.region.meta().read().len() as usize - HEADER_OFFSET as usize) / Self::SIZE_OF_T
284 }
285
286 #[inline]
287 fn stored_len(&self) -> usize {
288 self.stored_len.load(Ordering::Acquire)
289 }
290
291 fn flush(&mut self) -> Result<()> {
292 self.write_header_if_needed()?;
293
294 let stored_len = self.stored_len();
295 let pushed_len = self.pushed_len();
296 let real_stored_len = self.real_stored_len();
297 let truncated = stored_len < real_stored_len;
299 let expanded = stored_len > real_stored_len;
300 let has_new_data = pushed_len != 0;
301 let has_updated_data = !self.updated.is_empty();
302 let has_holes = !self.holes.is_empty();
303 let had_holes = self.has_stored_holes;
304
305 if !truncated && !expanded && !has_new_data && !has_updated_data && !has_holes && !had_holes
306 {
307 return Ok(());
308 }
309
310 let from = (stored_len * Self::SIZE_OF_T + HEADER_OFFSET as usize) as u64;
311
312 if has_new_data {
313 self.region
314 .truncate_write_all(from, mem::take(&mut self.pushed).as_bytes())?;
315 self.update_stored_len(stored_len + pushed_len);
316 } else if truncated {
317 self.region.truncate(from)?;
318 }
319
320 if has_updated_data {
321 let updated = mem::take(&mut self.updated);
322 updated.into_iter().try_for_each(|(i, v)| -> Result<()> {
323 let bytes = v.as_bytes();
324 let at = (i * Self::SIZE_OF_T) as u64 + HEADER_OFFSET;
325 self.region.write_all_at(bytes, at)?;
326 Ok(())
327 })?;
328 }
329
330 if has_holes {
331 self.has_stored_holes = true;
332 let holes = self
333 .region
334 .db()
335 .create_region_if_needed(&self.holes_region_name())?;
336 let bytes = self
337 .holes
338 .iter()
339 .flat_map(|i| i.to_ne_bytes())
340 .collect::<Vec<_>>();
341 holes.truncate_write_all(0, &bytes)?;
342 } else if had_holes {
343 self.has_stored_holes = false;
344 let _ = self
345 .region
346 .db()
347 .remove_region_with_id(&self.holes_region_name());
348 }
349
350 Ok(())
351 }
352
353 fn region(&self) -> &Region {
354 &self.region
355 }
356
357 fn serialize_changes(&self) -> Result<Vec<u8>> {
358 let mut bytes = vec![];
359 let reader = self.create_reader();
360
361 bytes.extend(self.stamp().as_bytes());
362
363 let prev_stored_len = self.prev_stored_len();
365 let stored_len = self.stored_len();
366
367 bytes.extend(prev_stored_len.as_bytes());
368 bytes.extend(stored_len.as_bytes());
369
370 let truncated = prev_stored_len.checked_sub(stored_len).unwrap_or_default();
371 bytes.extend(truncated.as_bytes());
372 if truncated > 0 {
373 let truncated_vals = (stored_len..prev_stored_len)
374 .map(|i| {
375 self.prev_updated
377 .get(&i)
378 .cloned()
379 .unwrap_or_else(|| self.unwrap_read_(i, &reader))
380 })
381 .collect::<Vec<_>>();
382 bytes.extend(truncated_vals.as_bytes());
383 }
384
385 bytes.extend(self.prev_pushed.len().as_bytes());
386 bytes.extend(self.prev_pushed.iter().flat_map(|v| v.as_bytes()));
387
388 bytes.extend(self.pushed.len().as_bytes());
389 bytes.extend(self.pushed.iter().flat_map(|v| v.as_bytes()));
390
391 let (prev_modified_indexes, prev_modified_values) = self
392 .prev_updated
393 .iter()
394 .map(|(&i, v)| (i, v.clone()))
395 .collect::<(Vec<_>, Vec<_>)>();
396 bytes.extend(prev_modified_indexes.len().as_bytes());
397 bytes.extend(prev_modified_indexes.as_bytes());
398 bytes.extend(prev_modified_values.as_bytes());
399
400 let (modified_indexes, modified_values) = self
401 .updated
402 .keys()
403 .map(|&i| {
404 let val = self
406 .prev_updated
407 .get(&i)
408 .cloned()
409 .unwrap_or_else(|| self.unwrap_read_(i, &reader));
410 (i, val)
411 })
412 .collect::<(Vec<_>, Vec<_>)>();
413 bytes.extend(modified_indexes.len().as_bytes());
414 bytes.extend(modified_indexes.as_bytes());
415 bytes.extend(modified_values.as_bytes());
416
417 let prev_holes = self.prev_holes.iter().copied().collect::<Vec<_>>();
418 bytes.extend(prev_holes.len().as_bytes());
419 bytes.extend(prev_holes.as_bytes());
420
421 let holes = self.holes.iter().copied().collect::<Vec<_>>();
422 bytes.extend(holes.len().as_bytes());
423 bytes.extend(holes.as_bytes());
424
425 Ok(bytes)
426 }
427}
428
429impl<I, T> GenericStoredVec<I, T> for RawVec<I, T>
430where
431 I: StoredIndex,
432 T: StoredRaw,
433{
434 #[inline(always)]
435 fn read_(&self, index: usize, reader: &Reader) -> Result<T> {
436 T::read_from_prefix(reader.prefixed((index * Self::SIZE_OF_T) as u64 + HEADER_OFFSET))
437 .map(|(v, _)| v)
438 .map_err(Error::from)
439 }
440
441 #[inline]
442 fn pushed(&self) -> &[T] {
443 self.pushed.as_slice()
444 }
445 #[inline]
446 fn mut_pushed(&mut self) -> &mut Vec<T> {
447 &mut self.pushed
448 }
449 #[inline]
450 fn prev_pushed(&self) -> &[T] {
451 self.prev_pushed.as_slice()
452 }
453 #[inline]
454 fn mut_prev_pushed(&mut self) -> &mut Vec<T> {
455 &mut self.prev_pushed
456 }
457
458 #[inline(always)]
459 fn holes(&self) -> &BTreeSet<usize> {
460 &self.holes
461 }
462 #[inline]
463 fn mut_holes(&mut self) -> &mut BTreeSet<usize> {
464 &mut self.holes
465 }
466 #[inline]
467 fn prev_holes(&self) -> &BTreeSet<usize> {
468 &self.prev_holes
469 }
470 #[inline]
471 fn mut_prev_holes(&mut self) -> &mut BTreeSet<usize> {
472 &mut self.prev_holes
473 }
474
475 fn prev_stored_len(&self) -> usize {
476 self.prev_stored_len
477 }
478 fn mut_prev_stored_len(&mut self) -> &mut usize {
479 &mut self.prev_stored_len
480 }
481 fn update_stored_len(&self, val: usize) {
482 self.stored_len.store(val, Ordering::Release);
483 }
484
485 #[inline(always)]
486 fn updated(&self) -> &BTreeMap<usize, T> {
487 &self.updated
488 }
489 #[inline]
490 fn mut_updated(&mut self) -> &mut BTreeMap<usize, T> {
491 &mut self.updated
492 }
493 #[inline]
494 fn prev_updated(&self) -> &BTreeMap<usize, T> {
495 &self.prev_updated
496 }
497 #[inline]
498 fn mut_prev_updated(&mut self) -> &mut BTreeMap<usize, T> {
499 &mut self.prev_updated
500 }
501
502 fn reset(&mut self) -> Result<()> {
503 self.reset_()
504 }
505}
506
507impl<'a, I, T> IntoIterator for &'a RawVec<I, T>
508where
509 I: StoredIndex,
510 T: StoredRaw,
511{
512 type Item = T;
513 type IntoIter = RawVecIterator<'a, I, T>;
514
515 fn into_iter(self) -> Self::IntoIter {
516 self.iter().expect("RawVecIter::new(self) to work")
517 }
518}
519
520impl<I, T> AnyIterableVec<I, T> for RawVec<I, T>
521where
522 I: StoredIndex,
523 T: StoredRaw,
524{
525 fn boxed_iter(&self) -> BoxedVecIterator<'_, I, T> {
526 Box::new(self.into_iter())
527 }
528}
529
530impl<I, T> AnyCollectableVec for RawVec<I, T>
531where
532 I: StoredIndex,
533 T: StoredRaw,
534{
535 fn collect_range_json_bytes(&self, from: Option<usize>, to: Option<usize>) -> Vec<u8> {
536 CollectableVec::collect_range_json_bytes(self, from, to)
537 }
538
539 fn collect_range_string(&self, from: Option<usize>, to: Option<usize>) -> Vec<String> {
540 CollectableVec::collect_range_string(self, from, to)
541 }
542}