// parity_db/table.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
4// On disk data layout for value tables.
5// All numerical values are little endian.
6//
7// Entry 0 (metadata)
8// [LAST_REMOVED: 8][FILLED: 8]
9// LAST_REMOVED - 64-bit index of removed entries linked list head
10// FILLED - highest index filled with live data
11//
12// Complete entry:
13// [SIZE: 2][REFS: 4][KEY: 26][VALUE]
14// SIZE: 15-bit value size. Sizes up to 0x7ffc are allowed.
15// This includes size of REFS and KEY.
16// The highest bit is reserved to indicate if compression is applied.
// REFS: 32-bit reference counter (optional).
18// KEY: lower 26 bytes of the key (optional for btree nodes).
19// VALUE: payload bytes.
20//
21// Partial entry (first part):
22// [MULTIHEAD: 2][SIZE: 8][NEXT: 8][REFS: 4][KEY: 26][VALUE]
23// MULTIHEAD - Split entry head marker. 0xfffd.
24// SIZE - 64-bit total size of the value.
25// NEXT - 64-bit index of the entry that holds the next part.
// REFS: 32-bit reference counter (optional).
27// KEY: lower 26 bytes of the key (optional for btree nodes).
28// VALUE: The rest of the entry is filled with payload bytes.
29//
30// Partial entry (continuation):
31// [MULTIPART: 2][NEXT: 8][VALUE]
32// MULTIPART - Split entry marker. 0xfffe.
33// NEXT - 64-bit index of the entry that holds the next part.
34// VALUE: The rest of the entry is filled with payload bytes.
35//
36// Partial entry (last part):
37// [SIZE: 2][VALUE: SIZE]
38// SIZE: 15-bit size of the remaining payload.
39// The highest bit is reserved to indicate if compression is applied.
40// VALUE: SIZE payload bytes.
41//
42// Deleted entry
43// [TOMBSTONE: 2][NEXT: 8]
44// TOMBSTONE - Deleted entry marker. 0xffff
45// NEXT - 64-bit index of the next deleted entry.
46
47use crate::{
48	column::ColId,
49	display::hex,
50	error::Result,
51	log::{LogOverlays, LogQuery, LogReader, LogWriter},
52	options::ColumnOptions as Options,
53	parking_lot::RwLock,
54	table::key::{TableKey, TableKeyQuery, PARTIAL_SIZE},
55};
56use std::{
57	convert::TryInto,
58	mem::MaybeUninit,
59	sync::{
60		atomic::{AtomicBool, AtomicU64, Ordering},
61		Arc,
62	},
63};
64
pub const SIZE_TIERS: usize = 1usize << SIZE_TIERS_BITS;
pub const SIZE_TIERS_BITS: u8 = 8;
// Highest bit of the 16-bit size field; set when the value is compressed.
pub const COMPRESSED_MASK: u16 = 0x80_00;
pub const MAX_ENTRY_SIZE: usize = 0x7ff8; // Actual max size in V4 was 0x7dfe
pub const MIN_ENTRY_SIZE: usize = 32;
// On-disk field widths, in bytes.
const REFS_SIZE: usize = 4;
const SIZE_SIZE: usize = 2;
const INDEX_SIZE: usize = 8;
const MAX_ENTRY_BUF_SIZE: usize = 0x8000;

// Two-byte entry markers (see the layout description at the top of the file).
const TOMBSTONE: &[u8] = &[0xff, 0xff];
const MULTIPART_V4: &[u8] = &[0xff, 0xfe];
const MULTIHEAD_V4: &[u8] = &[0xff, 0xfd];
const MULTIPART: &[u8] = &[0xfe, 0xff];
const MULTIHEAD: &[u8] = &[0xfd, 0xff];
// Multihead marker variant indicating the payload is compressed (the high
// bit of the 16-bit marker doubles as the compression flag).
const MULTIHEAD_COMPRESSED: &[u8] = &[0xfd, 0x7f];
// When the reference counter reaches LOCKED_REF, the entry is locked in the
// db: increments saturate at this value and decrements are ignored
// (see `change_ref`).
const LOCKED_REF: u32 = u32::MAX;

// Fixed entry size used by the multipart (over-sized values) table.
const MULTIPART_ENTRY_SIZE: u16 = 4096;

