1use crate::{
48 column::ColId,
49 display::hex,
50 error::Result,
51 log::{LogOverlays, LogQuery, LogReader, LogWriter},
52 options::ColumnOptions as Options,
53 parking_lot::RwLock,
54 table::key::{TableKey, TableKeyQuery, PARTIAL_SIZE},
55};
56use std::{
57 convert::TryInto,
58 mem::MaybeUninit,
59 sync::{
60 atomic::{AtomicBool, AtomicU64, Ordering},
61 Arc,
62 },
63};
64
/// Number of size tiers per column (one value-table file per tier).
pub const SIZE_TIERS: usize = 1usize << SIZE_TIERS_BITS;
/// Bits used to encode the size tier inside an address.
pub const SIZE_TIERS_BITS: u8 = 8;
/// High bit of the on-disk size prefix: set when the value is compressed.
pub const COMPRESSED_MASK: u16 = 0x80_00;
/// Largest allowed entry slot size.
pub const MAX_ENTRY_SIZE: usize = 0x7ff8;
/// Smallest allowed entry slot size.
pub const MIN_ENTRY_SIZE: usize = 32;
/// Bytes used by the reference counter stored in ref-counted entries.
const REFS_SIZE: usize = 4;
/// Bytes used by the entry size/tag prefix.
const SIZE_SIZE: usize = 2;
/// Bytes used by a slot index (next-part / free-list link).
const INDEX_SIZE: usize = 8;
/// Scratch buffer size able to hold any full entry.
const MAX_ENTRY_BUF_SIZE: usize = 0x8000;

// Two-byte tags that start an entry. Tombstones mark freed slots; the
// MULTI* tags mark parts of a value chain. The *_V4 variants are the legacy
// (db_version <= 4) byte order.
const TOMBSTONE: &[u8] = &[0xff, 0xff];
const MULTIPART_V4: &[u8] = &[0xff, 0xfe];
const MULTIHEAD_V4: &[u8] = &[0xff, 0xfd];
const MULTIPART: &[u8] = &[0xfe, 0xff];
const MULTIHEAD: &[u8] = &[0xfd, 0xff];
const MULTIHEAD_COMPRESSED: &[u8] = &[0xfd, 0x7f];
/// Sentinel refcount: the entry is pinned and never decremented.
const LOCKED_REF: u32 = u32::MAX;

/// Slot size used by the variable-size (multipart) table.
const MULTIPART_ENTRY_SIZE: u16 = 4096;

/// Owned value payload.
pub type Value = Vec<u8>;
87
/// Identifies a value table: the column id in the high byte and the size
/// tier in the low byte of a single `u16`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TableId(u16);
90
91impl TableId {
92 pub fn new(col: ColId, size_tier: u8) -> TableId {
93 TableId(((col as u16) << 8) | size_tier as u16)
94 }
95
96 pub fn from_u16(id: u16) -> TableId {
97 TableId(id)
98 }
99
100 pub fn col(&self) -> ColId {
101 (self.0 >> 8) as ColId
102 }
103
104 pub fn size_tier(&self) -> u8 {
105 (self.0 & 0xff) as u8
106 }
107
108 pub fn file_name(&self) -> String {
109 format!("table_{:02}_{}", self.col(), hex(&[self.size_tier()]))
110 }
111
112 pub fn is_file_name(col: ColId, name: &str) -> bool {
113 name.starts_with(&format!("table_{col:02}_"))
114 }
115
116 pub fn as_u16(&self) -> u16 {
117 self.0
118 }
119
120 pub fn log_index(&self) -> usize {
121 self.col() as usize * SIZE_TIERS + self.size_tier() as usize
122 }
123
124 pub const fn max_log_tables(num_columns: usize) -> usize {
125 SIZE_TIERS * num_columns
126 }
127
128 pub fn from_log_index(i: usize) -> Self {
129 let col = i / SIZE_TIERS;
130 let tier = i % SIZE_TIERS;
131 Self::new(col as ColId, tier as u8)
132 }
133}
134
135impl std::fmt::Display for TableId {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 write!(f, "t{:02}-{:02}", self.col(), hex(&[self.size_tier()]))
138 }
139}
140
/// In-memory mirror of the on-disk free-slot list (kept only when
/// `needs_free_entries` is set). The last element of `stack` is the current
/// list head, i.e. equals `last_removed` and is the next slot to be reused.
#[derive(Debug)]
struct FreeEntries {
    stack: Vec<u64>,
}
145
/// A value table: a file of fixed-size entry slots. Slot 0 holds the
/// `Header`; freed slots form a linked free list starting at `last_removed`.
#[derive(Debug)]
pub struct ValueTable {
    /// Table identifier (column + size tier).
    pub id: TableId,
    /// Size in bytes of one slot.
    pub entry_size: u16,
    file: crate::file::TableFile,
    /// Number of allocated slots, header slot included.
    filled: AtomicU64,
    /// `filled` as last enacted to the file header.
    written: AtomicU64,
    /// Head of the free-slot list; 0 when the list is empty.
    last_removed: AtomicU64,
    /// Set when `filled`/`last_removed` changed and the header must be
    /// re-logged by `complete_plan`.
    dirty_header: AtomicBool,
    needs_free_entries: bool,
    /// In-memory free list, populated by `init_table_data` when needed.
    free_entries: Option<RwLock<FreeEntries>>,
    /// True for the variable-size tier that chains oversized values.
    multipart: bool,
    ref_counted: bool,
    db_version: u32,
}
161
/// On-disk table header stored in slot 0: `last_removed` (LE u64 at bytes
/// 0..8) followed by `filled` (LE u64 at bytes 8..16).
#[derive(Default, Clone, Copy)]
struct Header([u8; 16]);
164
165impl Header {
166 fn last_removed(&self) -> u64 {
167 u64::from_le_bytes(self.0[0..INDEX_SIZE].try_into().unwrap())
168 }
169 fn set_last_removed(&mut self, last_removed: u64) {
170 self.0[0..INDEX_SIZE].copy_from_slice(&last_removed.to_le_bytes());
171 }
172 fn filled(&self) -> u64 {
173 u64::from_le_bytes(self.0[INDEX_SIZE..INDEX_SIZE * 2].try_into().unwrap())
174 }
175 fn set_filled(&mut self, filled: u64) {
176 self.0[INDEX_SIZE..INDEX_SIZE * 2].copy_from_slice(&filled.to_le_bytes());
177 }
178}
179
/// A byte buffer (`.1`) paired with a cursor (`.0`) used to read and write
/// table entries.
pub struct Entry<B: AsRef<[u8]>>(usize, B);
#[cfg(feature = "loom")]
pub type FullEntry = Entry<Vec<u8>>;
#[cfg(not(feature = "loom"))]
pub type FullEntry = Entry<[u8; MAX_ENTRY_BUF_SIZE]>;
/// Entry view borrowing an existing slice.
pub type EntryRef<'a> = Entry<&'a [u8]>;
/// Just large enough for the tag/size prefix plus a slot link.
type PartialEntry = Entry<[u8; 18]>;
/// Large enough for the entry header plus the partial key.
type PartialKeyEntry = Entry<[u8; 48]>;

