use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    fs,
    path::PathBuf,
};

use log::info;
use rawdb::Reader;
use zerocopy::FromBytes;

use crate::{AnyStoredVec, Error, Exit, Result, SEPARATOR, Stamp, Version};

use super::{StoredIndex, StoredRaw};

const ONE_KIB: usize = 1024;
const ONE_MIB: usize = ONE_KIB * ONE_KIB;
const MAX_CACHE_SIZE: usize = 256 * ONE_MIB;

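/// A stored vector with an in-memory overlay: values read from the backing
/// region can be shadowed by `pushed` (appended values not yet flushed),
/// `updated` (modified stored values), and `holes` (deleted indexes), with
/// `prev_*` snapshots kept around to support stamped change files and rollback.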
pub trait GenericStoredVec<I, T>: Send + Sync
where
    Self: AnyStoredVec,
    I: StoredIndex,
    T: StoredRaw,
{
    const SIZE_OF_T: usize = size_of::<T>();

    fn create_reader(&'_ self) -> Reader<'_> {
        self.create_static_reader()
    }

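    /// Builds a reader and unsafely transmutes its lifetime to `'static`;
    /// the caller must ensure the reader does not outlive the underlying region.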
    fn create_static_reader(&self) -> Reader<'static> {
        unsafe { std::mem::transmute(self.region().create_reader()) }
    }

    #[inline]
    fn read_unwrap(&self, index: I, reader: &Reader) -> T {
        self.read_with(index, reader).unwrap()
    }

    #[inline]
    fn read_unwrap_at(&self, index: usize, reader: &Reader) -> T {
        self.read_at(index, reader).unwrap()
    }

    #[inline]
    fn read_with(&self, index: I, reader: &Reader) -> Result<T> {
        self.read_at(index.to_usize(), reader)
    }

    #[inline]
    fn read(&self, index: I) -> Result<T> {
        self.read_with(index, &self.create_reader())
    }

    fn read_at(&self, index: usize, reader: &Reader) -> Result<T>;

    #[inline]
    fn get_or_read_unwrap(&self, index: I) -> T {
        self.get_or_read(index).unwrap().unwrap()
    }

    #[inline]
    fn get_or_read(&self, index: I) -> Result<Option<T>> {
        self.get_or_read_with(index, &self.create_reader())
    }

    #[inline]
    fn get_or_read_with(&self, index: I, reader: &Reader) -> Result<Option<T>> {
        self.get_or_read_at(index.to_usize(), reader)
    }

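    /// Looks up `index` through the overlay: returns `None` for a hole,
    /// the pending `pushed` value past the stored length, the `updated`
    /// value if one exists, and otherwise reads from disk via `reader`.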
    #[inline]
    fn get_or_read_at(&self, index: usize, reader: &Reader) -> Result<Option<T>> {
        let holes = self.holes();
        if !holes.is_empty() && holes.contains(&index) {
            return Ok(None);
        }

        let stored_len = self.stored_len();

        if index >= stored_len {
            return Ok(self.get_pushed_at(index, stored_len).cloned());
        }

        let updated = self.updated();
        if !updated.is_empty()
            && let Some(updated_value) = updated.get(&index)
        {
            return Ok(Some(updated_value.clone()));
        }

        Ok(Some(self.read_at(index, reader)?))
    }

    #[inline]
    fn get_pushed_or_read(&self, index: I) -> Result<Option<T>> {
        self.get_pushed_or_read_with(index, &self.create_reader())
    }

    #[inline]
    fn get_pushed_or_read_with(&self, index: I, reader: &Reader) -> Result<Option<T>> {
        self.get_pushed_or_read_at(index.to_usize(), reader)
    }

    #[inline]
    fn get_pushed_or_read_at(&self, index: usize, reader: &Reader) -> Result<Option<T>> {
        let stored_len = self.stored_len();

        if index >= stored_len {
            return Ok(self.get_pushed_at(index, stored_len).cloned());
        }

        Ok(Some(self.read_at(index, reader)?))
    }

    #[inline(always)]
    fn get_pushed_at(&self, index: usize, stored_len: usize) -> Option<&T> {
        let pushed = self.pushed();
        let offset = index.checked_sub(stored_len)?;
        pushed.get(offset)
    }

    #[inline]
    fn len_(&self) -> usize {
        self.stored_len() + self.pushed_len()
    }

    fn prev_pushed(&self) -> &[T];
    fn mut_prev_pushed(&mut self) -> &mut Vec<T>;
    fn pushed(&self) -> &[T];
    fn mut_pushed(&mut self) -> &mut Vec<T>;
    #[inline]
    fn pushed_len(&self) -> usize {
        self.pushed().len()
    }
    #[inline]
    fn push(&mut self, value: T) {
        self.mut_pushed().push(value)
    }

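    /// Pushes `value` only when `index` is exactly the next index; indexes
    /// below the current length are ignored, higher ones are an error.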
    #[inline]
    fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
        let index_usize = index.to_usize();
        let len = self.len();

        if index_usize == len {
            self.push(value);
            return Ok(());
        }

        if index_usize < len {
            return Ok(());
        }

        debug_assert!(
            false,
            "Index too high: idx={}, len={}, header={:?}, region={}",
            index_usize,
            len,
            self.header(),
            self.region().index()
        );

        Err(Error::IndexTooHigh)
    }

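    /// Pushes `value` at `index`, truncating first if `index` is below the
    /// current length; flushes once the pushed buffer reaches `MAX_CACHE_SIZE`.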
    #[inline]
    fn forced_push_at(&mut self, index: I, value: T, exit: &Exit) -> Result<()> {
        self.forced_push_at_(index.to_usize(), value, exit)
    }

    #[inline]
    fn forced_push_at_(&mut self, index: usize, value: T, exit: &Exit) -> Result<()> {
        match self.len().cmp(&index) {
            Ordering::Less => {
                return Err(Error::IndexTooHigh);
            }
            ord => {
                if ord == Ordering::Greater {
                    self.truncate_if_needed_(index)?;
                }
                self.push(value);
            }
        }

        let pushed_bytes = self.pushed_len() * Self::SIZE_OF_T;
        if pushed_bytes >= MAX_CACHE_SIZE {
            self.safe_flush(exit)?;
        }

        Ok(())
    }

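    /// Updates the value at `index` if it already exists, pushes it if it is
    /// the next index, and fails with `IndexTooHigh` otherwise.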
    #[inline]
    fn update_or_push(&mut self, index: I, value: T) -> Result<()> {
        let len = self.len();
        match len.cmp(&index.to_usize()) {
            Ordering::Less => {
                dbg!(index, value, len, self.header());
                Err(Error::IndexTooHigh)
            }
            Ordering::Equal => {
                self.push(value);
                Ok(())
            }
            Ordering::Greater => self.update(index, value),
        }
    }

    #[inline]
    fn get_first_empty_index(&self) -> I {
        self.holes()
            .first()
            .cloned()
            .unwrap_or_else(|| self.len_())
            .into()
    }

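    /// Writes `value` into the first available hole if there is one,
    /// otherwise appends it; returns the index that was filled.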
    #[inline]
    fn fill_first_hole_or_push(&mut self, value: T) -> Result<I> {
        Ok(
            if let Some(hole) = self.mut_holes().pop_first().map(I::from) {
                self.update(hole, value)?;
                hole
            } else {
                self.push(value);
                I::from(self.len() - 1)
            },
        )
    }

    fn holes(&self) -> &BTreeSet<usize>;
    fn mut_holes(&mut self) -> &mut BTreeSet<usize>;

    fn prev_holes(&self) -> &BTreeSet<usize>;
    fn mut_prev_holes(&mut self) -> &mut BTreeSet<usize>;

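    /// Reads the value at `index` and, if present, marks the index as a hole.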
    fn take(&mut self, index: I, reader: &Reader) -> Result<Option<T>> {
        let opt = self.get_or_read_with(index, reader)?;
        if opt.is_some() {
            self.unchecked_delete(index);
        }
        Ok(opt)
    }

    #[inline]
    fn delete(&mut self, index: I) {
        if index.to_usize() < self.len() {
            self.unchecked_delete(index);
        }
    }
    #[inline]
    #[doc(hidden)]
    fn unchecked_delete(&mut self, index: I) {
        let uindex = index.to_usize();
        let updated = self.mut_updated();
        if !updated.is_empty() {
            updated.remove(&uindex);
        }
        self.mut_holes().insert(uindex);
    }

    fn updated(&self) -> &BTreeMap<usize, T>;
    fn mut_updated(&mut self) -> &mut BTreeMap<usize, T>;

    fn prev_updated(&self) -> &BTreeMap<usize, T>;
    fn mut_prev_updated(&mut self) -> &mut BTreeMap<usize, T>;

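    /// Overwrites the value at `index`: pending pushed values are replaced in
    /// place, stored values are recorded in `updated` and removed from `holes`.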
    #[inline]
    fn update(&mut self, index: I, value: T) -> Result<()> {
        self.update_(index.to_usize(), value)
    }

    #[inline]
    fn update_(&mut self, index: usize, value: T) -> Result<()> {
        let stored_len = self.stored_len();

        if index >= stored_len {
            if let Some(prev) = self.mut_pushed().get_mut(index - stored_len) {
                *prev = value;
                return Ok(());
            } else {
                return Err(Error::IndexTooHigh);
            }
        }

        let holes = self.mut_holes();
        if !holes.is_empty() {
            holes.remove(&index);
        }

        self.mut_updated().insert(index, value);

        Ok(())
    }

    fn reset(&mut self) -> Result<()>;

    #[inline]
    fn reset_(&mut self) -> Result<()> {
        self.truncate_if_needed_(0)
    }

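    /// Resets the vector when the computed version on disk differs from
    /// `version`, then logs when a (re)computation is about to start.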
    fn validate_computed_version_or_reset(&mut self, version: Version) -> Result<()> {
        if version != self.header().computed_version() {
            self.mut_header().update_computed_version(version);
            if !self.is_empty() {
                self.reset()?;
            }
        }

        if self.is_empty() {
            info!(
                "Computing {}_to_{}...",
                self.index_type_to_string(),
                self.name()
            )
        }

        Ok(())
    }

    #[inline]
    fn is_pushed_empty(&self) -> bool {
        self.pushed_len() == 0
    }

    #[inline]
    fn has(&self, index: I) -> bool {
        self.has_(index.to_usize())
    }
    #[inline]
    fn has_(&self, index: usize) -> bool {
        index < self.len_()
    }

    fn prev_stored_len(&self) -> usize;
    fn mut_prev_stored_len(&mut self) -> &mut usize;
    #[doc(hidden)]
    fn update_stored_len(&self, val: usize);

    fn truncate_if_needed(&mut self, index: I) -> Result<()> {
        self.truncate_if_needed_(index.to_usize())
    }
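    /// Drops everything at and above `index`: clears out-of-range holes,
    /// updates, and pushed values, and lowers the stored length if needed.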
    fn truncate_if_needed_(&mut self, index: usize) -> Result<()> {
        let stored_len = self.stored_len();
        let pushed_len = self.pushed_len();
        let len = stored_len + pushed_len;

        if index >= len {
            return Ok(());
        }

        if self.holes().last().is_some_and(|&h| h >= index) {
            self.mut_holes().retain(|&i| i < index);
        }

        if self
            .updated()
            .last_key_value()
            .is_some_and(|(&k, _)| k >= index)
        {
            self.mut_updated().retain(|&i, _| i < index);
        }

        if index <= stored_len {
            self.mut_pushed().clear();
        } else {
            self.mut_pushed().truncate(index - stored_len);
        }

        if index >= stored_len {
            return Ok(());
        }

        self.update_stored_len(index);

        Ok(())
    }

    #[inline]
    fn truncate_if_needed_with_stamp(&mut self, index: I, stamp: Stamp) -> Result<()> {
        self.update_stamp(stamp);
        self.truncate_if_needed(index)
    }

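    /// Parses a serialized change file (previous stamp, lengths, truncated,
    /// pushed, updated, and hole sets) and rewinds the in-memory state to the
    /// snapshot taken before that flush.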
    fn deserialize_then_undo_changes(&mut self, bytes: &[u8]) -> Result<()> {
        let mut pos = 0;
        let mut len = 8;

        let prev_stamp = u64::read_from_bytes(&bytes[pos..pos + len])?;
        self.mut_header().update_stamp(Stamp::new(prev_stamp));
        pos += len;

        let prev_stored_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;

        let _stored_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;

        let current_stored_len = self.stored_len();

        if prev_stored_len < current_stored_len {
            self.truncate_if_needed_(prev_stored_len)?;
        } else if prev_stored_len > current_stored_len {
            self.update_stored_len(prev_stored_len);
        }

        let truncated_count = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;

        self.mut_pushed().clear();

        if truncated_count > 0 {
            len = Self::SIZE_OF_T * truncated_count;
            let truncated_values = bytes[pos..pos + len]
                .chunks(Self::SIZE_OF_T)
                .map(|b| T::read_from_bytes(b).map_err(|_| Error::ZeroCopyError))
                .collect::<Result<Vec<_>>>()?;
            pos += len;

            let start_index = prev_stored_len - truncated_count;
            for (i, val) in truncated_values.into_iter().enumerate() {
                self.mut_updated().insert(start_index + i, val);
            }
        }

        len = 8;
        let prev_pushed_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;
        len = Self::SIZE_OF_T * prev_pushed_len;
        let mut prev_pushed = bytes[pos..pos + len]
            .chunks(Self::SIZE_OF_T)
            .map(|s| T::read_from_bytes(s).map_err(|_| Error::ZeroCopyError))
            .collect::<Result<Vec<_>>>()?;
        pos += len;
        self.mut_pushed().append(&mut prev_pushed);

        len = 8;
        let pushed_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;
        len = Self::SIZE_OF_T * pushed_len;
        let _pushed = bytes[pos..pos + len]
            .chunks(Self::SIZE_OF_T)
            .map(|s| T::read_from_bytes(s).map_err(|_| Error::ZeroCopyError))
            .collect::<Result<Vec<_>>>()?;
        pos += len;

        len = 8;
        let prev_modified_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;
        len = size_of::<usize>() * prev_modified_len;
        let prev_indexes = bytes[pos..pos + len].chunks(8);
        pos += len;
        len = Self::SIZE_OF_T * prev_modified_len;
        let prev_values = bytes[pos..pos + len].chunks(Self::SIZE_OF_T);
        let _prev_updated: BTreeMap<usize, T> = prev_indexes
            .zip(prev_values)
            .map(|(i, v)| {
                let idx = usize::read_from_bytes(i).map_err(|_| Error::ZeroCopyError)?;
                let val = T::read_from_bytes(v).map_err(|_| Error::ZeroCopyError)?;
                Ok((idx, val))
            })
            .collect::<Result<_>>()?;
        pos += len;

        len = 8;
        let modified_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;
        len = size_of::<usize>() * modified_len;
        let indexes = bytes[pos..pos + len].chunks(8);
        pos += len;
        len = Self::SIZE_OF_T * modified_len;
        let values = bytes[pos..pos + len].chunks(Self::SIZE_OF_T);
        let old_values_to_restore: BTreeMap<usize, T> = indexes
            .zip(values)
            .map(|(i, v)| {
                let idx = usize::read_from_bytes(i).map_err(|_| Error::ZeroCopyError)?;
                let val = T::read_from_bytes(v).map_err(|_| Error::ZeroCopyError)?;
                Ok((idx, val))
            })
            .collect::<Result<_>>()?;
        pos += len;

        len = 8;
        let prev_holes_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;
        len = size_of::<usize>() * prev_holes_len;
        let prev_holes = bytes[pos..pos + len]
            .chunks(8)
            .map(|b| usize::read_from_bytes(b).map_err(|_| Error::ZeroCopyError))
            .collect::<Result<BTreeSet<_>>>()?;
        pos += len;

        len = 8;
        let holes_len = usize::read_from_bytes(&bytes[pos..pos + len])?;
        pos += len;
        len = size_of::<usize>() * holes_len;
        let _holes = bytes[pos..pos + len]
            .chunks(8)
            .map(|b| usize::read_from_bytes(b).map_err(|_| Error::ZeroCopyError))
            .collect::<Result<BTreeSet<_>>>()?;

        if !self.holes().is_empty() || !self.prev_holes().is_empty() || !prev_holes.is_empty() {
            *self.mut_holes() = prev_holes.clone();
            *self.mut_prev_holes() = prev_holes;
        }

        old_values_to_restore
            .into_iter()
            .try_for_each(|(i, v)| self.update_(i, v))?;

        *self.mut_prev_updated() = self.updated().clone();
        *self.mut_prev_pushed() = self.pushed().to_vec();
        Ok(())
    }

    #[inline]
    fn stamped_flush_maybe_with_changes(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> {
        if with_changes {
            self.stamped_flush_with_changes(stamp)
        } else {
            self.stamped_flush(stamp)
        }
    }

    fn changes_path(&self) -> PathBuf {
        self.db_path().join(self.index_to_name()).join("changes")
    }

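    /// Flushes like `stamped_flush` but, when stamped changes are enabled,
    /// first writes a change file named after the stamp (pruning older ones)
    /// so the flush can later be rolled back.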
    #[inline]
    fn stamped_flush_with_changes(&mut self, stamp: Stamp) -> Result<()> {
        let saved_stamped_changes = self.saved_stamped_changes();

        if saved_stamped_changes == 0 {
            return self.stamped_flush(stamp);
        }

        let path = self.changes_path();

        fs::create_dir_all(&path)?;

        let files: BTreeMap<Stamp, PathBuf> = fs::read_dir(&path)?
            .filter_map(|entry| {
                let path = entry.ok()?.path();
                let name = path.file_name()?.to_str()?;
                if let Ok(s) = name.parse::<u64>().map(Stamp::from) {
                    if s < stamp {
                        Some((s, path))
                    } else {
                        let _ = fs::remove_file(path);
                        None
                    }
                } else {
                    None
                }
            })
            .collect();

        for (_, path) in files.iter().take(
            files
                .len()
                .saturating_sub((saved_stamped_changes - 1) as usize),
        ) {
            fs::remove_file(path)?;
        }

        let holes_before_flush = self.holes().clone();

        fs::write(
            path.join(u64::from(stamp).to_string()),
            self.serialize_changes()?,
        )?;

        self.stamped_flush(stamp)?;

        *self.mut_prev_stored_len() = self.stored_len();
        *self.mut_prev_pushed() = vec![];
        *self.mut_prev_updated() = BTreeMap::new();
        *self.mut_prev_holes() = holes_before_flush;

        Ok(())
    }

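    /// Rolls back one change file at a time until the vector's stamp is below
    /// `stamp`, then refreshes the `prev_*` snapshots; returns the final stamp.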
    fn rollback_before(&mut self, stamp: Stamp) -> Result<Stamp> {
        if self.stamp() < stamp {
            return Ok(self.stamp());
        }

        let dir = fs::read_dir(self.changes_path())?
            .filter_map(|entry| {
                let path = entry.ok()?.path();
                let name = path.file_name()?.to_str()?;
                if let Ok(stamp) = name.parse::<u64>().map(Stamp::from) {
                    Some((stamp, path))
                } else {
                    None
                }
            })
            .collect::<BTreeMap<Stamp, PathBuf>>();

        let mut iter = dir.range(..=self.stamp());

        while let Some((&s, _)) = iter.next_back()
            && self.stamp() >= stamp
        {
            if s != self.stamp() {
                dbg!((s, self.stamp(), stamp));
                return Err(Error::Str("File stamp should be the same as vec stamp"));
            }
            self.rollback()?;
        }

        *self.mut_prev_stored_len() = self.stored_len();
        *self.mut_prev_pushed() = self.pushed().to_vec();
        *self.mut_prev_updated() = self.updated().clone();
        *self.mut_prev_holes() = self.holes().clone();

        Ok(self.stamp())
    }

    fn is_dirty(&mut self) -> bool {
        !self.is_pushed_empty() || !self.updated().is_empty()
    }

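    /// Reads the change file matching the current stamp and undoes it.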
    fn rollback(&mut self) -> Result<()> {
        let path = self
            .changes_path()
            .join(u64::from(self.stamp()).to_string());
        let bytes = fs::read(&path)?;
        self.deserialize_then_undo_changes(&bytes)
    }

    fn reset_unsaved(&mut self) {
        self.mut_pushed().clear();
        if !self.holes().is_empty() {
            self.mut_holes().clear();
        }
        if !self.updated().is_empty() {
            self.mut_updated().clear();
        }
    }

    fn collect_holed(&self) -> Result<Vec<Option<T>>> {
        self.collect_holed_range(None, None)
    }

    fn collect_holed_range(
        &self,
        from: Option<usize>,
        to: Option<usize>,
    ) -> Result<Vec<Option<T>>> {
        let len = self.len();
        let from = from.unwrap_or_default();
        let to = to.map_or(len, |to| to.min(len));

        if from >= len || from >= to {
            return Ok(vec![]);
        }

        let reader = self.create_reader();

        (from..to)
            .map(|i| self.get_or_read_at(i, &reader))
            .collect::<Result<Vec<_>>>()
    }

    fn vec_region_name(&self) -> String {
        Self::vec_region_name_(self.name())
    }

    fn vec_region_name_(name: &str) -> String {
        format!("{}{SEPARATOR}{}", I::to_string(), name)
    }

    fn holes_region_name(&self) -> String {
        Self::holes_region_name_(self.name())
    }

    fn holes_region_name_(name: &str) -> String {
        format!("{}_holes", Self::vec_region_name_(name))
    }
}