// Raw value payload bytes.
pub type Value = Vec<u8>;
87
// Value table identifier: column id in the high byte, size tier in the
// low byte.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TableId(u16);
90
91impl TableId {
92	pub fn new(col: ColId, size_tier: u8) -> TableId {
93		TableId(((col as u16) << 8) | size_tier as u16)
94	}
95
96	pub fn from_u16(id: u16) -> TableId {
97		TableId(id)
98	}
99
100	pub fn col(&self) -> ColId {
101		(self.0 >> 8) as ColId
102	}
103
104	pub fn size_tier(&self) -> u8 {
105		(self.0 & 0xff) as u8
106	}
107
108	pub fn file_name(&self) -> String {
109		format!("table_{:02}_{}", self.col(), hex(&[self.size_tier()]))
110	}
111
112	pub fn is_file_name(col: ColId, name: &str) -> bool {
113		name.starts_with(&format!("table_{col:02}_"))
114	}
115
116	pub fn as_u16(&self) -> u16 {
117		self.0
118	}
119
120	pub fn log_index(&self) -> usize {
121		self.col() as usize * SIZE_TIERS + self.size_tier() as usize
122	}
123
124	pub const fn max_log_tables(num_columns: usize) -> usize {
125		SIZE_TIERS * num_columns
126	}
127
128	pub fn from_log_index(i: usize) -> Self {
129		let col = i / SIZE_TIERS;
130		let tier = i % SIZE_TIERS;
131		Self::new(col as ColId, tier as u8)
132	}
133}
134
impl std::fmt::Display for TableId {
	// Short human-readable form: column (decimal), then tier (hex),
	// e.g. `t03-07`.
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		write!(f, "t{:02}-{:02}", self.col(), hex(&[self.size_tier()]))
	}
}
140
// In-memory mirror of the on-disk removed-entries linked list, kept as a
// stack: the top element corresponds to `last_removed` (the most recently
// freed slot, reused first).
#[derive(Debug)]
struct FreeEntries {
	stack: Vec<u64>,
}
145
// A fixed-entry-size value table file together with its cached header state.
#[derive(Debug)]
pub struct ValueTable {
	// Column and size tier this table serves.
	pub id: TableId,
	// Fixed size in bytes of every entry slot in the file.
	pub entry_size: u16,
	file: crate::file::TableFile,
	filled: AtomicU64,  // Number of entries from the POV of the log overlay.
	written: AtomicU64, // Actual number of entries on disk.
	// Head of the removed-entries linked list (0 when the list is empty).
	last_removed: AtomicU64,
	// Set when `filled`/`last_removed` changed and the header entry must be
	// re-written.
	dirty_header: AtomicBool,
	// Whether an in-memory free-entries stack should be built
	// (set from `options.multitree`).
	needs_free_entries: bool,
	// Stack of free slot indices mirroring the removed list; populated by
	// `init_table_data`, required by `claim_entries`.
	free_entries: Option<RwLock<FreeEntries>>,
	// True for the over-sized values table whose entries form multipart
	// chains.
	multipart: bool,
	// Whether entries carry a 32-bit reference counter.
	ref_counted: bool,
	db_version: u32,
}
161
162#[derive(Default, Clone, Copy)]
163struct Header([u8; 16]);
164
165impl Header {
166	fn last_removed(&self) -> u64 {
167		u64::from_le_bytes(self.0[0..INDEX_SIZE].try_into().unwrap())
168	}
169	fn set_last_removed(&mut self, last_removed: u64) {
170		self.0[0..INDEX_SIZE].copy_from_slice(&last_removed.to_le_bytes());
171	}
172	fn filled(&self) -> u64 {
173		u64::from_le_bytes(self.0[INDEX_SIZE..INDEX_SIZE * 2].try_into().unwrap())
174	}
175	fn set_filled(&mut self, filled: u64) {
176		self.0[INDEX_SIZE..INDEX_SIZE * 2].copy_from_slice(&filled.to_le_bytes());
177	}
178}
179
// A byte buffer paired with a read/write cursor: `Entry(offset, data)`.
// All entry encoding/decoding goes through this type.
pub struct Entry<B: AsRef<[u8]>>(usize, B);
#[cfg(feature = "loom")]
pub type FullEntry = Entry<Vec<u8>>;
#[cfg(not(feature = "loom"))]
pub type FullEntry = Entry<[u8; MAX_ENTRY_BUF_SIZE]>;
// Borrowed view over an entry's bytes.
pub type EntryRef<'a> = Entry<&'a [u8]>;
// Large enough for a size field, a 64-bit total size and a next index.
type PartialEntry = Entry<[u8; 18]>;
type PartialKeyEntry = Entry<[u8; 48]>; // 2 + 8 + 4 + 26 + 8
188
impl<const C: usize> Entry<[u8; C]> {
	/// Create an entry over an uninitialized stack buffer, cursor at 0.
	#[inline(always)]
	#[allow(clippy::uninit_assumed_init)]
	// NOTE(review): `assume_init` on an uninitialized integer array is
	// technically undefined behavior; callers are expected to only read
	// ranges they have written or filled via `read_at`. Consider proper
	// `MaybeUninit` plumbing or zero-initialization — TODO confirm.
	pub fn new_uninit() -> Self {
		Entry(0, unsafe { MaybeUninit::uninit().assume_init() })
	}
}
196
#[cfg(feature = "loom")]
impl Entry<Vec<u8>> {
	// Under loom the full-entry buffer is heap allocated and zero
	// initialized instead of left uninitialized on the stack.
	pub fn new_uninit_full_entry() -> Self {
		Entry(0, vec![0; MAX_ENTRY_BUF_SIZE])
	}
}
203
#[cfg(not(feature = "loom"))]
impl Entry<[u8; MAX_ENTRY_BUF_SIZE]> {
	// Full-size stack buffer; see `new_uninit` for the initialization caveat.
	pub fn new_uninit_full_entry() -> Self {
		Self::new_uninit()
	}
}
210
211impl<B: AsRef<[u8]>> Entry<B> {
212	#[inline(always)]
213	pub fn check_remaining_len(
214		&self,
215		len: usize,
216		error: impl Fn() -> crate::error::Error,
217	) -> Result<()> {
218		if self.0 + len > self.1.as_ref().len() {
219			return Err(error())
220		}
221		Ok(())
222	}
223
224	#[inline(always)]
225	pub fn new(data: B) -> Self {
226		Entry(0, data)
227	}
228
229	pub fn set_offset(&mut self, offset: usize) {
230		self.0 = offset;
231	}
232
233	pub fn offset(&self) -> usize {
234		self.0
235	}
236
237	pub fn size(&self) -> usize {
238		self.1.as_ref().len() - self.0
239	}
240
241	pub fn read_slice(&mut self, size: usize) -> &[u8] {
242		let start = self.0;
243		self.0 += size;
244		&self.1.as_ref()[start..self.0]
245	}
246
247	fn is_tombstone(&self) -> bool {
248		&self.1.as_ref()[0..SIZE_SIZE] == TOMBSTONE
249	}
250
251	fn is_multipart(&self) -> bool {
252		&self.1.as_ref()[0..SIZE_SIZE] == MULTIPART
253	}
254
255	fn is_multipart_v4(&self) -> bool {
256		&self.1.as_ref()[0..SIZE_SIZE] == MULTIPART_V4
257	}
258
259	fn is_multihead_compressed(&self) -> bool {
260		&self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_COMPRESSED
261	}
262
263	fn is_multihead(&self) -> bool {
264		self.is_multihead_compressed() || &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD
265	}
266
267	fn is_multihead_v4(&self) -> bool {
268		&self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_V4
269	}
270
271	fn is_multi(&self, db_version: u32) -> bool {
272		self.is_multipart() ||
273			self.is_multihead() ||
274			(db_version <= 4 && (self.is_multipart_v4() || self.is_multihead_v4()))
275	}
276
277	fn read_size(&mut self) -> (u16, bool) {
278		let size = u16::from_le_bytes(self.read_slice(SIZE_SIZE).try_into().unwrap());
279		let compressed = (size & COMPRESSED_MASK) > 0;
280		(size & !COMPRESSED_MASK, compressed)
281	}
282
283	fn skip_size(&mut self) {
284		self.0 += SIZE_SIZE;
285	}
286
287	pub fn read_u64(&mut self) -> u64 {
288		u64::from_le_bytes(self.read_slice(8).try_into().unwrap())
289	}
290
291	fn read_next(&mut self) -> u64 {
292		self.read_u64()
293	}
294
295	pub fn skip_u64(&mut self) {
296		self.0 += 8;
297	}
298
299	pub fn skip_next(&mut self) {
300		self.skip_u64()
301	}
302
303	pub fn read_u32(&mut self) -> u32 {
304		u32::from_le_bytes(self.read_slice(REFS_SIZE).try_into().unwrap())
305	}
306
307	fn read_rc(&mut self) -> u32 {
308		self.read_u32()
309	}
310
311	fn read_partial(&mut self) -> &[u8] {
312		self.read_slice(PARTIAL_SIZE)
313	}
314}
315
316impl<B: AsRef<[u8]> + AsMut<[u8]>> Entry<B> {
317	pub fn write_slice(&mut self, buf: &[u8]) {
318		let start = self.0;
319		self.0 += buf.len();
320		self.1.as_mut()[start..self.0].copy_from_slice(buf);
321	}
322
323	fn write_tombstone(&mut self) {
324		self.write_slice(TOMBSTONE);
325	}
326
327	fn write_multipart(&mut self) {
328		self.write_slice(MULTIPART);
329	}
330
331	fn write_multihead(&mut self) {
332		self.write_slice(MULTIHEAD);
333	}
334
335	fn write_multihead_compressed(&mut self) {
336		self.write_slice(MULTIHEAD_COMPRESSED);
337	}
338
339	fn write_size(&mut self, mut size: u16, compressed: bool) {
340		if compressed {
341			size |= COMPRESSED_MASK;
342		}
343		self.write_slice(&size.to_le_bytes());
344	}
345	pub fn write_u64(&mut self, next_index: u64) {
346		self.write_slice(&next_index.to_le_bytes());
347	}
348
349	fn write_next(&mut self, next_index: u64) {
350		self.write_u64(next_index)
351	}
352
353	pub fn write_u32(&mut self, next_index: u32) {
354		self.write_slice(&next_index.to_le_bytes());
355	}
356
357	fn write_rc(&mut self, rc: u32) {
358		self.write_slice(&rc.to_le_bytes());
359	}
360
361	pub fn inner_mut(&mut self) -> &mut B {
362		&mut self.1
363	}
364}
365
impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for Entry<B> {
	// Mutable access to the whole backing buffer (ignores the cursor).
	fn as_mut(&mut self) -> &mut [u8] {
		self.1.as_mut()
	}
}