impl<const C: usize> Entry<[u8; C]> {
    /// Create an entry over an uninitialized byte array.
    ///
    /// NOTE(review): `assume_init` on an uninitialized `[u8; C]` is what the
    /// `uninit_assumed_init` allow acknowledges; callers are expected to
    /// write before reading — TODO confirm soundness with Miri.
    #[inline(always)]
    #[allow(clippy::uninit_assumed_init)]
    pub fn new_uninit() -> Self {
        Entry(0, unsafe { MaybeUninit::uninit().assume_init() })
    }
}
196
#[cfg(feature = "loom")]
impl Entry<Vec<u8>> {
    /// Loom builds avoid uninitialized memory: allocate a zeroed buffer.
    pub fn new_uninit_full_entry() -> Self {
        Entry(0, vec![0; MAX_ENTRY_BUF_SIZE])
    }
}
203
#[cfg(not(feature = "loom"))]
impl Entry<[u8; MAX_ENTRY_BUF_SIZE]> {
    /// Full-size scratch entry (uninitialized outside loom builds).
    pub fn new_uninit_full_entry() -> Self {
        Self::new_uninit()
    }
}
210
211impl<B: AsRef<[u8]>> Entry<B> {
212 #[inline(always)]
213 pub fn check_remaining_len(
214 &self,
215 len: usize,
216 error: impl Fn() -> crate::error::Error,
217 ) -> Result<()> {
218 if self.0 + len > self.1.as_ref().len() {
219 return Err(error())
220 }
221 Ok(())
222 }
223
224 #[inline(always)]
225 pub fn new(data: B) -> Self {
226 Entry(0, data)
227 }
228
229 pub fn set_offset(&mut self, offset: usize) {
230 self.0 = offset;
231 }
232
233 pub fn offset(&self) -> usize {
234 self.0
235 }
236
237 pub fn size(&self) -> usize {
238 self.1.as_ref().len() - self.0
239 }
240
241 pub fn read_slice(&mut self, size: usize) -> &[u8] {
242 let start = self.0;
243 self.0 += size;
244 &self.1.as_ref()[start..self.0]
245 }
246
247 fn is_tombstone(&self) -> bool {
248 &self.1.as_ref()[0..SIZE_SIZE] == TOMBSTONE
249 }
250
251 fn is_multipart(&self) -> bool {
252 &self.1.as_ref()[0..SIZE_SIZE] == MULTIPART
253 }
254
255 fn is_multipart_v4(&self) -> bool {
256 &self.1.as_ref()[0..SIZE_SIZE] == MULTIPART_V4
257 }
258
259 fn is_multihead_compressed(&self) -> bool {
260 &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_COMPRESSED
261 }
262
263 fn is_multihead(&self) -> bool {
264 self.is_multihead_compressed() || &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD
265 }
266
267 fn is_multihead_v4(&self) -> bool {
268 &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_V4
269 }
270
271 fn is_multi(&self, db_version: u32) -> bool {
272 self.is_multipart() ||
273 self.is_multihead() ||
274 (db_version <= 4 && (self.is_multipart_v4() || self.is_multihead_v4()))
275 }
276
277 fn read_size(&mut self) -> (u16, bool) {
278 let size = u16::from_le_bytes(self.read_slice(SIZE_SIZE).try_into().unwrap());
279 let compressed = (size & COMPRESSED_MASK) > 0;
280 (size & !COMPRESSED_MASK, compressed)
281 }
282
283 fn skip_size(&mut self) {
284 self.0 += SIZE_SIZE;
285 }
286
287 pub fn read_u64(&mut self) -> u64 {
288 u64::from_le_bytes(self.read_slice(8).try_into().unwrap())
289 }
290
291 fn read_next(&mut self) -> u64 {
292 self.read_u64()
293 }
294
295 pub fn skip_u64(&mut self) {
296 self.0 += 8;
297 }
298
299 pub fn skip_next(&mut self) {
300 self.skip_u64()
301 }
302
303 pub fn read_u32(&mut self) -> u32 {
304 u32::from_le_bytes(self.read_slice(REFS_SIZE).try_into().unwrap())
305 }
306
307 fn read_rc(&mut self) -> u32 {
308 self.read_u32()
309 }
310
311 fn read_partial(&mut self) -> &[u8] {
312 self.read_slice(PARTIAL_SIZE)
313 }
314}
315
316impl<B: AsRef<[u8]> + AsMut<[u8]>> Entry<B> {
317 pub fn write_slice(&mut self, buf: &[u8]) {
318 let start = self.0;
319 self.0 += buf.len();
320 self.1.as_mut()[start..self.0].copy_from_slice(buf);
321 }
322
323 fn write_tombstone(&mut self) {
324 self.write_slice(TOMBSTONE);
325 }
326
327 fn write_multipart(&mut self) {
328 self.write_slice(MULTIPART);
329 }
330
331 fn write_multihead(&mut self) {
332 self.write_slice(MULTIHEAD);
333 }
334
335 fn write_multihead_compressed(&mut self) {
336 self.write_slice(MULTIHEAD_COMPRESSED);
337 }
338
339 fn write_size(&mut self, mut size: u16, compressed: bool) {
340 if compressed {
341 size |= COMPRESSED_MASK;
342 }
343 self.write_slice(&size.to_le_bytes());
344 }
345 pub fn write_u64(&mut self, next_index: u64) {
346 self.write_slice(&next_index.to_le_bytes());
347 }
348
349 fn write_next(&mut self, next_index: u64) {
350 self.write_u64(next_index)
351 }
352
353 pub fn write_u32(&mut self, next_index: u32) {
354 self.write_slice(&next_index.to_le_bytes());
355 }
356
357 fn write_rc(&mut self, rc: u32) {
358 self.write_slice(&rc.to_le_bytes());
359 }
360
361 pub fn inner_mut(&mut self) -> &mut B {
362 &mut self.1
363 }
364}
365
// Whole-buffer mutable access; the cursor is ignored.
impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for Entry<B> {
    fn as_mut(&mut self) -> &mut [u8] {
        self.1.as_mut()
    }
}
371
// Whole-buffer read access; the cursor is ignored.
impl<B: AsRef<[u8]>> AsRef<[u8]> for Entry<B> {
    fn as_ref(&self) -> &[u8] {
        self.1.as_ref()
    }
}
377
// Range indexing into the backing buffer (absolute, cursor-independent).
impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::Index<std::ops::Range<usize>> for Entry<B> {
    type Output = [u8];

    fn index(&self, index: std::ops::Range<usize>) -> &[u8] {
        &self.1.as_ref()[index]
    }
}
385
// Mutable range indexing into the backing buffer.
impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::IndexMut<std::ops::Range<usize>> for Entry<B> {
    fn index_mut(&mut self, index: std::ops::Range<usize>) -> &mut [u8] {
        &mut self.1.as_mut()[index]
    }
}
391
/// A byte slice that either borrows the in-memory log overlay or the mapped
/// table file; `as_slice` erases the distinction.
enum LockedSlice<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> {
    FromOverlay(O),
    FromFile(F),
}