impl<B: AsRef<[u8]>> AsRef<[u8]> for Entry<B> {
	// Read access to the whole backing buffer (ignores the cursor).
	fn as_ref(&self) -> &[u8] {
		self.1.as_ref()
	}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::Index<std::ops::Range<usize>> for Entry<B> {
	type Output = [u8];

	// `entry[a..b]` indexes the backing buffer directly.
	fn index(&self, index: std::ops::Range<usize>) -> &[u8] {
		&self.1.as_ref()[index]
	}
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> std::ops::IndexMut<std::ops::Range<usize>> for Entry<B> {
	fn index_mut(&mut self, index: std::ops::Range<usize>) -> &mut [u8] {
		&mut self.1.as_mut()[index]
	}
}
391
// A borrowed run of entry bytes, coming either from the log overlay or
// mapped directly from the table file.
enum LockedSlice<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> {
	FromOverlay(O),
	FromFile(F),
}

impl<O: std::ops::Deref<Target = [u8]>, F: std::ops::Deref<Target = [u8]>> LockedSlice<O, F> {
	/// View the underlying bytes regardless of their origin.
	fn as_slice(&self) -> &[u8] {
		match self {
			LockedSlice::FromOverlay(overlay) => overlay,
			LockedSlice::FromFile(file) => file,
		}
	}
}
405
406impl ValueTable {
	/// Open the value table file `table_{col}_{tier}` under `path` and load
	/// its cached header state.
	///
	/// `entry_size == None` selects the multipart table (over-sized values)
	/// with `MULTIPART_ENTRY_SIZE` slots. When the file already exists and is
	/// mapped, the header entry (slot 0) provides `filled` and `last_removed`;
	/// otherwise they start at 1 and 0 respectively.
	pub fn open(
		path: Arc<std::path::PathBuf>,
		id: TableId,
		entry_size: Option<u16>,
		options: &Options,
		db_version: u32,
	) -> Result<ValueTable> {
		let (multipart, entry_size) = match entry_size {
			Some(s) => (false, s),
			None => (true, MULTIPART_ENTRY_SIZE),
		};
		assert!(entry_size >= MIN_ENTRY_SIZE as u16);
		assert!(entry_size <= MAX_ENTRY_SIZE as u16);

		let mut filepath: std::path::PathBuf = std::path::PathBuf::clone(&*path);
		filepath.push(id.file_name());
		let file = crate::file::TableFile::open(filepath, entry_size, id)?;
		// Slot 0 holds the header, so an empty table counts as 1 filled entry.
		let mut filled = 1;
		let mut last_removed = 0;
		if file.map.read().is_some() {
			let mut header = Header::default();
			file.read_at(&mut header.0, 0)?;
			last_removed = header.last_removed();
			filled = header.filled();
			if filled == 0 {
				filled = 1;
			}
			// The removed-list head must point at an existing entry.
			if last_removed >= filled {
				return Err(crate::error::Error::Corruption(format!(
					"Bad removed ref {} out of {}",
					last_removed, filled
				)))
			}
			log::debug!(target: "parity-db", "Opened value table {} with {} entries, entry_size={}, removed={}", id, filled, entry_size, last_removed);
		}

		Ok(ValueTable {
			id,
			entry_size,
			file,
			filled: AtomicU64::new(filled),
			written: AtomicU64::new(filled),
			last_removed: AtomicU64::new(last_removed),
			dirty_header: AtomicBool::new(false),
			needs_free_entries: options.multitree,
			free_entries: None,
			multipart,
			ref_counted: options.ref_counted,
			db_version,
		})
	}
458
459	pub fn init_table_data(&mut self) -> Result<()> {
460		let free_entries = if self.needs_free_entries {
461			let mut stack: Vec<u64> = Default::default();
462
463			let filled = self.filled.load(Ordering::Relaxed);
464			let last_removed = self.last_removed.load(Ordering::Relaxed);
465
466			let mut next = last_removed;
467			while next != 0 {
468				if next >= filled {
469					return Err(crate::error::Error::Corruption(format!(
470						"Bad removed ref {} out of {}",
471						next, filled
472					)))
473				}
474
475				stack.insert(0, next);
476
477				let mut buf = PartialEntry::new_uninit();
478				self.file.read_at(buf.as_mut(), next * self.entry_size as u64)?;
479				buf.skip_size();
480				next = buf.read_next();
481			}
482
483			Some(RwLock::new(FreeEntries { stack }))
484		} else {
485			None
486		};
487		self.free_entries = free_entries;
488		Ok(())
489	}
490
491	pub fn value_size(&self, key: &TableKey) -> Option<u16> {
492		let base = self.entry_size - SIZE_SIZE as u16 - self.ref_size() as u16;
493		let k_encoded = key.encoded_size() as u16;
494		if base < k_encoded {
495			None
496		} else {
497			Some(base - k_encoded)
498		}
499	}
500
	/// Parse the header of the entry at `index` from `buf` and fetch or check
	/// the key according to `key`.
	///
	/// Returns `(rc, total_size, next, first_part_size, compressed)`:
	/// `rc == 0` signals a tombstone, a non-head multipart slot, or a key
	/// mismatch; `total_size` is the recorded size when the header carries one
	/// (always for single-part entries, multipart heads only for
	/// `db_version > 8`); `next` is the next part's index (0 when none);
	/// `first_part_size` is the number of bytes to read from this entry.
	#[inline(always)]
	fn parse_head(
		&self,
		key: &mut TableKeyQuery,
		index: u64,
		mut buf: &mut EntryRef,
	) -> Result<(u32, Option<u64>, u64, u16, bool)> {
		// ref counter, total size, next, first part size, compressed
		let mut compressed = false;
		let mut rc = 1;
		let entry_size = self.entry_size as usize;
		let mut total_size = None;

		if buf.is_tombstone() {
			return Ok((0, None, 0, 0, false))
		}

		if self.multipart && !buf.is_multihead() {
			// This may only happen during value iteration.
			return Ok((0, None, 0, 0, false))
		}

		let (mut size, next) = if self.multipart && buf.is_multi(self.db_version) {
			// Multipart head: marker, optional 64-bit total size
			// (db_version > 8) and the index of the next part.
			if self.db_version > 6 && buf.is_multihead_compressed() {
				compressed = true;
			}
			buf.skip_size();
			let mut buf_size = entry_size - SIZE_SIZE - INDEX_SIZE;
			if self.db_version > 8 {
				total_size = Some(buf.read_u64());
				buf_size -= 8;
			}
			let next = buf.read_next();
			(buf_size as u16, next)
		} else {
			// Single-part entry: the size field covers the rest of the entry.
			let (size, read_compressed) = buf.read_size();
			compressed = read_compressed;
			total_size = Some(size as u64);
			(size, 0)
		};

		if self.ref_counted {
			// The reference counter is included in the recorded size.
			if size < REFS_SIZE as u16 {
				return Err(crate::error::Error::Corruption(format!(
					"{}: Corrupted entry at {}. Size {}, expected at least {}",
					self.id, index, size, REFS_SIZE,
				)))
			}
			size -= REFS_SIZE as u16;
			rc = buf.read_rc();
		}
		match key {
			TableKeyQuery::Fetch(Some(to_fetch)) => {
				// Extract the stored partial key for the caller.
				size -= PARTIAL_SIZE as u16;
				**to_fetch = TableKey::fetch_partial(&mut buf)?;
			},
			TableKeyQuery::Fetch(None) => (),
			TableKeyQuery::Check(k) => {
				// Compare the stored key material against the queried key.
				size -= k.encoded_size() as u16;
				let to_fetch = k.fetch(&mut buf);
				if !k.compare(&to_fetch) {
					log::debug!(
						target: "parity-db",
						"{}: Key mismatch at {}. Expected {}, got {:?}, size = {}",
						self.id,
						index,
						k,
						to_fetch.as_ref().map(hex),
						self.entry_size,
					);
					return Ok((0, total_size, 0, 0, false))
				}
			},
		}

		Ok((rc, total_size, next, size, compressed))
	}
578
	// Return ref counter, total size (when recorded in the header) and whether
	// the value is compressed; fetches/checks the key via `key`. Only the head
	// entry is examined — the payload itself is not read.
	#[inline(always)]
	fn entry_info(
		&self,
		key: &mut TableKeyQuery,
		index: u64,
		log: &impl LogQuery,
	) -> Result<(u32, Option<u64>, bool)> {
		let entry_size = self.entry_size as usize;

		// Prefer the log overlay copy of the slot; otherwise map the bytes
		// directly from the table file.
		let vbuf = log.value_ref(self.id, index);
		let buf: LockedSlice<_, _> = if let Some(buf) = vbuf.as_deref() {
			log::trace!(
				target: "parity-db",
				"{}: Found in overlay {}",
				self.id,
				index,
			);
			LockedSlice::FromOverlay(buf)
		} else {
			log::trace!(
				target: "parity-db",
				"{}: Query slot {}",
				self.id,
				index,
			);
			let vbuf = self.file.slice_at(index * self.entry_size as u64, entry_size);
			LockedSlice::FromFile(vbuf)
		};
		let mut buf = EntryRef::new(buf.as_slice());
		let (rc, total_size, _next, _size, compressed) = self.parse_head(key, index, &mut buf)?;
		Ok((rc, total_size, compressed))
	}
612
613	// Return ref counter, partial key and if the value is compressed.
614	#[inline(always)]
615	fn read(
616		&self,
617		key: &mut TableKeyQuery,
618		mut index: u64,
619		log: &impl LogQuery,
620		//mut f: impl FnMut(&[u8]) -> bool,
621	) -> Result<(u32, bool, Vec<u8>)> {
622		let mut part = 0;
623		let mut compressed = false;
624		let mut rc = 1;
625		let entry_size = self.entry_size as usize;
626		let mut result = Vec::new();
627		loop {
628			let vbuf = log.value_ref(self.id, index);
629			let buf: LockedSlice<_, _> = if let Some(buf) = vbuf.as_deref() {
630				log::trace!(
631					target: "parity-db",
632					"{}: Found in overlay {}",
633					self.id,
634					index,
635				);
636				LockedSlice::FromOverlay(buf)
637			} else {
638				log::trace!(
639					target: "parity-db",
640					"{}: Query slot {}",
641					self.id,
642					index,
643				);
644				let vbuf = self.file.slice_at(index * self.entry_size as u64, entry_size);
645				LockedSlice::FromFile(vbuf)
646			};
647			let mut buf = EntryRef::new(buf.as_slice());
648
649			let (buf_size, next) = {
650				if part == 0 {
651					let (hrc, total_size, n, s, c) = self.parse_head(key, index, &mut buf)?;
652					if hrc == 0 {
653						return Ok((0, false, vec![]));
654					}
655					rc = hrc;
656					if let Some(s) = total_size {
657						result.reserve(s as usize);
658					}
659					compressed = c;
660					(s as usize, n)
661				} else {
662					if self.multipart && buf.is_multi(self.db_version) {
663						buf.skip_size();
664						let next = buf.read_next();
665						(entry_size - SIZE_SIZE - INDEX_SIZE, next)
666					} else {
667						let (size, read_compressed) = buf.read_size();
668						if part == 0 || self.db_version <= 6 {
669							compressed = read_compressed;
670						}
671						(size as usize, 0)
672					}
673				}
674			};
675
676			if buf.size() < buf_size {
677				return Err(crate::error::Error::Corruption(format!(
678					"Unexpected entry size. Expected at least {} bytes",
679					buf_size
680				)))
681			}
682
683			result.extend_from_slice(buf.read_slice(buf_size));
684
685			if next == 0 {
686				break
687			}
688			part += 1;
689			index = next;
690		}
691		Ok((rc, compressed, result))
692	}
693
694	pub fn get(
695		&self,
696		key: &TableKey,
697		index: u64,
698		log: &impl LogQuery,
699	) -> Result<Option<(Value, bool)>> {
700		if let Some((value, compressed, _)) =
701			self.query(&mut TableKeyQuery::Check(key), index, log)?
702		{
703			Ok(Some((value, compressed)))
704		} else {
705			Ok(None)
706		}
707	}
708
709	pub fn dump_entry(&self, index: u64) -> Result<Vec<u8>> {
710		let entry_size = self.entry_size as usize;
711		let mut buf = FullEntry::new_uninit_full_entry();
712		self.file.read_at(&mut buf[0..entry_size], index * self.entry_size as u64)?;
713		Ok(buf[0..entry_size].to_vec())
714	}
715
716	pub fn query(
717		&self,
718		key: &mut TableKeyQuery,
719		index: u64,
720		log: &impl LogQuery,
721	) -> Result<Option<(Value, bool, u32)>> {
722		let (rc, compressed, result) = self.read(key, index, log)?;
723		if rc > 0 {
724			return Ok(Some((result, compressed, rc)))
725		}
726		Ok(None)
727	}
728
729	#[allow(clippy::type_complexity)]
730	pub fn get_with_meta(
731		&self,
732		index: u64,
733		log: &impl LogQuery,
734	) -> Result<Option<(Value, u32, [u8; PARTIAL_SIZE], bool)>> {
735		let mut query_key = Default::default();
736		if let Some((value, compressed, rc)) =
737			self.query(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log)?
738		{
739			return Ok(Some((value, rc, query_key, compressed)))
740		}
741		Ok(None)
742	}
743
744	pub fn size(
745		&self,
746		key: &TableKey,
747		index: u64,
748		log: &impl LogQuery,
749	) -> Result<Option<(u32, bool)>> {
750		let (rc, size, compressed) = self.entry_info(&mut TableKeyQuery::Check(key), index, log)?;
751		if rc > 0 {
752			if let Some(size) = size {
753				return Ok(Some((size as u32, compressed)))
754			}
755			return self
756				.read(&mut TableKeyQuery::Check(key), index, log)
757				.map(|(_rc, compressed, value)| Some((value.len() as u32, compressed)))
758		}
759		Ok(None)
760	}
761
762	pub fn has_key_at(&self, index: u64, key: &TableKey, log: &LogWriter) -> Result<bool> {
763		match key {
764			TableKey::Partial(k) => Ok(match self.partial_key_at(index, log)? {
765				Some(existing_key) => &existing_key[..] == key::partial_key(k),
766				None => false,
767			}),
768			TableKey::NoHash => Ok(!self.is_tombstone(index, log)?),
769		}
770	}
771
772	pub fn partial_key_at(
773		&self,
774		index: u64,
775		log: &impl LogQuery,
776	) -> Result<Option<[u8; PARTIAL_SIZE]>> {
777		let mut query_key = Default::default();
778		let (rc, _size, _compressed) =
779			self.entry_info(&mut TableKeyQuery::Fetch(Some(&mut query_key)), index, log)?;
780		Ok(if rc == 0 { None } else { Some(query_key) })
781	}
782
783	pub fn is_tombstone(&self, index: u64, log: &impl LogQuery) -> Result<bool> {
784		let mut buf = PartialKeyEntry::new_uninit();
785		let buf = if log.value(self.id, index, buf.as_mut()) {
786			&mut buf
787		} else {
788			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
789			&mut buf
790		};
791		Ok(buf.is_tombstone())
792	}
793
794	pub fn read_next_free(&self, index: u64, log: &LogWriter) -> Result<u64> {
795		let mut buf = PartialEntry::new_uninit();
796		let filled = self.filled.load(Ordering::Relaxed);
797		if !log.value(self.id, index, buf.as_mut()) {
798			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
799		}
800		buf.skip_size();
801		let next = buf.read_next();
802		if next >= filled {
803			return Err(crate::error::Error::Corruption(format!(
804				"Bad removed ref {} out of {}",
805				next, filled
806			)))
807		}
808		Ok(next)
809	}
810
811	pub fn read_next_part(&self, index: u64, log: &LogWriter) -> Result<Option<u64>> {
812		let mut buf = PartialEntry::new_uninit();
813		if !log.value(self.id, index, buf.as_mut()) {
814			self.file.read_at(buf.as_mut(), index * self.entry_size as u64)?;
815		}
816		if self.multipart && buf.is_multi(self.db_version) {
817			buf.skip_size();
818			if self.db_version > 8 && buf.is_multihead() {
819				buf.skip_u64();
820			}
821			let next = buf.read_next();
822			return Ok(Some(next))
823		}
824		Ok(None)
825	}
826
827	pub fn next_free(&self, log: &mut LogWriter) -> Result<u64> {
828		let free_entries_guard = if let Some(free_entries) = &self.free_entries {
829			Some(free_entries.write())
830		} else {
831			None
832		};
833
834		let filled = self.filled.load(Ordering::Relaxed);
835		let last_removed = self.last_removed.load(Ordering::Relaxed);
836		let index = if last_removed != 0 {
837			let next_removed = self.read_next_free(last_removed, log)?;
838			log::trace!(
839				target: "parity-db",
840				"{}: Inserting into removed slot {}",
841				self.id,
842				last_removed,
843			);
844			self.last_removed.store(next_removed, Ordering::Relaxed);
845			if let Some(mut free_entries) = free_entries_guard {
846				let last = free_entries.stack.pop().unwrap();
847				debug_assert_eq!(last, last_removed);
848			}
849			last_removed
850		} else {
851			log::trace!(
852				target: "parity-db",
853				"{}: Inserting into new slot {}",
854				self.id,
855				filled,
856			);
857			self.filled.store(filled + 1, Ordering::Relaxed);
858			filled
859		};
860		self.dirty_header.store(true, Ordering::Relaxed);
861		Ok(index)
862	}
863
864	pub fn claim_entries(&self, num: usize) -> Result<Vec<u64>> {
865		match &self.free_entries {
866			Some(free_entries) => {
867				let mut free_entries = free_entries.write();
868
869				let mut entries: Vec<u64> = Default::default();
870
871				for _i in 0..num {
872					let filled = self.filled.load(Ordering::Relaxed);
873					let last_removed = self.last_removed.load(Ordering::Relaxed);
874					let index = if last_removed != 0 {
875						let last = free_entries.stack.pop().unwrap();
876						debug_assert_eq!(last, last_removed);
877
878						let next_removed = *free_entries.stack.last().unwrap_or(&0u64);
879
880						self.last_removed.store(next_removed, Ordering::Relaxed);
881						last_removed
882					} else {
883						self.filled.store(filled + 1, Ordering::Relaxed);
884						filled
885					};
886					entries.push(index);
887				}
888				self.dirty_header.store(true, Ordering::Relaxed);
889
890				Ok(entries)
891			},
892			None =>
893				return Err(crate::error::Error::InvalidConfiguration(format!(
894					"claim_entries called without free_entries"
895				))),
896		}
897	}
898
	/// Write `value` under `key` into a chain of entries and return the index
	/// of the head entry.
	///
	/// With `at == Some(index)` the existing chain starting there is reused
	/// (extra tail slots are cleared, missing ones allocated); `claimed`
	/// marks `at` as a pre-allocated slot from `claim_entries` that holds no
	/// chain to follow. With `at == None` a fresh slot is allocated.
	fn overwrite_chain(
		&self,
		key: &TableKey,
		value: &[u8],
		log: &mut LogWriter,
		at: Option<u64>,
		claimed: bool,
		compressed: bool,
	) -> Result<u64> {
		// Total bytes still to write: payload plus the optional reference
		// counter and the encoded key.
		let mut remainder = value.len() + self.ref_size() + key.encoded_size();
		let mut offset = 0;
		let mut start = 0;
		assert!(self.multipart || value.len() <= self.value_size(key).unwrap() as usize);
		let (mut index, mut follow) = match at {
			Some(index) => (index, !claimed),
			None => (self.next_free(log)?, false),
		};
		loop {
			let mut next_index = 0;
			if follow {
				// check existing link
				match self.read_next_part(index, log)? {
					Some(next) => {
						next_index = next;
					},
					None => {
						follow = false;
					},
				}
			}
			log::trace!(
				target: "parity-db",
				"{}: Writing slot {}: {}",
				self.id,
				index,
				key,
			);
			let mut buf = FullEntry::new_uninit_full_entry();
			let mut free_space = self.entry_size as usize - SIZE_SIZE;
			let value_len = if remainder > free_space {
				// Does not fit in this slot: emit a multipart entry linked to
				// the next slot (reused from the old chain or newly
				// allocated).
				if !follow {
					next_index = self.next_free(log)?
				}
				if start == 0 {
					// First slot of the chain: head marker (compressed
					// variant when applicable).
					if compressed {
						buf.write_multihead_compressed();
					} else {
						buf.write_multihead();
					}
					if self.db_version > 8 {
						// The head also records the total value size.
						free_space -= 8;
						buf.write_u64(value.len() as u64);
					}
				} else {
					buf.write_multipart();
				}
				buf.write_next(next_index);
				free_space - INDEX_SIZE
			} else {
				// Last (or only) part: plain size-prefixed entry.
				buf.write_size(remainder as u16, compressed);
				remainder
			};
			let init_offset = buf.offset();
			if offset == 0 {
				// The head entry carries the reference counter and the key.
				if self.ref_counted {
					// First reference.
					buf.write_rc(1u32);
				}
				key.write(&mut buf);
			}
			let written = buf.offset() - init_offset;
			buf.write_slice(&value[offset..offset + value_len - written]);
			offset += value_len - written;
			log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
			remainder -= value_len;
			if start == 0 {
				start = index;
			}
			index = next_index;
			if remainder == 0 {
				if index != 0 {
					// End of new entry. Clear the remaining tail and exit
					self.clear_chain(index, log)?;
				}
				break
			}
		}

		Ok(start)
	}
989
990	fn clear_chain(&self, mut index: u64, log: &mut LogWriter) -> Result<()> {
991		loop {
992			match self.read_next_part(index, log)? {
993				Some(next) => {
994					self.clear_slot(index, log)?;
995					index = next;
996				},
997				None => {
998					self.clear_slot(index, log)?;
999					return Ok(())
1000				},
1001			}
1002		}
1003	}
1004
1005	fn clear_slot(&self, index: u64, log: &mut LogWriter) -> Result<()> {
1006		let free_entries_guard = if let Some(free_entries) = &self.free_entries {
1007			Some(free_entries.write())
1008		} else {
1009			None
1010		};
1011
1012		let last_removed = self.last_removed.load(Ordering::Relaxed);
1013		log::trace!(
1014			target: "parity-db",
1015			"{}: Freeing slot {}",
1016			self.id,
1017			index,
1018		);
1019
1020		let mut buf = PartialEntry::new_uninit();
1021		buf.write_tombstone();
1022		buf.write_next(last_removed);
1023
1024		log.insert_value(self.id, index, buf[0..buf.offset()].to_vec());
1025		self.last_removed.store(index, Ordering::Relaxed);
1026		self.dirty_header.store(true, Ordering::Relaxed);
1027
1028		if let Some(mut free_entries) = free_entries_guard {
1029			free_entries.stack.push(index);
1030		}
1031
1032		Ok(())
1033	}
1034
	/// Write `value` under `key` into newly allocated slot(s) and return the
	/// index of the head entry.
	pub fn write_insert_plan(
		&self,
		key: &TableKey,
		value: &[u8],
		log: &mut LogWriter,
		compressed: bool,
	) -> Result<u64> {
		self.overwrite_chain(key, value, log, None, false, compressed)
	}
1044
1045	pub fn write_replace_plan(
1046		&self,
1047		index: u64,
1048		key: &TableKey,
1049		value: &[u8],
1050		log: &mut LogWriter,
1051		compressed: bool,
1052	) -> Result<()> {
1053		self.overwrite_chain(key, value, log, Some(index), false, compressed)?;
1054		Ok(())
1055	}
1056
1057	pub fn write_claimed_plan(
1058		&self,
1059		index: u64,
1060		key: &TableKey,
1061		value: &[u8],
1062		log: &mut LogWriter,
1063		compressed: bool,
1064	) -> Result<()> {
1065		self.overwrite_chain(key, value, log, Some(index), true, compressed)?;
1066		Ok(())
1067	}
1068
1069	pub fn write_remove_plan(&self, index: u64, log: &mut LogWriter) -> Result<()> {
1070		if self.multipart {
1071			self.clear_chain(index, log)?;
1072		} else {
1073			self.clear_slot(index, log)?;
1074		}
1075		Ok(())
1076	}
1077
1078	pub fn write_inc_ref(&self, index: u64, log: &mut LogWriter) -> Result<()> {
1079		self.change_ref(index, 1, log)?;
1080		Ok(())
1081	}
1082
1083	pub fn write_dec_ref(&self, index: u64, log: &mut LogWriter) -> Result<bool> {
1084		if self.change_ref(index, -1, log)? {
1085			return Ok(true)
1086		}
1087		self.write_remove_plan(index, log)?;
1088		Ok(false)
1089	}
1090
	/// Adjust the reference counter of the entry at `index` by `delta` and
	/// log the updated entry.
	///
	/// Returns `Ok(false)` when the entry is a tombstone or its counter
	/// dropped to zero (the caller should remove it), `Ok(true)` otherwise.
	/// A counter at `LOCKED_REF` saturates: increments keep it there and
	/// decrements are ignored.
	pub fn change_ref(&self, index: u64, delta: i32, log: &mut LogWriter) -> Result<bool> {
		let mut buf = FullEntry::new_uninit_full_entry();
		// Take the slot from the log overlay when present, else from disk.
		let buf = if log.value(self.id, index, buf.as_mut()) {
			&mut buf
		} else {
			self.file
				.read_at(&mut buf[0..self.entry_size as usize], index * self.entry_size as u64)?;
			&mut buf
		};

		if buf.is_tombstone() {
			return Ok(false)
		}

		// Compute how many bytes of the slot must be re-logged; as a side
		// effect the cursor ends up at the reference counter field.
		let size = if self.multipart && buf.is_multi(self.db_version) {
			buf.skip_size();
			if self.db_version > 8 && buf.is_multihead() {
				buf.skip_u64();
			}
			buf.skip_next();
			self.entry_size as usize
		} else {
			let (size, _compressed) = buf.read_size();
			buf.offset() + size as usize
		};

		let rc_offset = buf.offset();
		let mut counter = buf.read_rc();
		if delta > 0 {
			// Saturate at LOCKED_REF instead of overflowing.
			if counter >= LOCKED_REF - delta as u32 {
				counter = LOCKED_REF
			} else {
				counter += delta as u32;
			}
		} else if counter != LOCKED_REF {
			// Locked entries are never decremented.
			counter = counter.saturating_sub(-delta as u32);
			if counter == 0 {
				return Ok(false)
			}
		}

		// Rewrite the counter in place and log the updated entry bytes.
		buf.set_offset(rc_offset);
		buf.write_rc(counter);
		// TODO: optimize actual buf size
		log.insert_value(self.id, index, buf[0..size].to_vec());
		Ok(true)
	}
1138
	/// Apply a logged change for slot `index` to the table file.
	///
	/// Reads the payload for this slot from the log record and writes it at
	/// the slot's file offset, growing the file first if the slot lies past
	/// the current capacity. Slot 0 carries the table header.
	pub fn enact_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
		// Grow the file until the target slot fits.
		while index >= self.file.capacity.load(Ordering::Relaxed) {
			self.file.grow(self.entry_size)?;
		}
		if index == 0 {
			// Header update: also refresh the cached `written` watermark.
			let mut header = Header::default();
			log.read(&mut header.0)?;
			self.file.write_at(&header.0, 0)?;
			self.written.store(header.filled(), Ordering::Relaxed);
			log::trace!(target: "parity-db", "{}: Enacted header, {} filled", self.id, header.filled());
			return Ok(())
		}

		let mut buf = FullEntry::new_uninit_full_entry();
		// The 2-byte prefix identifies the entry kind (tombstone/multipart/plain).
		log.read(&mut buf[0..SIZE_SIZE])?;
		if buf.is_tombstone() {
			// Tombstone: marker followed by the 8-byte next-free link.
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
			self.file
				.write_at(&buf[0..SIZE_SIZE + INDEX_SIZE], index * (self.entry_size as u64))?;
			log::trace!(target: "parity-db", "{}: Enacted tombstone in slot {}", self.id, index);
		} else if self.multipart && buf.is_multi(self.db_version) {
			// Multipart parts always span a full slot.
			let entry_size = self.entry_size as usize;
			log.read(&mut buf[SIZE_SIZE..entry_size])?;
			self.file.write_at(&buf[0..entry_size], index * (entry_size as u64))?;
			log::trace!(target: "parity-db", "{}: Enacted multipart in slot {}", self.id, index);
		} else {
			// Plain entry: size prefix followed by `len` payload bytes.
			let (len, _compressed) = buf.read_size();
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
			self.file
				.write_at(&buf[0..(SIZE_SIZE + len as usize)], index * (self.entry_size as u64))?;
			// NOTE(review): trace logs bytes 6..32 via `buf.1`, while
			// `validate_plan` logs `buf[SIZE_SIZE..32]` — confirm which range
			// is the intended key display.
			log::trace!(target: "parity-db", "{}: Enacted {}: {}, {} bytes", self.id, index, hex(&buf.1[6..32]), len);
		}
		Ok(())
	}
1173
	/// Dry-run counterpart of `enact_plan`: consume the logged change for
	/// slot `index` from the reader without touching the file, checking that
	/// the record is well-formed.
	pub fn validate_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
		if index == 0 {
			let mut header = Header::default();
			log.read(&mut header.0)?;
			// TODO: sanity check last_removed and filled
			return Ok(())
		}
		let mut buf = FullEntry::new_uninit_full_entry();
		// The 2-byte prefix identifies the entry kind (tombstone/multipart/plain).
		log.read(&mut buf[0..SIZE_SIZE])?;
		if buf.is_tombstone() {
			// Tombstone: marker followed by the 8-byte next-free link.
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + INDEX_SIZE])?;
			log::trace!(target: "parity-db", "{}: Validated tombstone in slot {}", self.id, index);
		} else if self.multipart && buf.is_multi(self.db_version) {
			// Multipart parts always span a full slot.
			let entry_size = self.entry_size as usize;
			log.read(&mut buf[SIZE_SIZE..entry_size])?;
			log::trace!(target: "parity-db", "{}: Validated multipart in slot {}", self.id, index);
		} else {
			// TODO: check len
			let (len, _compressed) = buf.read_size();
			log.read(&mut buf[SIZE_SIZE..SIZE_SIZE + len as usize])?;
			log::trace!(target: "parity-db", "{}: Validated {}: {}, {} bytes", self.id, index, hex(&buf[SIZE_SIZE..32]), len);
		}
		Ok(())
	}
1198
1199	pub fn refresh_metadata(&self) -> Result<()> {
1200		if self.file.map.read().is_none() {
1201			return Ok(())
1202		}
1203		let _free_entries_guard = if let Some(free_entries) = &self.free_entries {
1204			Some(free_entries.write())
1205		} else {
1206			None
1207		};
1208		let mut header = Header::default();
1209		self.file.read_at(&mut header.0, 0)?;
1210		let last_removed = header.last_removed();
1211		let mut filled = header.filled();
1212		if filled == 0 {
1213			filled = 1;
1214		}
1215		self.last_removed.store(last_removed, Ordering::Relaxed);
1216		self.filled.store(filled, Ordering::Relaxed);
1217		self.written.store(filled, Ordering::Relaxed);
1218		Ok(())
1219	}
1220
1221	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
1222		let _free_entries_guard = if let Some(free_entries) = &self.free_entries {
1223			Some(free_entries.read())
1224		} else {
1225			None
1226		};
1227		if let Ok(true) =
1228			self.dirty_header
1229				.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
1230		{
1231			// last_removed or filled pointers were modified. Add them to the log
1232			let mut buf = Header::default();
1233			let last_removed = self.last_removed.load(Ordering::Relaxed);
1234			let filled = self.filled.load(Ordering::Relaxed);
1235			buf.set_last_removed(last_removed);
1236			buf.set_filled(filled);
1237			log.insert_value(self.id, 0, buf.0.to_vec());
1238		}
1239		Ok(())
1240	}
1241
	/// Flush pending writes of the underlying table file to disk.
	pub fn flush(&self) -> Result<()> {
		self.file.flush()
	}