impl<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> LockedSlice<O, F> {
    /// Borrow the underlying bytes regardless of their origin.
    fn as_slice(&self) -> &[u8] {
        match self {
            Self::FromOverlay(bytes) => bytes,
            Self::FromFile(bytes) => bytes,
        }
    }
}
405
406impl ValueTable {
    /// Open the value table file for `id`.
    ///
    /// `entry_size == None` selects the multipart table, which chains
    /// oversized values across `MULTIPART_ENTRY_SIZE` slots. When the file is
    /// already mapped, reads the slot-0 header and validates that
    /// `last_removed` points inside the filled region.
    pub fn open(
        path: Arc<std::path::PathBuf>,
        id: TableId,
        entry_size: Option<u16>,
        options: &Options,
        db_version: u32,
    ) -> Result<ValueTable> {
        let (multipart, entry_size) = match entry_size {
            Some(s) => (false, s),
            None => (true, MULTIPART_ENTRY_SIZE),
        };
        assert!(entry_size >= MIN_ENTRY_SIZE as u16);
        assert!(entry_size <= MAX_ENTRY_SIZE as u16);

        let mut filepath: std::path::PathBuf = std::path::PathBuf::clone(&*path);
        filepath.push(id.file_name());
        let file = crate::file::TableFile::open(filepath, entry_size, id)?;
        // Slot 0 holds the header, so an empty table counts as 1 filled.
        let mut filled = 1;
        let mut last_removed = 0;
        if file.map.read().is_some() {
            let mut header = Header::default();
            file.read_at(&mut header.0, 0)?;
            last_removed = header.last_removed();
            filled = header.filled();
            if filled == 0 {
                filled = 1;
            }
            if last_removed >= filled {
                return Err(crate::error::Error::Corruption(format!(
                    "Bad removed ref {} out of {}",
                    last_removed, filled
                )))
            }
            log::debug!(target: "parity-db", "Opened value table {} with {} entries, entry_size={}, removed={}", id, filled, entry_size, last_removed);
        }

        Ok(ValueTable {
            id,
            entry_size,
            file,
            filled: AtomicU64::new(filled),
            written: AtomicU64::new(filled),
            last_removed: AtomicU64::new(last_removed),
            dirty_header: AtomicBool::new(false),
            needs_free_entries: options.multitree,
            // The stack itself is built later by `init_table_data`.
            free_entries: None,
            multipart,
            ref_counted: options.ref_counted,
            db_version,
        })
    }
458
459 pub fn init_table_data(&mut self) -> Result<()> {
460 let free_entries = if self.needs_free_entries {
461 let mut stack: Vec<u64> = Default::default();
462
463 let filled = self.filled.load(Ordering::Relaxed);
464 let last_removed = self.last_removed.load(Ordering::Relaxed);
465
466 let mut next = last_removed;
467 while next != 0 {
468 if next >= filled {
469 return Err(crate::error::Error::Corruption(format!(
470 "Bad removed ref {} out of {}",
471 next, filled
472 )))
473 }
474
475 stack.insert(0, next);
476
477 let mut buf = PartialEntry::new_uninit();
478 self.file.read_at(buf.as_mut(), next * self.entry_size as u64)?;
479 buf.skip_size();
480 next = buf.read_next();
481 }
482
483 Some(RwLock::new(FreeEntries { stack }))
484 } else {
485 None
486 };
487 self.free_entries = free_entries;
488 Ok(())
489 }
490
491 pub fn value_size(&self, key: &TableKey) -> Option<u16> {
492 let base = self.entry_size - SIZE_SIZE as u16 - self.ref_size() as u16;
493 let k_encoded = key.encoded_size() as u16;
494 if base < k_encoded {
495 None
496 } else {
497 Some(base - k_encoded)
498 }
499 }
500
    /// Parse the header of the entry at `index` out of `buf`.
    ///
    /// Returns `(rc, total_size, next, size, compressed)` where:
    /// - `rc == 0` signals a tombstone or a key mismatch,
    /// - `total_size` is the full value size when recorded (`None` for
    ///   legacy multipart heads that do not store it),
    /// - `next` is the next slot of a multipart chain (0 when none),
    /// - `size` is the payload remaining in this slot after the header,
    /// - `compressed` is the stored compression flag.
    ///
    /// Depending on `key`, also fetches or verifies the stored partial key.
    #[inline(always)]
    fn parse_head(
        &self,
        key: &mut TableKeyQuery,
        index: u64,
        mut buf: &mut EntryRef,
    ) -> Result<(u32, Option<u64>, u64, u16, bool)> {
        let mut compressed = false;
        let mut rc = 1;
        let entry_size = self.entry_size as usize;
        let mut total_size = None;

        if buf.is_tombstone() {
            return Ok((0, None, 0, 0, false))
        }

        // In the multipart table only a multihead can start a chain; anything
        // else here is a non-head part (e.g. reached during iteration).
        if self.multipart && !buf.is_multihead() {
            return Ok((0, None, 0, 0, false))
        }

        let (mut size, next) = if self.multipart && buf.is_multi(self.db_version) {
            if self.db_version > 6 && buf.is_multihead_compressed() {
                compressed = true;
            }
            buf.skip_size();
            let mut buf_size = entry_size - SIZE_SIZE - INDEX_SIZE;
            if self.db_version > 8 {
                // Newer multihead entries record the total value size.
                total_size = Some(buf.read_u64());
                buf_size -= 8;
            }
            let next = buf.read_next();
            (buf_size as u16, next)
        } else {
            // Single-slot entry: the size prefix is the whole value size.
            let (size, read_compressed) = buf.read_size();
            compressed = read_compressed;
            total_size = Some(size as u64);
            (size, 0)
        };

        if self.ref_counted {
            // The reference counter is stored as part of the payload.
            if size < REFS_SIZE as u16 {
                return Err(crate::error::Error::Corruption(format!(
                    "{}: Corrupted entry at {}. Size {}, expected at least {}",
                    self.id, index, size, REFS_SIZE,
                )))
            }
            size -= REFS_SIZE as u16;
            rc = buf.read_rc();
        }
        match key {
            TableKeyQuery::Fetch(Some(to_fetch)) => {
                size -= PARTIAL_SIZE as u16;
                **to_fetch = TableKey::fetch_partial(&mut buf)?;
            },
            TableKeyQuery::Fetch(None) => (),
            TableKeyQuery::Check(k) => {
                size -= k.encoded_size() as u16;
                let to_fetch = k.fetch(&mut buf);
                if !k.compare(&to_fetch) {
                    log::debug!(
                        target: "parity-db",
                        "{}: Key mismatch at {}. Expected {}, got {:?}, size = {}",
                        self.id,
                        index,
                        k,
                        to_fetch.as_ref().map(hex),
                        self.entry_size,
                    );
                    return Ok((0, total_size, 0, 0, false))
                }
            },
        }

        Ok((rc, total_size, next, size, compressed))
    }
578
    /// Parse only the header of the entry at `index`, without copying the
    /// value. Returns `(rc, total_size, compressed)`; `rc == 0` means
    /// tombstone or key mismatch.
    #[inline(always)]
    fn entry_info(
        &self,
        key: &mut TableKeyQuery,
        index: u64,
        log: &impl LogQuery,
    ) -> Result<(u32, Option<u64>, bool)> {
        let entry_size = self.entry_size as usize;

        // Prefer the pending version of the slot in the log overlay.
        let vbuf = log.value_ref(self.id, index);
        let buf: LockedSlice<_, _> = if let Some(buf) = vbuf.as_deref() {
            log::trace!(
                target: "parity-db",
                "{}: Found in overlay {}",
                self.id,
                index,
            );
            LockedSlice::FromOverlay(buf)
        } else {
            log::trace!(
                target: "parity-db",
                "{}: Query slot {}",
                self.id,
                index,
            );
            let vbuf = self.file.slice_at(index * self.entry_size as u64, entry_size);
            LockedSlice::FromFile(vbuf)
        };
        let mut buf = EntryRef::new(buf.as_slice());
        let (rc, total_size, _next, _size, compressed) = self.parse_head(key, index, &mut buf)?;
        Ok((rc, total_size, compressed))
    }
612
613 #[inline(always)]
615 fn read(
616 &self,
617 key: &mut TableKeyQuery,
618 mut index: u64,
619 log: &impl LogQuery,
620 ) -> Result<(u32, bool, Vec<u8>)> {
622 let mut part = 0;
623 let mut compressed = false;
624 let mut rc = 1;
625 let entry_size = self.entry_size as usize;
626 let mut result = Vec::new();
627 loop {
628 let vbuf = log.value_ref(self.id, index);
629 let buf: LockedSlice<_, _> = if let Some(buf) = vbuf.as_deref() {
630 log::trace!(
631 target: "parity-db",
632 "{}: Found in overlay {}",
633 self.id,
634 index,
635 );
636 LockedSlice::FromOverlay(buf)
637 } else {
638 log::trace!(
639 target: "parity-db",
640 "{}: Query slot {}",
641 self.id,
642 index,
643 );
644 let vbuf = self.file.slice_at(index * self.entry_size as u64, entry_size);
645 LockedSlice::FromFile(vbuf)
646 };
647 let mut buf = EntryRef::new(buf.as_slice());
648
649 let (buf_size, next) = {
650 if part == 0 {
651 let (hrc, total_size, n, s, c) = self.parse_head(key, index, &mut buf)?;
652 if hrc == 0 {
653 return Ok((0, false, vec![]));
654 }
655 rc = hrc;
656 if let Some(s) = total_size {
657 result.reserve(s as usize);
658 }
659 compressed = c;
660 (s as usize, n)
661 } else {
662 if self.multipart && buf.is_multi(self.db_version) {
663 buf.skip_size();
664 let next = buf.read_next();
665 (entry_size - SIZE_SIZE - INDEX_SIZE, next)
666 } else {
667 let (size, read_compressed) = buf.read_size();
668 if part == 0 || self.db_version <= 6 {
669 compressed = read_compressed;
670 }
671 (size as usize, 0)
672 }
673 }
674 };
675
676 if buf.size() < buf_size {
677 return Err(crate::error::Error::Corruption(format!(
678 "Unexpected entry size. Expected at least {} bytes",
679 buf_size
680 )))
681 }
682
683 result.extend_from_slice(buf.read_slice(buf_size));
684
685 if next == 0 {
686 break
687 }
688 part += 1;
689 index = next;
690 }
691 Ok((rc, compressed, result))
692 }
693
694 pub fn get(
695 &self,
696 key: &TableKey,
697 index: u64,
698 log: &impl LogQuery,
699 ) -> Result<Option<(Value, bool)>> {
700 if let Some((value, compressed, _)) =
701 self.query(&mut TableKeyQuery::Check(key), index, log)?
702 {
703 Ok(Some((value, compressed)))
704 } else {
705 Ok(None)
706 }
707 }
708
    /// Read the raw bytes of the slot at `index` straight from the file
    /// (bypassing the log overlay). Debugging helper.
    pub fn dump_entry(&self, index: u64) -> Result<Vec<u8>> {
        let entry_size = self.entry_size as usize;
        let mut buf = FullEntry::new_uninit_full_entry();
        self.file.read_at(&mut buf[0..entry_size], index * self.entry_size as u64)?;
        Ok(buf[0..entry_size].to_vec())
    }
715
716 pub fn query(
717 &self,
718 key: &mut TableKeyQuery,
719 index: u64,
720 log: &impl LogQuery,
721 ) -> Result<Option<(Value, bool, u32)>> {
722 let (rc, compressed, result) = self.read(key, index, log)?;
723 if rc > 0 {
724 return Ok(Some((result, compressed, rc)))
725 }
726 Ok(None)
727 }
728
729 #[allow(clippy::type_complexity)]
730 pub fn get_with_meta(
731 &self,
732 index: u64,
733 log: &impl LogQuery,
734 ) -> Result<Option<(Value, u32, [u8; PARTIAL_SIZE], bool)>> {
735 let mut query_key = Default::default();
736 if let Some((value, compressed, rc)) =
737 self.query(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log)?
738 {
739 return Ok(Some((value, rc, query_key, compressed)))
740 }
741 Ok(None)
742 }
743
    /// Return the stored value size and compression flag for `key` at
    /// `index`, or `None` when the entry is missing or the key mismatches.
    /// Falls back to reading the whole chain when the head does not record a
    /// total size (legacy multipart entries).
    pub fn size(
        &self,
        key: &TableKey,
        index: u64,
        log: &impl LogQuery,
    ) -> Result<Option<(u32, bool)>> {
        let (rc, size, compressed) = self.entry_info(&mut TableKeyQuery::Check(key), index, log)?;
        if rc > 0 {
            if let Some(size) = size {
                return Ok(Some((size as u32, compressed)))
            }
            // Size not recorded in the head: measure by reading the value.
            return self
                .read(&mut TableKeyQuery::Check(key), index, log)
                .map(|(_rc, compressed, value)| Some((value.len() as u32, compressed)))
        }
        Ok(None)
    }
761
    /// Check whether the entry at `index` exists and matches `key`.
    pub fn has_key_at(&self, index: u64, key: &TableKey, log: &LogWriter) -> Result<bool> {
        match key {
            TableKey::Partial(k) => Ok(match self.partial_key_at(index, log)? {
                Some(existing_key) => &existing_key[..] == key::partial_key(k),
                None => false,
            }),
            // No stored key to compare: any live (non-tombstone) entry matches.
            TableKey::NoHash => Ok(!self.is_tombstone(index, log)?),
        }
    }
771
772 pub fn partial_key_at(
773 &self,
774 index: u64,
775 log: &impl LogQuery,
776 ) -> Result<Option<[u8; PARTIAL_SIZE]>> {
777 let mut query_key = Default::default();
778 let (rc, _size, _compressed) =
779 self.entry_info(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log)?;
780 Ok(if rc == 0 { None } else { Some(query_key) })
781 }
782
783 pub fn is_tombstone(&self, index: u64, log: &impl LogQuery) -> Result<bool> {
784 let mut buf = PartialKeyEntry::new_uninit();
785 let buf = if log.value(self.id, index, buf.as_mut()) {
786 &mut buf
787 } else {
788 self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
789 &mut buf
790 };
791 Ok(buf.is_tombstone())
792 }
793
    /// Read the free-list link stored in freed slot `index` (log overlay
    /// first), validating that it points inside the filled region.
    pub fn read_next_free(&self, index: u64, log: &LogWriter) -> Result<u64> {
        let mut buf = PartialEntry::new_uninit();
        let filled = self.filled.load(Ordering::Relaxed);
        if !log.value(self.id, index, buf.as_mut()) {
            self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
        }
        // Tombstone layout: 2-byte tag, then the next free slot index.
        buf.skip_size();
        let next = buf.read_next();
        if next >= filled {
            return Err(crate::error::Error::Corruption(format!(
                "Bad removed ref {} out of {}",
                next, filled
            )))
        }
        Ok(next)
    }
810
    /// Return the next-part link when the slot at `index` holds a multipart
    /// entry; `None` for tombstones and final (plain-sized) parts.
    pub fn read_next_part(&self, index: u64, log: &LogWriter) -> Result<Option<u64>> {
        let mut buf = PartialEntry::new_uninit();
        if !log.value(self.id, index, buf.as_mut()) {
            self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
        }
        if self.multipart && buf.is_multi(self.db_version) {
            buf.skip_size();
            if self.db_version > 8 && buf.is_multihead() {
                // Newer multihead entries store the total size before the link.
                buf.skip_u64();
            }
            let next = buf.read_next();
            return Ok(Some(next))
        }
        Ok(None)
    }
826
    /// Allocate a slot for a new entry: reuse the free-list head when there
    /// is one, otherwise extend the table. Marks the header dirty so that
    /// `complete_plan` logs the updated counters.
    pub fn next_free(&self, log: &mut LogWriter) -> Result<u64> {
        // Hold the free-entries lock (when present) so the in-memory stack
        // stays in sync with `last_removed`.
        let free_entries_guard = if let Some(free_entries) = &self.free_entries {
            Some(free_entries.write())
        } else {
            None
        };

        let filled = self.filled.load(Ordering::Relaxed);
        let last_removed = self.last_removed.load(Ordering::Relaxed);
        let index = if last_removed != 0 {
            // Follow the free list: its head becomes the allocated slot.
            let next_removed = self.read_next_free(last_removed, log)?;
            log::trace!(
                target: "parity-db",
                "{}: Inserting into removed slot {}",
                self.id,
                last_removed,
            );
            self.last_removed.store(next_removed, Ordering::Relaxed);
            if let Some(mut free_entries) = free_entries_guard {
                // The stack top must mirror the on-disk list head.
                let last = free_entries.stack.pop().unwrap();
                debug_assert_eq!(last, last_removed);
            }
            last_removed
        } else {
            log::trace!(
                target: "parity-db",
                "{}: Inserting into new slot {}",
                self.id,
                filled,
            );
            self.filled.store(filled + 1, Ordering::Relaxed);
            filled
        };
        self.dirty_header.store(true, Ordering::Relaxed);
        Ok(index)
    }
863
864 pub fn claim_entries(&self, num: usize) -> Result<Vec<u64>> {
865 match &self.free_entries {
866 Some(free_entries) => {
867 let mut free_entries = free_entries.write();
868
869 let mut entries: Vec<u64> = Default::default();
870
871 for _i in 0..num {
872 let filled = self.filled.load(Ordering::Relaxed);
873 let last_removed = self.last_removed.load(Ordering::Relaxed);
874 let index = if last_removed != 0 {
875 let last = free_entries.stack.pop().unwrap();
876 debug_assert_eq!(last, last_removed);
877
878 let next_removed = *free_entries.stack.last().unwrap_or(&0u64);
879
880 self.last_removed.store(next_removed, Ordering::Relaxed);
881 last_removed
882 } else {
883 self.filled.store(filled + 1, Ordering::Relaxed);
884 filled
885 };
886 entries.push(index);
887 }
888 self.dirty_header.store(true, Ordering::Relaxed);
889
890 Ok(entries)
891 },
892 None =>
893 return Err(crate::error::Error::InvalidConfiguration(format!(
894 "claim_entries called without free_entries"
895 ))),
896 }
897 }
898
    /// Write `key`/`value` into the slot chain starting at `at` (or into a
    /// newly allocated slot when `None`). Returns the chain head index.
    ///
    /// In the multipart table the value is split across linked slots.
    /// When overwriting an existing chain (`follow`), its parts are reused;
    /// extra parts are allocated, and leftover tail parts are freed.
    /// `claimed` means `at` came from `claim_entries`, so there is no
    /// existing chain to follow.
    fn overwrite_chain(
        &self,
        key: &TableKey,
        value: &[u8],
        log: &mut LogWriter,
        at: Option<u64>,
        claimed: bool,
        compressed: bool,
    ) -> Result<u64> {
        // Total bytes still to write: payload + optional rc + encoded key.
        let mut remainder = value.len() + self.ref_size() + key.encoded_size();
        let mut offset = 0;
        let mut start = 0;
        assert!(self.multipart || value.len() <= self.value_size(key).unwrap() as usize);
        let (mut index, mut follow) = match at {
            Some(index) => (index, !claimed),
            None => (self.next_free(log)?, false),
        };
        loop {
            let mut next_index = 0;
            if follow {
                // Reuse the next part of the chain being overwritten.
                match self.read_next_part(index, log)? {
                    Some(next) => {
                        next_index = next;
                    },
                    None => {
                        follow = false;
                    },
                }
            }
            log::trace!(
                target: "parity-db",
                "{}: Writing slot {}: {}",
                self.id,
                index,
                key,
            );
            let mut buf = FullEntry::new_uninit_full_entry();
            let mut free_space = self.entry_size as usize - SIZE_SIZE;
            let value_len = if remainder > free_space {
                // Does not fit in this slot: write a multipart entry.
                if !follow {
                    next_index = self.next_free(log)?
                }
                if start == 0 {
                    // Head slot: multihead tag (+ total size on newer dbs).
                    if compressed {
                        buf.write_multihead_compressed();
                    } else {
                        buf.write_multihead();
                    }
                    if self.db_version > 8 {
                        free_space -= 8;
                        buf.write_u64(value.len() as u64);
                    }
                } else {
                    buf.write_multipart();
                }
                buf.write_next(next_index);
                free_space - INDEX_SIZE
            } else {
                // Fits in a single (or final) slot: plain size prefix.
                buf.write_size(remainder as u16, compressed);
                remainder
            };
            let init_offset = buf.offset();
            if offset == 0 {
                // The head slot also carries the refcount and the key.
                if self.ref_counted {
                    buf.write_rc(1u32);
                }
                key.write(&mut buf);
            }
            // `written` = header bytes consumed from this slot's budget.
            let written = buf.offset() - init_offset;
            buf.write_slice(&value[offset..offset + value_len - written]);
            offset += value_len - written;
            log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
            remainder -= value_len;
            if start == 0 {
                start = index;
            }
            index = next_index;
            if remainder == 0 {
                if index != 0 {
                    // The previous chain was longer: free the leftover tail.
                    self.clear_chain(index, log)?;
                }
                break
            }
        }

        Ok(start)
    }
989
990 fn clear_chain(&self, mut index: u64, log: &mut LogWriter) -> Result<()> {
991 loop {
992 match self.read_next_part(index, log)? {
993 Some(next) => {
994 self.clear_slot(index, log)?;
995 index = next;
996 },
997 None => {
998 self.clear_slot(index, log)?;
999 return Ok(())
1000 },
1001 }
1002 }
1003 }
1004
    /// Replace the slot at `index` with a tombstone linking to the current
    /// free-list head, making `index` the new head.
    fn clear_slot(&self, index: u64, log: &mut LogWriter) -> Result<()> {
        // Keep the in-memory free stack (when present) in sync.
        let free_entries_guard = if let Some(free_entries) = &self.free_entries {
            Some(free_entries.write())
        } else {
            None
        };

        let last_removed = self.last_removed.load(Ordering::Relaxed);
        log::trace!(
            target: "parity-db",
            "{}: Freeing slot {}",
            self.id,
            index,
        );

        // Tombstone tag followed by the previous list head.
        let mut buf = PartialEntry::new_uninit();
        buf.write_tombstone();
        buf.write_next(last_removed);

        log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
        self.last_removed.store(index, Ordering::Relaxed);
        self.dirty_header.store(true, Ordering::Relaxed);

        if let Some(mut free_entries) = free_entries_guard {
            free_entries.stack.push(index);
        }

        Ok(())
    }
1034
    /// Plan the insertion of a new value: allocates a slot (or chain) and
    /// logs the writes. Returns the chain head index.
    pub fn write_insert_plan(
        &self,
        key: &TableKey,
        value: &[u8],
        log: &mut LogWriter,
        compressed: bool,
    ) -> Result<u64> {
        self.overwrite_chain(key, value, log, None, false, compressed)
    }
1044
1045 pub fn write_replace_plan(
1046 &self,
1047 index: u64,
1048 key: &TableKey,
1049 value: &[u8],
1050 log: &mut LogWriter,
1051 compressed: bool,
1052 ) -> Result<()> {
1053 self.overwrite_chain(key, value, log, Some(index), false, compressed)?;
1054 Ok(())
1055 }
1056
1057 pub fn write_claimed_plan(
1058 &self,
1059 index: u64,
1060 key: &TableKey,
1061 value: &[u8],
1062 log: &mut LogWriter,
1063 compressed: bool,
1064 ) -> Result<()> {
1065 self.overwrite_chain(key, value, log, Some(index), true, compressed)?;
1066 Ok(())
1067 }
1068
1069 pub fn write_remove_plan(&self, index: u64, log: &mut LogWriter) -> Result<()> {
1070 if self.multipart {
1071 self.clear_chain(index, log)?;
1072 } else {
1073 self.clear_slot(index, log)?;
1074 }
1075 Ok(())
1076 }
1077
1078 pub fn write_inc_ref(&self, index: u64, log: &mut LogWriter) -> Result<()> {
1079 self.change_ref(index, 1, log)?;
1080 Ok(())
1081 }
1082
1083 pub fn write_dec_ref(&self, index: u64, log: &mut LogWriter) -> Result<bool> {
1084 if self.change_ref(index, -1, log)? {
1085 return Ok(true)
1086 }
1087 self.write_remove_plan(index, log)?;
1088 Ok(false)
1089 }
1090
    /// Adjust the reference counter of the entry at `index` by `delta`.
    ///
    /// Returns `Ok(false)` when the entry is a tombstone or the counter
    /// would reach zero (caller then removes the entry). A counter at
    /// `LOCKED_REF` is pinned: increments saturate and decrements are
    /// ignored.
    pub fn change_ref(&self, index: u64, delta: i32, log: &mut LogWriter) -> Result<bool> {
        let mut buf = FullEntry::new_uninit_full_entry();
        let buf = if log.value(self.id, index, buf.as_mut()) {
            &mut buf
        } else {
            self.file
                .read_at(&mut buf[0..self.entry_size as usize], index * self.entry_size as u64)?;
            &mut buf
        };

        if buf.is_tombstone() {
            return Ok(false)
        }

        // `size` = number of bytes to re-log once the counter is patched.
        let size = if self.multipart && buf.is_multi(self.db_version) {
            buf.skip_size();
            if self.db_version > 8 && buf.is_multihead() {
                // Skip the stored total value size.
                buf.skip_u64();
            }
            buf.skip_next();
            self.entry_size as usize
        } else {
            let (size, _compressed) = buf.read_size();
            buf.offset() + size as usize
        };

        // The cursor is now at the refcount field.
        let rc_offset = buf.offset();
        let mut counter = buf.read_rc();
        if delta > 0 {
            // Saturate at LOCKED_REF instead of overflowing.
            if counter >= LOCKED_REF - delta as u32 {
                counter = LOCKED_REF
            } else {
                counter += delta as u32;
            }
        } else if counter != LOCKED_REF {
            counter = counter.saturating_sub(-delta as u32);
            if counter == 0 {
                return Ok(false)
            }
        }

        // Patch the counter in place and re-log the entry image.
        buf.set_offset(rc_offset);
        buf.write_rc(counter);
        log.insert_value(self.id, index, buf[0..size].to_vec());
        Ok(true)
    }
1138
    /// Apply a logged record for `index` to the file: grows the file as
    /// needed, then writes the header (index 0), a tombstone, a full
    /// multipart slot, or a sized entry depending on the record's tag.
    pub fn enact_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
        while index >= self.file.capacity.load(Ordering::Relaxed) {
            self.file.grow(self.entry_size)?;
        }
        if index == 0 {
            // Slot 0 is the table header.
            let mut header = Header::default();
            log.read(&mut header.0)?;
            self.file.write_at(&header.0, 0)?;
            self.written.store(header.filled(), Ordering::Relaxed);
            log::trace!(target: "parity-db", "{}: Enacted header, {} filled", self.id, header.filled());
            return Ok(())
        }

        // Read the tag first to decide how many more bytes the record holds.
        let mut buf = FullEntry::new_uninit_full_entry();
        log.read(&mut buf[0..SIZE_SIZE])?;
        if buf.is_tombstone() {
            log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
            self.file
                .write_at(&buf[0..SIZE_SIZE + INDEX_SIZE], index * (self.entry_size as u64))?;
            log::trace!(target: "parity-db", "{}: Enacted tombstone in slot {}", self.id, index);
        } else if self.multipart && buf.is_multi(self.db_version) {
            // Multipart slots are always written as a full entry.
            let entry_size = self.entry_size as usize;
            log.read(&mut buf[SIZE_SIZE..entry_size])?;
            self.file.write_at(&buf[0..entry_size], index * (entry_size as u64))?;
            log::trace!(target: "parity-db", "{}: Enacted multipart in slot {}", self.id, index);
        } else {
            let (len, _compressed) = buf.read_size();
            log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
            self.file
                .write_at(&buf[0..(SIZE_SIZE + len as usize)], index * (self.entry_size as u64))?;
            log::trace!(target: "parity-db", "{}: Enacted {}: {}, {} bytes", self.id, index, hex(&buf.1[6..32]), len);
        }
        Ok(())
    }