1245
1246	fn ref_size(&self) -> usize {
1247		if self.ref_counted {
1248			REFS_SIZE
1249		} else {
1250			0
1251		}
1252	}
1253
	/// Iterate live entries in slot order, calling
	/// `f(index, rc, value, compressed)` for each entry with a positive
	/// reference count. Iteration stops early when `f` returns `false`.
	/// Entries failing to decode with `InvalidValueData` are skipped.
	pub fn iter_while(
		&self,
		log: &impl LogQuery,
		mut f: impl FnMut(u64, u32, Vec<u8>, bool) -> bool,
	) -> Result<()> {
		let written = self.written.load(Ordering::Relaxed);
		// Slot 0 holds the header; data slots start at 1.
		for index in 1..written {
			let mut _fetch_key = Default::default();
			match self.read(&mut TableKeyQuery::Fetch(Some(&mut _fetch_key)), index, log) {
				Ok((rc, compressed, value)) =>
					if rc > 0 && !f(index, rc, value, compressed) {
						break
					},
				Err(crate::error::Error::InvalidValueData) => (), // ignore, can be external index.
				Err(e) => return Err(e),
			}
		}
		Ok(())
	}
1273
	/// Returns `true` once the table file has been opened and memory-mapped.
	pub fn is_init(&self) -> bool {
		self.file.map.read().is_some()
	}
1277
1278	pub fn init_with_entry(&self, entry: &[u8]) -> Result<()> {
1279		if let Err(e) = self.do_init_with_entry(entry) {
1280			log::error!(target: "parity-db", "Failure to initialize file {}", self.file.path.display());
1281			let _ = self.file.remove(); // We ignore error here
1282			return Err(e)
1283		}
1284		Ok(())
1285	}
1286
	/// Write `entry` as the first value of a freshly grown table file.
	///
	/// Plans the insertion through a throwaway log writer and then applies
	/// the planned slot writes directly to the file, bypassing the regular
	/// commit pipeline.
	fn do_init_with_entry(&self, entry: &[u8]) -> Result<()> {
		self.file.grow(self.entry_size)?;

		// Scratch log over empty overlays; drained and written out below.
		let empty_overlays = RwLock::new(LogOverlays::with_columns(0));
		let mut log = LogWriter::new(&empty_overlays, 0);
		let at = self.overwrite_chain(&TableKey::NoHash, entry, &mut log, None, false, false)?;
		self.complete_plan(&mut log)?;
		// A brand-new table has no free slots, so the entry must land in slot 1.
		assert_eq!(at, 1);
		let log = log.drain();
		let change = log.local_values_changes(self.id).expect("entry written above");
		for (at, (_rec_id, entry)) in change.map.iter() {
			self.file.write_at(entry.as_slice(), *at * (self.entry_size as u64))?;
		}
		Ok(())
	}