1173
    /// Dry-run counterpart of `enact_plan`: consume the same log records and
    /// sanity-check them without touching the file.
    pub fn validate_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
        if index == 0 {
            let mut header = Header::default();
            log.read(&mut header.0)?;
            return Ok(())
        }
        let mut buf = FullEntry::new_uninit_full_entry();
        log.read(&mut buf[0..SIZE_SIZE])?;
        if buf.is_tombstone() {
            log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
            log::trace!(target: "parity-db", "{}: Validated tombstone in slot {}", self.id, index);
        } else if self.multipart && buf.is_multi(self.db_version) {
            let entry_size = self.entry_size as usize;
            log.read(&mut buf[SIZE_SIZE..entry_size])?;
            log::trace!(target: "parity-db", "{}: Validated multipart in slot {}", self.id, index);
        } else {
            let (len, _compressed) = buf.read_size();
            log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
            log::trace!(target: "parity-db", "{}: Validated {}: {}, {} bytes", self.id, index, hex(&buf[SIZE_SIZE..32]), len);
        }
        Ok(())
    }
1198
    /// Re-read `last_removed`/`filled` from the on-disk header and reset the
    /// in-memory counters. No-op when the file is not mapped.
    pub fn refresh_metadata(&self) -> Result<()> {
        if self.file.map.read().is_none() {
            return Ok(())
        }
        // Block free-list users while the counters are being replaced.
        let _free_entries_guard = if let Some(free_entries) = &self.free_entries {
            Some(free_entries.write())
        } else {
            None
        };
        let mut header = Header::default();
        self.file.read_at(&mut header.0, 0)?;
        let last_removed = header.last_removed();
        let mut filled = header.filled();
        if filled == 0 {
            // Slot 0 is the header itself, so an empty table is "1 filled".
            filled = 1;
        }
        self.last_removed.store(last_removed, Ordering::Relaxed);
        self.filled.store(filled, Ordering::Relaxed);
        self.written.store(filled, Ordering::Relaxed);
        Ok(())
    }