1302
1303	/// Validate free records sequence.
1304	pub fn check_free_refs(&self) -> Result<u64> {
1305		let _free_entries_guard = if let Some(free_entries) = &self.free_entries {
1306			Some(free_entries.read())
1307		} else {
1308			None
1309		};
1310		let written = self.written.load(Ordering::Relaxed);
1311		let mut next = self.last_removed.load(Ordering::Relaxed);
1312		let mut len = 0;
1313		while next != 0 {
1314			if next >= written {
1315				return Err(crate::error::Error::Corruption(format!(
1316					"Bad removed ref {} out of {}",
1317					next, written
1318				)))
1319			}
1320			let mut buf = PartialEntry::new_uninit();
1321			self.file.read_at(buf.as_mut(), next * self.entry_size as u64)?;
1322			buf.skip_size();
1323			next = buf.read_next();
1324			len += 1;
1325		}
1326		Ok(len)
1327	}
1328
1329	pub fn get_num_entries(&self) -> Result<u64> {
1330		if let Some(free_entries) = &self.free_entries {
1331			let free_entries = free_entries.read();
1332			let filled = self.filled.load(Ordering::Relaxed);
1333			let num_free = free_entries.stack.len();
1334			let num = (filled - 1) - num_free as u64;
1335			if num > 0 && self.multipart {
1336				// TODO: Need to implement this.
1337				return Err(crate::error::Error::InvalidConfiguration(format!(
1338					"Unable to determine number of multipart entries"
1339				)))
1340			}
1341			return Ok(num)
1342		}
1343		Err(crate::error::Error::InvalidConfiguration(format!(
1344			"Unable to determine number of entries"
1345		)))
1346	}
1347}
1348
/// Key handling for value-table entries.
pub mod key {
	use super::{EntryRef, FullEntry};
	use crate::Key;

	/// Number of key bytes stored inline in an entry (lower 26 of the
	/// 32-byte key).
	pub const PARTIAL_SIZE: usize = 26;

	/// The portion of a key that is stored with the entry: the first 6 bytes
	/// are dropped.
	pub fn partial_key(hash: &Key) -> &[u8] {
		&hash[6..]
	}

	/// Key material associated with a table entry.
	pub enum TableKey {
		/// Full 32-byte key; only its lower 26 bytes are stored in the entry.
		Partial(Key),
		/// No key stored with the entry (used for btree nodes).
		NoHash,
	}

	impl TableKey {
		/// Number of bytes this key contributes to the encoded entry.
		pub fn encoded_size(&self) -> usize {
			match self {
				TableKey::Partial(_) => PARTIAL_SIZE,
				TableKey::NoHash => 0,
			}
		}

		/// Interpret the first 8 bytes of a partial key as a big-endian index.
		pub fn index_from_partial(partial: &[u8]) -> u64 {
			u64::from_be_bytes((partial[0..8]).try_into().unwrap())
		}

		/// Check whether this key matches partial bytes fetched from an
		/// entry. `NoHash` matches anything.
		pub fn compare(&self, fetch: &Option<[u8; PARTIAL_SIZE]>) -> bool {
			match (self, fetch) {
				(TableKey::Partial(k), Some(fetch)) => partial_key(k) == fetch,
				(TableKey::NoHash, _) => true,
				_ => false,
			}
		}

		/// Read the stored partial key bytes at the buffer's current offset.
		pub fn fetch_partial<'a>(buf: &mut EntryRef<'a>) -> [u8; PARTIAL_SIZE] {
			let mut result = [0u8; PARTIAL_SIZE];
			let pks = buf.read_partial();
			result.copy_from_slice(pks);
			result
		}

		/// Fetch the partial key from the entry when this variant stores one;
		/// `NoHash` reads nothing.
		pub fn fetch<'a>(&self, buf: &mut EntryRef<'a>) -> Option<[u8; PARTIAL_SIZE]> {
			match self {
				TableKey::Partial(_k) => Some(Self::fetch_partial(buf)),
				TableKey::NoHash => None,
			}
		}

		/// Append this key's stored bytes to an entry under construction.
		pub fn write(&self, buf: &mut FullEntry) {
			match self {
				TableKey::Partial(k) => {
					buf.write_slice(partial_key(k));
				},
				TableKey::NoHash => (),
			}
		}
	}

	impl std::fmt::Display for TableKey {
		fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
			match self {
				TableKey::Partial(k) => write!(f, "{}", crate::display::hex(k)),
				TableKey::NoHash => write!(f, "no_hash"),
			}
		}
	}

	/// How a read should treat an entry's key bytes: check them against a
	/// known key, or fetch them out for the caller.
	pub enum TableKeyQuery<'a> {
		Check(&'a TableKey),
		Fetch(Option<&'a mut [u8; PARTIAL_SIZE]>),
	}
}
1422
#[cfg(test)]
mod test {
	// Slot size used by the fixed-size table tests.
	const ENTRY_SIZE: u16 = 64;

	use super::{TableId, Value, ValueTable, MULTIPART_ENTRY_SIZE};
	use crate::{
		log::{Log, LogAction, LogWriter},
		options::{ColumnOptions, Options, CURRENT_VERSION},
		table::key::TableKey,
		Key,
	};
	use std::sync::{atomic::Ordering, Arc};
	use tempfile::{tempdir, TempDir};

	// Open a fresh value table in `dir`; `size = None` creates a multipart table.
	fn new_table(dir: &TempDir, size: Option<u16>, options: &ColumnOptions) -> ValueTable {
		let id = TableId::new(0, 0);
		ValueTable::open(Arc::new(dir.path().to_path_buf()), id, size, options, CURRENT_VERSION)
			.unwrap()
	}

	// Open a single-column log in `dir`.
	fn new_log(dir: &TempDir) -> Log {
		let options = Options::with_columns(dir.path(), 1);
		Log::open(&options).unwrap()
	}

	// Run `f` against a log writer, commit the record, read it back and enact
	// every InsertValue action on `table`.
	fn write_ops<F: FnOnce(&mut LogWriter)>(table: &ValueTable, log: &Log, f: F) {
		let mut writer = log.begin_record();
		f(&mut writer);
		let bytes_written = log.end_record(writer.drain()).unwrap();
		let _ = log.read_next(false);
		log.flush_one(0).unwrap();
		let mut reader = log.read_next(false).unwrap().unwrap();
		loop {
			match reader.next().unwrap() {
				LogAction::BeginRecord |
				LogAction::InsertIndex { .. } |
				LogAction::InsertRefCount { .. } |
				LogAction::DropTable { .. } |
				LogAction::DropRefCountTable { .. } => {
					panic!("Unexpected log entry");
				},
				LogAction::EndRecord => {
					// Sanity check: everything written was read back.
					let bytes_read = reader.read_bytes();
					assert_eq!(bytes_written, bytes_read);
					break
				},
				LogAction::InsertValue(insertion) => {
					table.enact_plan(insertion.index, &mut reader).unwrap();
				},
			}
		}
	}

	// Deterministic 32-byte key derived from `k`.
	fn key(k: u32) -> Key {
		use blake2::{digest::typenum::U32, Blake2b, Digest};
		let mut key = Key::default();
		let hash = Blake2b::<U32>::digest(k.to_le_bytes());
		key.copy_from_slice(&hash);
		key
	}

	fn simple_key(k: Key) -> TableKey {
		TableKey::Partial(k)
	}

	fn no_hash(_: Key) -> TableKey {
		TableKey::NoHash
	}

	// Random payload of `size` bytes.
	fn value(size: usize) -> Value {
		use rand::RngCore;
		let mut result = vec![0; size];
		rand::rng().fill_bytes(&mut result);
		result
	}

	fn rc_options() -> ColumnOptions {
		ColumnOptions { ref_counted: true, ..Default::default() }
	}