1220
    /// Log the updated header (slot 0) when allocation or removal marked it
    /// dirty since the last plan.
    pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
        let _free_entries_guard = if let Some(free_entries) = &self.free_entries {
            Some(free_entries.read())
        } else {
            None
        };
        // compare_exchange returns Ok(previous) only on success, i.e. the
        // flag was `true` and we won the race to reset it.
        if let Ok(true) =
            self.dirty_header
                .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
        {
            let mut buf = Header::default();
            let last_removed = self.last_removed.load(Ordering::Relaxed);
            let filled = self.filled.load(Ordering::Relaxed);
            buf.set_last_removed(last_removed);
            buf.set_filled(filled);
            log.insert_value(self.id, 0, buf.0.to_vec());
        }
        Ok(())
    }
1241
	/// Flush any pending writes of the underlying table file to disk.
	pub fn flush(&self) -> Result<()> {
		self.file.flush()
	}
1245
1246 fn ref_size(&self) -> usize {
1247 if self.ref_counted {
1248 REFS_SIZE
1249 } else {
1250 0
1251 }
1252 }
1253
1254 pub fn iter_while(
1255 &self,
1256 log: &impl LogQuery,
1257 mut f: impl FnMut(u64, u32, Vec<u8>, bool) -> bool,
1258 ) -> Result<()> {
1259 let written = self.written.load(Ordering::Relaxed);
1260 for index in 1..written {
1261 let mut _fetch_key = Default::default();
1262 match self.read(&mut TableKeyQuery::Fetch(Some(&mut _fetch_key)), index, log) {
1263 Ok((rc, compressed, value)) =>
1264 if rc > 0 && !f(index, rc, value, compressed) {
1265 break
1266 },
1267 Err(crate::error::Error::InvalidValueData) => (), Err(e) => return Err(e),
1269 }
1270 }
1271 Ok(())
1272 }
1273
	/// Returns `true` when the backing file exists and is memory-mapped.
	pub fn is_init(&self) -> bool {
		self.file.map.read().is_some()
	}
1277
1278 pub fn init_with_entry(&self, entry: &[u8]) -> Result<()> {
1279 if let Err(e) = self.do_init_with_entry(entry) {
1280 log::error!(target: "parity-db", "Failure to initialize file {}", self.file.path.display());
1281 let _ = self.file.remove(); return Err(e)
1283 }
1284 Ok(())
1285 }
1286
	/// Write the initial entry into a freshly created table file.
	///
	/// Routes the write through a throwaway `LogWriter` so the normal
	/// plan-building code produces the header and entry bytes, then applies
	/// the collected changes directly to the file instead of replaying a log.
	fn do_init_with_entry(&self, entry: &[u8]) -> Result<()> {
		// Ensure the file can hold at least one entry.
		self.file.grow(self.entry_size)?;

		// Local, empty overlay set: this log is never enacted through the
		// regular machinery, it only collects the planned value writes.
		let empty_overlays = RwLock::new(LogOverlays::with_columns(0));
		let mut log = LogWriter::new(&empty_overlays, 0);
		let at = self.overwrite_chain(&TableKey::NoHash, entry, &mut log, None, false, false)?;
		self.complete_plan(&mut log)?;
		// The entry must land in the first data slot (slot 0 is the header).
		assert_eq!(at, 1);
		let log = log.drain();
		let change = log.local_values_changes(self.id).expect("entry written above");
		for (at, (_rec_id, entry)) in change.map.iter() {
			// Apply each planned write straight to its slot offset.
			self.file.write_at(entry.as_slice(), *at * (self.entry_size as u64))?;
		}
		Ok(())
	}