	// Insert a small value and read it back through the writer and overlays.
	#[test]
	fn insert_simple() {
		insert_simple_inner(&Default::default());
		insert_simple_inner(&rc_options());
	}
	fn insert_simple_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key = key(1);
		let key = TableKey::Partial(key);
		let key = &key;
		let val = value(19);
		let compressed = true;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
			assert_eq!(table.get(key, 1, writer).unwrap(), Some((val.clone(), compressed)));
		});

		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		assert_eq!(table.filled.load(Ordering::Relaxed), 2);
	}

	// Entry sizes above MAX_ENTRY_SIZE must be rejected at table creation.
	#[test]
	#[should_panic(expected = "assertion failed: entry_size <= MAX_ENTRY_SIZE as u16")]
	fn oversized_into_fixed_panics() {
		let dir = tempdir().unwrap();
		let _table = new_table(&dir, Some(65534), &Default::default());
	}

	// Removing an entry puts its slot on the free list; the next insert reuses it.
	#[test]
	fn remove_simple() {
		remove_simple_inner(&Default::default());
		remove_simple_inner(&rc_options());
	}
	fn remove_simple_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(11);
		let val2 = value(21);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		write_ops(&table, &log, |writer| {
			table.write_remove_plan(1, writer).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), None);
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
	}

	// Replacing a value in place must not touch the free list.
	#[test]
	fn replace_simple() {
		replace_simple_inner(&Default::default(), simple_key);
		replace_simple_inner(&rc_options(), simple_key);
		replace_simple_inner(&Default::default(), no_hash);
		replace_simple_inner(&rc_options(), no_hash);
	}
	fn replace_simple_inner(options: &ColumnOptions, table_key: fn(Key) -> TableKey) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &table_key(key1);
		let key2 = key(2);
		let key2 = &table_key(key2);
		let val1 = value(11);
		let val2 = value(21);
		let val3 = value(26); // max size for full hash and rc
		let compressed = true;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key2, &val3, writer, false).unwrap();
		});

		assert_eq!(table.get(key2, 1, log.overlays()).unwrap(), Some((val3, false)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
	}

	// Shrinking a multipart value releases its surplus parts to the free list.
	#[test]
	fn replace_multipart_shorter() {
		replace_multipart_shorter_inner(&Default::default());
		replace_multipart_shorter_inner(&rc_options());
	}
	fn replace_multipart_shorter_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, None, options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(20000);
		let val2 = value(30);
		let val1s = value(5000);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 7);

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key1, &val1s, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1s, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 5);
		// The freed parts form a linked list: 5 -> 4 -> 3 -> end.
		write_ops(&table, &log, |writer| {
			assert_eq!(table.read_next_free(5, writer).unwrap(), 4);
			assert_eq!(table.read_next_free(4, writer).unwrap(), 3);
			assert_eq!(table.read_next_free(3, writer).unwrap(), 0);
		});
	}

	// Growing a multipart value allocates extra parts at the end of the file.
	#[test]
	fn replace_multipart_longer() {
		replace_multipart_longer_inner(&Default::default());
		replace_multipart_longer_inner(&rc_options());
	}
	fn replace_multipart_longer_inner(options: &ColumnOptions) {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, None, options);
		let log = new_log(&dir);

		let key1 = key(1);
		let key1 = &TableKey::Partial(key1);
		let key2 = key(2);
		let key2 = &TableKey::Partial(key2);
		let val1 = value(5000);
		let val2 = value(30);
		let val1l = value(20000);
		let compressed = false;

		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key1, &val1, writer, compressed).unwrap();
			table.write_insert_plan(key2, &val2, writer, compressed).unwrap();
		});

		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 4);

		write_ops(&table, &log, |writer| {
			table.write_replace_plan(1, key1, &val1l, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key1, 1, log.overlays()).unwrap(), Some((val1l, compressed)));
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 0);
		assert_eq!(table.filled.load(Ordering::Relaxed), 7);
	}

	// Inc/dec the reference counter; the entry disappears only when it hits zero.
	#[test]
	fn ref_counting() {
		for compressed in [false, true] {
			let dir = tempdir().unwrap();
			let table = new_table(&dir, None, &rc_options());
			let log = new_log(&dir);

			let key = key(1);
			let key = &TableKey::Partial(key);
			let val = value(5000);

			write_ops(&table, &log, |writer| {
				table.write_insert_plan(key, &val, writer, compressed).unwrap();
				table.write_inc_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val.clone(), compressed)));
			write_ops(&table, &log, |writer| {
				table.write_dec_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
			write_ops(&table, &log, |writer| {
				table.write_dec_ref(1, writer).unwrap();
			});
			assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
		}
	}

	// `iter_while` visits live entries in slot order and skips removed ones.
	#[test]
	fn iteration() {
		for multipart in [false, true] {
			for compressed in [false, true] {
				let (entry_size, size_mul) =
					if multipart { (None, 100) } else { (Some(MULTIPART_ENTRY_SIZE / 2), 1) };

				let dir = tempdir().unwrap();
				let table = new_table(&dir, entry_size, &rc_options());
				let log = new_log(&dir);

				let (v1, v2, v3) = (
					value(MULTIPART_ENTRY_SIZE as usize / 8 * size_mul),
					value(MULTIPART_ENTRY_SIZE as usize / 4 * size_mul),
					value(MULTIPART_ENTRY_SIZE as usize * 3 / 8 * size_mul),
				);
				let entries = [
					(TableKey::Partial(key(1)), &v1),
					(TableKey::Partial(key(2)), &v2),
					(TableKey::Partial(key(3)), &v3),
				];

				write_ops(&table, &log, |writer| {
					for (k, v) in &entries {
						table.write_insert_plan(k, &v, writer, compressed).unwrap();
					}
					table.complete_plan(writer).unwrap();
				});

				let mut res = Vec::new();
				table
					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
						res.push((v.len(), cmpr));
						true
					})
					.unwrap();
				assert_eq!(
					res,
					vec![(v1.len(), compressed), (v2.len(), compressed), (v3.len(), compressed)]
				);

				// Slot of v2 depends on how many parts v1 occupied.
				let v2_index = 2 + v1.len() as u64 / super::MULTIPART_ENTRY_SIZE as u64;
				write_ops(&table, &log, |writer| {
					table.write_remove_plan(v2_index, writer).unwrap();
				});

				let mut res = Vec::new();
				table
					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
						res.push((v.len(), cmpr));
						true
					})
					.unwrap();
				assert_eq!(res, vec![(v1.len(), compressed), (v3.len(), compressed)]);
			}
		}
	}

	// Extra decrements past zero must not panic or resurrect the entry.
	#[test]
	fn ref_underflow() {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(ENTRY_SIZE), &rc_options());
		let log = new_log(&dir);

		let key = key(1);
		let key = &TableKey::Partial(key);
		let val = value(10);

		let compressed = false;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
			table.write_inc_ref(1, writer).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		write_ops(&table, &log, |writer| {
			table.write_dec_ref(1, writer).unwrap();
			table.write_dec_ref(1, writer).unwrap();
			table.write_dec_ref(1, writer).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), None);
	}

	// Size prefixes that collide with the multipart markers must stay distinguishable.
	#[test]
	fn multipart_collision() {
		use super::MAX_ENTRY_SIZE;
		let mut entry = super::Entry::new(super::MULTIPART.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let mut entry = super::Entry::new(super::MULTIHEAD.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let mut entry = super::Entry::new(super::MULTIHEAD_COMPRESSED.to_vec());
		let size = entry.read_size().0 as usize;
		assert!(size > MAX_ENTRY_SIZE);
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(MAX_ENTRY_SIZE as u16), &rc_options());
		let log = new_log(&dir);

		let key = key(1);
		let key = &TableKey::Partial(key);
		let val = value(32225); // This result in 0x7dff entry size, which conflicts with v4 multipart definition

		let compressed = true;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
		write_ops(&table, &log, |writer| {
			table.write_dec_ref(1, writer).unwrap();
		});
		assert_eq!(table.last_removed.load(Ordering::Relaxed), 1);

		// Check that max entry size values are OK.
		let value_size = table.value_size(key).unwrap();
		assert_eq!(0x7fd8, table.value_size(key).unwrap()); // Max value size for this configuration.
		let val = value(value_size as usize); // This result in 0x7ff8 entry size.
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		assert_eq!(table.get(key, 1, log.overlays()).unwrap(), Some((val, compressed)));
	}

	// A zeroed size prefix must be reported as corruption, not panic.
	#[test]
	fn bad_size_header() {
		let dir = tempdir().unwrap();
		let table = new_table(&dir, Some(36), &rc_options());
		let log = new_log(&dir);

		let key = &TableKey::Partial(key(1));
		let val = value(4);

		let compressed = false;
		write_ops(&table, &log, |writer| {
			table.write_insert_plan(key, &val, writer, compressed).unwrap();
		});
		// Corrupt entry 1
		let zeroes = [0u8, 0u8];
		table.file.write_at(&zeroes, table.entry_size as u64).unwrap();
		let log = new_log(&dir);
		assert!(matches!(
			table.get(key, 1, log.overlays()),
			Err(crate::error::Error::Corruption(_))
		));
	}
}