1302
	/// Validate the on-disk free-entry linked list.
	///
	/// Walks the chain starting at `last_removed`, verifying every link stays
	/// below the written watermark, and returns the chain length.
	pub fn check_free_refs(&self) -> Result<u64> {
		// Lock the in-memory free list (when present) so the chain cannot be
		// mutated while it is being walked.
		let _free_entries_guard = if let Some(free_entries) = &self.free_entries {
			Some(free_entries.read())
		} else {
			None
		};
		let written = self.written.load(Ordering::Relaxed);
		let mut next = self.last_removed.load(Ordering::Relaxed);
		let mut len = 0;
		// Index 0 terminates the chain (slot 0 is the header).
		while next != 0 {
			if next >= written {
				return Err(crate::error::Error::Corruption(format!(
					"Bad removed ref {} out of {}",
					next, written
				)))
			}
			// A free entry stores the index of the next free entry right
			// after its size field.
			let mut buf = PartialEntry::new_uninit();
			self.file.read_at(buf.as_mut(), next * self.entry_size as u64)?;
			buf.skip_size();
			next = buf.read_next();
			len += 1;
		}
		Ok(len)
	}
1328
1329 pub fn get_num_entries(&self) -> Result<u64> {
1330 if let Some(free_entries) = &self.free_entries {
1331 let free_entries = free_entries.read();
1332 let filled = self.filled.load(Ordering::Relaxed);
1333 let num_free = free_entries.stack.len();
1334 let num = (filled - 1) - num_free as u64;
1335 if num > 0 && self.multipart {
1336 return Err(crate::error::Error::InvalidConfiguration(format!(
1338 "Unable to determine number of multipart entries"
1339 )))
1340 }
1341 return Ok(num)
1342 }
1343 Err(crate::error::Error::InvalidConfiguration(format!(
1344 "Unable to determine number of entries"
1345 )))
1346 }
1347}
1348
1349pub mod key {
1350 use super::{EntryRef, FullEntry};
1351 use crate::Key;
1352
1353 pub const PARTIAL_SIZE: usize = 26;
1354
1355 pub fn partial_key(hash: &Key) -> &[u8] {
1356 &hash[6..]
1357 }
1358
1359 pub enum TableKey {
1360 Partial(Key),
1361 NoHash,
1362 }
1363
1364 impl TableKey {
1365 pub fn encoded_size(&self) -> usize {
1366 match self {
1367 TableKey::Partial(_) => PARTIAL_SIZE,
1368 TableKey::NoHash => 0,
1369 }
1370 }
1371
1372 pub fn index_from_partial(partial: &[u8]) -> u64 {
1373 u64::from_be_bytes((partial[0..8]).try_into().unwrap())
1374 }
1375
1376 pub fn compare(&self, fetch: &Option<[u8; PARTIAL_SIZE]>) -> bool {
1377 match (self, fetch) {
1378 (TableKey::Partial(k), Some(fetch)) => partial_key(k) == fetch,
1379 (TableKey::NoHash, _) => true,
1380 _ => false,
1381 }
1382 }
1383
1384 pub fn fetch_partial<'a>(buf: &mut EntryRef<'a>) -> [u8; PARTIAL_SIZE] {
1385 let mut result = [0u8; PARTIAL_SIZE];
1386 let pks = buf.read_partial();
1387 result.copy_from_slice(pks);
1388 result
1389 }
1390
1391 pub fn fetch<'a>(&self, buf: &mut EntryRef<'a>) -> Option<[u8; PARTIAL_SIZE]> {
1392 match self {
1393 TableKey::Partial(_k) => Some(Self::fetch_partial(buf)),
1394 TableKey::NoHash => None,
1395 }
1396 }
1397
1398 pub fn write(&self, buf: &mut FullEntry) {
1399 match self {
1400 TableKey::Partial(k) => {
1401 buf.write_slice(partial_key(k));
1402 },
1403 TableKey::NoHash => (),
1404 }
1405 }
1406 }
1407
1408 impl std::fmt::Display for TableKey {
1409 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1410 match self {
1411 TableKey::Partial(k) => write!(f, "{}", crate::display::hex(k)),
1412 TableKey::NoHash => write!(f, "no_hash"),
1413 }
1414 }
1415 }
1416
1417 pub enum TableKeyQuery<'a> {
1418 Check(&'a TableKey),
1419 Fetch(Option<&'a mut [u8; PARTIAL_SIZE]>),
1420 }
1421}
1422
#[cfg(test)]
mod test {
	// Entry size used by most fixed-size-table tests.
	const ENTRY_SIZE: u16 = 64;

	use super::{TableId, Value, ValueTable, MULTIPART_ENTRY_SIZE};
	use crate::{
		log::{Log, LogAction, LogWriter},
		options::{ColumnOptions, Options, CURRENT_VERSION},
		table::key::TableKey,
		Key,
	};
	use std::sync::{atomic::Ordering, Arc};
	use tempfile::{tempdir, TempDir};

	// Open a fresh value table in a temporary directory. `size: None` makes
	// the table multipart-capable.
	fn new_table(dir: &TempDir, size: Option<u16>, options: &ColumnOptions) -> ValueTable {
		let id = TableId::new(0, 0);
		ValueTable::open(Arc::new(dir.path().to_path_buf()), id, size, options, CURRENT_VERSION)
			.unwrap()
	}

	fn new_log(dir: &TempDir) -> Log {
		let options = Options::with_columns(dir.path(), 1);
		Log::open(&options).unwrap()
	}

	// Run `f` against a fresh log record, commit it, then read the record
	// back and enact every value insertion against `table`.
	fn write_ops<F: FnOnce(&mut LogWriter)>(table: &ValueTable, log: &Log, f: F) {
		let mut writer = log.begin_record();
		f(&mut writer);
		let bytes_written = log.end_record(writer.drain()).unwrap();
		// Cycle the log once so the freshly written record becomes readable.
		let _ = log.read_next(false);
		log.flush_one(0).unwrap();
		let mut reader = log.read_next(false).unwrap().unwrap();
		loop {
			match reader.next().unwrap() {
				// Only value insertions are expected from these tests.
				LogAction::BeginRecord |
				LogAction::InsertIndex { .. } |
				LogAction::InsertRefCount { .. } |
				LogAction::DropTable { .. } |
				LogAction::DropRefCountTable { .. } => {
					panic!("Unexpected log entry");
				},
				LogAction::EndRecord => {
					let bytes_read = reader.read_bytes();
					assert_eq!(bytes_written, bytes_read);
					break
				},
				LogAction::InsertValue(insertion) => {
					table.enact_plan(insertion.index, &mut reader).unwrap();
				},
			}
		}
	}

	// Deterministic 32-byte key derived from an integer.
	fn key(k: u32) -> Key {
		use blake2::{digest::typenum::U32, Blake2b, Digest};
		let mut key = Key::default();
		let hash = Blake2b::<U32>::digest(k.to_le_bytes());
		key.copy_from_slice(&hash);
		key
	}

	fn simple_key(k: Key) -> TableKey {
		TableKey::Partial(k)
	}

	fn no_hash(_: Key) -> TableKey {
		TableKey::NoHash
	}

	// Random value of the requested size.
	fn value(size: usize) -> Value {
		use rand::RngCore;
		let mut result = vec![0; size];
		rand::rng().fill_bytes(&mut result);
		result
	}

	fn rc_options() -> ColumnOptions {
		ColumnOptions { ref_counted: true, ..Default::default() }
	}

	#[test]
	fn insert_simple() {
		insert_simple_inner(&Default::default());
		insert_simple_inner(&rc_options());
	}
	// A single insert is visible both through the writer overlay and through
	// the log overlays, and fills slot 1 (slot 0 is the header).
	fn insert_simple_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key = key(1);
		let key = TableKey::Partial(key);
		let key = &key;
		let val = value(19);
		let compressed = true;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
			assert_eq!(table.get(key, 1, writer).unwrap(), Some((val.clone(), compressed)));
		});

		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		assert_eq!(table.filled.load(Ordering::Relaxed), 2);
	}

	// Opening a fixed-size table with an entry size above MAX_ENTRY_SIZE
	// must hit the assertion in the constructor.
	#[test]
	#[should_panic(expected = "assertion failed: entry_size <= MAX_ENTRY_SIZE as u16")]
	fn oversized_into_fixed_panics() {
		let dir = tempdir().unwrap();
		let _table = new_table(&dir, Some(65534), &Default::default());
	}

	#[test]
	fn remove_simple() {
		remove_simple_inner(&Default::default());
		remove_simple_inner(&rc_options());
	}
	// Removal pushes the slot onto the free list; the next insert reuses it.
	fn remove_simple_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(11);
		let val2 = value(21);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		write_ops(&table, &log, |writer| {
			table.write_remove_plan(1, writer).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), None);
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
	}

	#[test]
	fn replace_simple() {
		replace_simple_inner(&Default::default(), simple_key);
		replace_simple_inner(&rc_options(), simple_key);
		replace_simple_inner(&Default::default(), no_hash);
		replace_simple_inner(&rc_options(), no_hash);
	}
	// In-place replacement keeps the slot and leaves the free list untouched.
	fn replace_simple_inner(options: &ColumnOptions, table_key: fn(Key) -> TableKey) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &table_key(key1);
		let key2 = key(2);
		let key2 = &table_key(key2);
		let val1 = value(11);
		let val2 = value(21);
		let val3 = value(26);
		let compressed = true;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key2, &val3, writer, false).unwrap();
		});

		assert_eq!(table.get(key2, 1, log.overlays()).unwrap(), Some((val3, false)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
	}

	#[test]
	fn replace_multipart_shorter() {
		replace_multipart_shorter_inner(&Default::default());
		replace_multipart_shorter_inner(&rc_options());
	}
	// Shrinking a multipart value frees its surplus parts onto the free list.
	fn replace_multipart_shorter_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, None, options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(20000);
		let val2 = value(30);
		let val1s = value(5000);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 7);

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key1, &val1s, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1s, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 5);
		// Freed parts must be chained 5 -> 4 -> 3 -> end.
		write_ops(&table, &log, |writer| {
			assert_eq!(table.read_next_free(5, writer).unwrap(), 4);
			assert_eq!(table.read_next_free(4, writer).unwrap(), 3);
			assert_eq!(table.read_next_free(3, writer).unwrap(), 0);
		});
	}

	#[test]
	fn replace_multipart_longer() {
		replace_multipart_longer_inner(&Default::default());
		replace_multipart_longer_inner(&rc_options());
	}
	// Growing a multipart value allocates additional parts at the end.
	fn replace_multipart_longer_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, None, options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(5000);
		let val2 = value(30);
		let val1l = value(20000);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 4);

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key1, &val1l, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1l, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 7);
	}

	// Ref counting: the entry survives while rc > 0 and disappears when the
	// count drops to zero.
	#[test]
	fn ref_counting() {
		for compressed in [false, true] {
			let dir = tempdir().unwrap();
			let table = new_table(&dir, None, &rc_options());
			let log = new_log(&dir);

			let key = key(1);
			let key = &TableKey::Partial(key);
			let val = value(5000);

			write_ops(&table, &log, |writer| {
				table.write_insert_plan(key, &val, writer, compressed).unwrap();
				table.write_inc_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val.clone(), compressed)));
			write_ops(&table, &log, |writer| {
				table.write_dec_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
			write_ops(&table, &log, |writer| {
				table.write_dec_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
		}
	}

	// iter_while visits live entries (in both fixed-size and multipart
	// layouts) and skips removed ones.
	#[test]
	fn iteration() {
		for multipart in [false, true] {
			for compressed in [false, true] {
				let (entry_size, size_mul) =
					if multipart { (None, 100) } else { (Some(MULTIPART_ENTRY_SIZE / 2), 1) };

				let dir = tempdir().unwrap();
				let table = new_table(&dir, entry_size, &rc_options());
				let log = new_log(&dir);

				let (v1, v2, v3) = (
					value(MULTIPART_ENTRY_SIZE as usize / 8 * size_mul),
					value(MULTIPART_ENTRY_SIZE as usize / 4 * size_mul),
					value(MULTIPART_ENTRY_SIZE as usize * 3 / 8 * size_mul),
				);
				let entries = [
					(TableKey::Partial(key(1)), &v1),
					(TableKey::Partial(key(2)), &v2),
					(TableKey::Partial(key(3)), &v3),
				];

				write_ops(&table, &log, |writer| {
					for (k, v) in &entries {
						table.write_insert_plan(k, &v, writer, compressed).unwrap();
					}
					table.complete_plan(writer).unwrap();
				});

				let mut res = Vec::new();
				table
					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
						res.push((v.len(), cmpr));
						true
					})
					.unwrap();
				assert_eq!(
					res,
					vec![(v1.len(), compressed), (v2.len(), compressed), (v3.len(), compressed)]
				);

				// In the multipart case v1 occupies several slots, so v2's
				// head index depends on v1's length.
				let v2_index = 2 + v1.len() as u64 / super::MULTIPART_ENTRY_SIZE as u64;
				write_ops(&table, &log, |writer| {
					table.write_remove_plan(v2_index, writer).unwrap();
				});

				let mut res = Vec::new();
				table
					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
						res.push((v.len(), cmpr));
						true
					})
					.unwrap();
				assert_eq!(res, vec![(v1.len(), compressed), (v3.len(), compressed)]);
			}
		}
	}

	// Decrementing below zero must not resurrect or corrupt the entry.
	#[test]
	fn ref_underflow() {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), &rc_options());
		let log = new_log(&dir);

		let key = key(1);
		let key = &TableKey::Partial(key);
		let val = value(10);

		let compressed = false;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
			table.write_inc_ref(1, writer).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		write_ops(&table, &log, |writer| {
			table.write_dec_ref(1, writer).unwrap();
			table.write_dec_ref(1, writer).unwrap();
			table.write_dec_ref(1, writer).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
	}

	// The multipart marker bytes must decode to sizes above MAX_ENTRY_SIZE,
	// so a legitimate entry size can never collide with a marker.
	#[test]
	fn multipart_collision() {
		use super::MAX_ENTRY_SIZE;
		let mut entry = super::Entry::new(super::MULTIPART.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let mut entry = super::Entry::new(super::MULTIHEAD.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let mut entry = super::Entry::new(super::MULTIHEAD_COMPRESSED.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(MAX_ENTRY_SIZE as u16), &rc_options());
		let log = new_log(&dir);

		let key = key(1);
		let key = &TableKey::Partial(key);
		let val = value(32225);
		let compressed = true;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		write_ops(&table, &log, |writer| {
			table.write_dec_ref(1, writer).unwrap();
		});
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);

		// A value exactly as large as the maximum single-entry payload must
		// round-trip without being mistaken for a multipart marker.
		let value_size = table.value_size(key).unwrap();
		assert_eq!(0x7fd8, table.value_size(key).unwrap());
		let val = value(value_size as usize);
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
	}

	// A zeroed size field on disk must surface as a Corruption error, not a
	// panic or a bogus value.
	#[test]
	fn bad_size_header() {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(36), &rc_options());
		let log = new_log(&dir);

		let key = &TableKey::Partial(key(1));
		let val = value(4);

		let compressed = false;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		// Corrupt the size field of the first data entry.
		let zeroes = [0u8, 0u8];
		table.file.write_at(&zeroes, table.entry_size as u64).unwrap();
		let log = new_log(&dir);
		assert!(matches!(
			table.get(key, 1, log.overlays()),
			Err(crate::error::Error::Corruption(_))
		));
	}
}