parity_db/
column.rs

// Copyright 2021-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or MIT.

use crate::{
	btree::BTreeTable,
	compress::Compress,
	db::{check::CheckDisplay, NodeChange, Operation, RcValue},
	display::hex,
	error::{try_io, Error, Result},
	index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
	log::{Log, LogAction, LogOverlays, LogQuery, LogReader, LogWriter},
	multitree::{Children, NewNode, NodeAddress, NodeRef},
	options::{ColumnOptions, Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
	parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
	ref_count::{RefCountTable, RefCountTableId},
	stats::{ColumnStatSummary, ColumnStats},
	table::{
		key::{TableKey, TableKeyQuery},
		TableId as ValueTableId, Value, ValueTable, SIZE_TIERS,
	},
	Key,
};
use std::{
	collections::{HashMap, VecDeque},
	path::PathBuf,
	sync::{
		atomic::{AtomicU64, Ordering},
		Arc,
	},
};

pub const MIN_INDEX_BITS: u8 = 16;
pub const MIN_REF_COUNT_BITS: u8 = 16;
// Measured in index entries
const MAX_REINDEX_BATCH: usize = 8192;

pub type ColId = u8;
pub type Salt = [u8; 32];

// The size tiers follow a log distribution. Generated with the following code:
//
// {
//	let mut r = [0u16; SIZE_TIERS - 1];
//	let start = MIN_ENTRY_SIZE as f64;
//	let end = MAX_ENTRY_SIZE as f64;
//	let n_slices = SIZE_TIERS - 1;
//	let factor = ((end.ln() - start.ln()) / (n_slices - 1) as f64).exp();
//
//	let mut s = start;
//	let mut i = 0;
//	while i < n_slices {
//		r[i] = s.round() as u16;
//		s = s * factor;
//		i += 1;
//	}
//	r
// };

const SIZES: [u16; SIZE_TIERS - 1] = [
	32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 50, 51, 52, 54, 55, 57, 58, 60,
	62, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 88, 90, 93, 95, 98, 101, 103, 106, 109,
	112, 115, 119, 122, 125, 129, 132, 136, 140, 144, 148, 152, 156, 160, 165, 169, 174, 179, 183,
	189, 194, 199, 205, 210, 216, 222, 228, 235, 241, 248, 255, 262, 269, 276, 284, 292, 300, 308,
	317, 325, 334, 344, 353, 363, 373, 383, 394, 405, 416, 428, 439, 452, 464, 477, 490, 504, 518,
	532, 547, 562, 577, 593, 610, 627, 644, 662, 680, 699, 718, 738, 758, 779, 801, 823, 846, 869,
	893, 918, 943, 969, 996, 1024, 1052, 1081, 1111, 1142, 1174, 1206, 1239, 1274, 1309, 1345,
	1382, 1421, 1460, 1500, 1542, 1584, 1628, 1673, 1720, 1767, 1816, 1866, 1918, 1971, 2025, 2082,
	2139, 2198, 2259, 2322, 2386, 2452, 2520, 2589, 2661, 2735, 2810, 2888, 2968, 3050, 3134, 3221,
	3310, 3402, 3496, 3593, 3692, 3794, 3899, 4007, 4118, 4232, 4349, 4469, 4593, 4720, 4850, 4984,
	5122, 5264, 5410, 5559, 5713, 5871, 6034, 6200, 6372, 6548, 6729, 6916, 7107, 7303, 7506, 7713,
	7927, 8146, 8371, 8603, 8841, 9085, 9337, 9595, 9860, 10133, 10413, 10702, 10998, 11302, 11614,
	11936, 12266, 12605, 12954, 13312, 13681, 14059, 14448, 14848, 15258, 15681, 16114, 16560,
	17018, 17489, 17973, 18470, 18981, 19506, 20046, 20600, 21170, 21756, 22358, 22976, 23612,
	24265, 24936, 25626, 26335, 27064, 27812, 28582, 29372, 30185, 31020, 31878, 32760,
];
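// Note: the table above holds SIZE_TIERS - 1 log-spaced entry sizes, from 32 up
// to 32760 bytes. `Column::compress` maps a value to the first tier whose entry
// size fits it; values larger than the last size use the final multipart
// ("blob") tier instead.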

#[derive(Debug)]
struct Tables {
	index: IndexTable,
	value: Vec<ValueTable>,
	ref_count: Option<RefCountTable>,
}

impl Tables {
	fn get_ref_count(&self) -> &RefCountTable {
		self.ref_count.as_ref().unwrap()
	}
}

#[derive(Debug)]
enum ReindexEntry {
	Index(IndexTable),
	RefCount(RefCountTable),
}

#[derive(Debug)]
struct Reindex {
	queue: VecDeque<ReindexEntry>,
	progress: AtomicU64,
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Column {
	Hash(HashColumn),
	Tree(BTreeTable),
}

#[derive(Debug)]
pub struct HashColumn {
	col: ColId,
	tables: RwLock<Tables>,
	reindex: RwLock<Reindex>,
	ref_count_cache: Option<RwLock<HashMap<u64, u64>>>,
	path: PathBuf,
	preimage: bool,
	uniform_keys: bool,
	collect_stats: bool,
	ref_counted: bool,
	append_only: bool,
	salt: Salt,
	stats: ColumnStats,
	compression: Compress,
	db_version: u32,
}

#[derive(Clone, Copy)]
pub struct TablesRef<'a> {
	pub tables: &'a [ValueTable],
	pub compression: &'a Compress,
	pub col: ColId,
	pub preimage: bool,
	pub ref_counted: bool,
}

/// Value iteration state
pub struct ValueIterState {
	/// Reference counter.
	pub rc: u32,
	/// Value.
	pub value: Vec<u8>,
}

// Only used for DB validation and migration.
pub struct CorruptedIndexEntryInfo {
	pub chunk_index: u64,
	pub sub_index: u32,
	pub entry: crate::index::Entry,
	pub value_entry: Option<Vec<u8>>,
	pub error: Option<Error>,
}

// Only used for DB validation and migration.
pub struct IterState {
	pub item_index: u64,
	pub total_items: u64,
	pub key: Key,
	pub rc: u32,
	pub value: Vec<u8>,
}

// Only used for DB validation and migration.
enum IterStateOrCorrupted {
	Item(IterState),
	Corrupted(CorruptedIndexEntryInfo),
}

#[inline]
pub fn hash_key(key: &[u8], salt: &Salt, uniform: bool, db_version: u32) -> Key {
	use blake2::{
		digest::{typenum::U32, FixedOutput, Update},
		Blake2bMac,
	};

	let mut k = Key::default();
	if uniform {
		if db_version <= 5 {
			k.copy_from_slice(&key[0..32]);
		} else if db_version <= 7 {
			// XOR with salt.
			let key = &key[0..32];
			for i in 0..32 {
				k[i] = key[i] ^ salt[i];
			}
		} else {
			#[cfg(any(test, feature = "instrumentation"))]
			// Used for forcing collisions in tests.
			if salt == &Salt::default() {
				k.copy_from_slice(&key);
				return k
			}
			// SipHash-1-3 supplies the first 128 bits of the key; the rest is copied
			// from the original key.
			use siphasher::sip128::Hasher128;
			use std::hash::Hasher;
			let mut hasher = siphasher::sip128::SipHasher13::new_with_key(
				salt[..16].try_into().expect("Salt length is 32"),
			);
			hasher.write(&key);
			let hash = hasher.finish128();
			k[0..8].copy_from_slice(&hash.h1.to_le_bytes());
			k[8..16].copy_from_slice(&hash.h2.to_le_bytes());
			k[16..].copy_from_slice(&key[16..]);
		}
	} else {
		let mut ctx = Blake2bMac::<U32>::new_with_salt_and_personal(salt, &[], &[])
			.expect("Salt length (32) is a valid key length (<= 64)");
		ctx.update(key);
		let hash = ctx.finalize_fixed();
		k.copy_from_slice(&hash);
	}
	k
}
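// A minimal usage sketch (illustrative only; the salt value below is made up):
//
//	let salt: Salt = [1u8; 32];
//	// Non-uniform keys are hashed with keyed Blake2b into a 32-byte table key.
//	let k: Key = hash_key(b"some user key", &salt, false, 8);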

pub struct ReindexBatch {
	pub drop_index: Option<IndexTableId>,
	pub batch: Vec<(Key, Address)>,
	pub drop_ref_count: Option<RefCountTableId>,
	pub ref_count_batch: Vec<(Address, u64)>,
	pub ref_count_batch_source: Option<RefCountTableId>,
}

impl HashColumn {
	pub fn get(&self, key: &Key, log: &impl LogQuery) -> Result<Option<(Value, u32)>> {
		let tables = self.tables.read();
		let values = self.as_ref(&tables.value);
		if let Some((tier, rc, value)) = self.get_in_index(key, &tables.index, values, log)? {
			if self.collect_stats {
				self.stats.query_hit(tier);
			}
			return Ok(Some((value, rc)))
		}
		for entry in &self.reindex.read().queue {
			if let ReindexEntry::Index(r) = entry {
				if let Some((tier, rc, value)) = self.get_in_index(key, r, values, log)? {
					if self.collect_stats {
						self.stats.query_hit(tier);
					}
					return Ok(Some((value, rc)))
				}
			}
		}
		if self.collect_stats {
			self.stats.query_miss();
		}
		Ok(None)
	}

	pub fn get_size(&self, key: &Key, log: &RwLock<LogOverlays>) -> Result<Option<u32>> {
		Ok(self.get(key, log)?.map(|(v, _rc)| v.len() as u32))
	}

	pub fn get_value(&self, address: Address, log: &impl LogQuery) -> Result<Option<Value>> {
		let tables = self.tables.read();
		let values = self.as_ref(&tables.value);
		if let Some((tier, _rc, value)) =
			Column::get_value(TableKeyQuery::Check(&TableKey::NoHash), address, values, log)?
		{
			if self.collect_stats {
				self.stats.query_hit(tier);
			}
			return Ok(Some(value))
		}
		if self.collect_stats {
			self.stats.query_miss();
		}
		Ok(None)
	}

	fn get_in_index(
		&self,
		key: &Key,
		index: &IndexTable,
		tables: TablesRef,
		log: &impl LogQuery,
	) -> Result<Option<(u8, u32, Value)>> {
		let (mut entry, mut sub_index) = index.get(key, 0, log)?;
		while !entry.is_empty() {
			let address = entry.address(index.id.index_bits());
			let value = Column::get_value(
				TableKeyQuery::Check(&TableKey::Partial(*key)),
				address,
				tables,
				log,
			)?;
			match value {
				Some(result) => return Ok(Some(result)),
				None => {
					let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
					entry = next_entry;
					sub_index = next_index;
				},
			}
		}
		Ok(None)
	}
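	// Note: index entries only store a partial key, so several entries in a
	// chunk can match the same prefix. `get_in_index` therefore walks the chunk
	// via `sub_index` until the value table confirms the full key, which is how
	// hash-prefix collisions are resolved.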

	pub fn as_ref<'a>(&'a self, tables: &'a [ValueTable]) -> TablesRef<'a> {
		TablesRef {
			tables,
			preimage: self.preimage,
			col: self.col,
			ref_counted: self.ref_counted,
			compression: &self.compression,
		}
	}
}

impl Column {
	pub fn get_value(
		mut key: TableKeyQuery,
		address: Address,
		tables: TablesRef,
		log: &impl LogQuery,
	) -> Result<Option<(u8, u32, Value)>> {
		let size_tier = address.size_tier() as usize;
		if let Some((value, compressed, rc)) =
			tables.tables[size_tier].query(&mut key, address.offset(), log)?
		{
			let value = if compressed { tables.compression.decompress(&value)? } else { value };
			return Ok(Some((size_tier as u8, rc, value)))
		}
		Ok(None)
	}

	pub fn compress(
		compression: &Compress,
		key: &TableKey,
		value: &[u8],
		tables: &[ValueTable],
	) -> (Option<Vec<u8>>, usize) {
		let (len, result) = if value.len() > compression.threshold as usize {
			let cvalue = compression.compress(value);
			if cvalue.len() < value.len() {
				(cvalue.len(), Some(cvalue))
			} else {
				(value.len(), None)
			}
		} else {
			(value.len(), None)
		};
		let target_tier = tables
			.iter()
			.position(|t| t.value_size(key).map_or(false, |s| len <= s as usize));
		let target_tier = target_tier.unwrap_or_else(|| {
			log::trace!(target: "parity-db", "Using blob {}", key);
			tables.len() - 1
		});

		(result, target_tier)
	}
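	// Worked example of the selection above: a value longer than the column's
	// compression threshold is compressed only when that actually shrinks it
	// (`cvalue.len() < value.len()`); the resulting length then picks the first
	// tier whose entry size fits, e.g. len == 100 selects the first table with
	// `value_size(key) >= 100`, while a length beyond the largest tier makes
	// `position` return `None` and falls back to the last, multipart table.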

	pub fn open(col: ColId, options: &Options, metadata: &Metadata) -> Result<Column> {
		let path = &options.path;
		let arc_path = Arc::new(path.clone());
		let column_options = &metadata.columns[col as usize];
		let db_version = metadata.version;
		let value = (0..SIZE_TIERS)
			.map(|i| Self::open_table(arc_path.clone(), col, i as u8, column_options, db_version))
			.collect::<Result<_>>()?;

		if column_options.btree_index {
			Ok(Column::Tree(BTreeTable::open(col, value, options, metadata)?))
		} else {
			Ok(Column::Hash(HashColumn::open(col, value, options, metadata)?))
		}
	}

	fn open_table(
		path: Arc<PathBuf>,
		col: ColId,
		tier: u8,
		options: &ColumnOptions,
		db_version: u32,
	) -> Result<ValueTable> {
		let id = ValueTableId::new(col, tier);
		let entry_size = SIZES.get(tier as usize).cloned();
		ValueTable::open(path, id, entry_size, options, db_version)
	}

	pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
		// It is not specified how `read_dir` behaves when files are deleted while
		// iterating, so we collect the list of paths to delete first.
		let mut to_delete = Vec::new();
		for entry in try_io!(std::fs::read_dir(&path)) {
			let entry = try_io!(entry);
			if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
				if crate::index::TableId::is_file_name(column, file) ||
					crate::table::TableId::is_file_name(column, file) ||
					crate::ref_count::RefCountTableId::is_file_name(column, file)
				{
					to_delete.push(PathBuf::from(file));
				}
			}
		}

		for file in to_delete {
			let mut path = path.clone();
			path.push(file);
			try_io!(std::fs::remove_file(path));
		}
		Ok(())
	}
}

pub fn packed_node_size(data: &[u8], num_children: u8) -> usize {
	data.len() + num_children as usize * 8 + 1
}
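// Packed node layout, as produced by `claim_node` and decoded by
// `unpack_node_data` below:
//
//	[node data][child 0 address, u64 LE]...[child N-1 address][N as u8]
//
// e.g. 5 bytes of data with 2 children pack into 5 + 2 * 8 + 1 = 22 bytes.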

pub fn unpack_node_data(data: Vec<u8>) -> Result<(Vec<u8>, Children)> {
	if data.is_empty() {
		return Err(Error::InvalidValueData)
	}
	let num_children = data[data.len() - 1] as usize;
	let child_buf_len = num_children * 8;
	if data.len() < (child_buf_len + 1) {
		return Err(Error::InvalidValueData)
	}
	let data_len = data.len() - (child_buf_len + 1);
	let mut children = Children::with_capacity(num_children);
	for i in 0..num_children {
		let node_address =
			u64::from_le_bytes(data[data_len + i * 8..data_len + (i + 1) * 8].try_into().unwrap());
		children.push(node_address);
	}
	let data = data.split_at(data_len).0.to_vec();
	Ok((data, children))
}

pub fn unpack_node_children(data: &[u8]) -> Result<Children> {
	if data.is_empty() {
		return Err(Error::InvalidValueData)
	}
	let num_children = data[data.len() - 1] as usize;
	let child_buf_len = num_children * 8;
	if data.len() < (child_buf_len + 1) {
		return Err(Error::InvalidValueData)
	}
	let data_len = data.len() - (child_buf_len + 1);
	let mut children = Children::with_capacity(num_children);
	for i in 0..num_children {
		let node_address =
			u64::from_le_bytes(data[data_len + i * 8..data_len + (i + 1) * 8].try_into().unwrap());
		children.push(node_address);
	}
	Ok(children)
}
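// Round-trip sketch of the packed format (illustrative; assumes `Children` is
// a vector of u64 node addresses, as used above):
//
//	let mut packed = b"abc".to_vec();
//	packed.extend_from_slice(&7u64.to_le_bytes());
//	packed.push(1); // one child
//	let (data, children) = unpack_node_data(packed)?;
//	// data == b"abc", children == vec![7]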

impl HashColumn {
	fn open(
		col: ColId,
		value: Vec<ValueTable>,
		options: &Options,
		metadata: &Metadata,
	) -> Result<HashColumn> {
		let (index, mut reindexing, stats) = Self::open_index(&options.path, col)?;
		let collect_stats = options.stats;
		let path = &options.path;
		let col_options = &metadata.columns[col as usize];
		let db_version = metadata.version;
		let (ref_count, ref_count_cache) = if col_options.multitree && !col_options.append_only {
			(
				Some(Self::open_ref_count(&options.path, col, &mut reindexing)?),
				Some(RwLock::new(Default::default())),
			)
		} else {
			(None, None)
		};
		Ok(HashColumn {
			col,
			tables: RwLock::new(Tables { index, value, ref_count }),
			reindex: RwLock::new(Reindex { queue: reindexing, progress: AtomicU64::new(0) }),
			ref_count_cache,
			path: path.into(),
			preimage: col_options.preimage,
			uniform_keys: col_options.uniform,
			ref_counted: col_options.ref_counted,
			append_only: col_options.append_only,
			collect_stats,
			salt: metadata.salt,
			stats,
			compression: Compress::new(
				col_options.compression,
				options
					.compression_threshold
					.get(&col)
					.copied()
					.unwrap_or(DEFAULT_COMPRESSION_THRESHOLD),
			),
			db_version,
		})
	}

	pub fn init_table_data(&mut self) -> Result<()> {
		let mut tables = self.tables.write();
		for table in &mut tables.value {
			table.init_table_data()?;
		}

		if let Some(cache) = &self.ref_count_cache {
			let mut cache = cache.write();

			for entry in &self.reindex.read().queue {
				if let ReindexEntry::RefCount(table) = entry {
					for index in 0..table.id.total_chunks() {
						let entries = table.table_entries(index)?;
						for entry in entries.iter() {
							if !entry.is_empty() {
								cache.insert(entry.address().as_u64(), entry.ref_count());
							}
						}
					}
				}
			}

			let table = &tables.ref_count;
			if let Some(table) = table {
				for index in 0..table.id.total_chunks() {
					let entries = table.table_entries(index)?;
					for entry in entries.iter() {
						if !entry.is_empty() {
							cache.insert(entry.address().as_u64(), entry.ref_count());
						}
					}
				}
			}
		}

		Ok(())
	}

	pub fn hash_key(&self, key: &[u8]) -> Key {
		hash_key(key, &self.salt, self.uniform_keys, self.db_version)
	}

	pub fn flush(&self) -> Result<()> {
		let tables = self.tables.read();
		tables.index.flush()?;
		for t in tables.value.iter() {
			t.flush()?;
		}
		if tables.ref_count.is_some() {
			tables.get_ref_count().flush()?;
		}
		Ok(())
	}

	fn open_index(
		path: &std::path::Path,
		col: ColId,
	) -> Result<(IndexTable, VecDeque<ReindexEntry>, ColumnStats)> {
		let mut reindexing = VecDeque::new();
		let mut top = None;
		let mut stats = ColumnStats::empty();
		for bits in (MIN_INDEX_BITS..65).rev() {
			let id = IndexTableId::new(col, bits);
			if let Some(table) = IndexTable::open_existing(path, id)? {
				if top.is_none() {
					stats = table.load_stats()?;
					log::trace!(target: "parity-db", "Opened main index {}", table.id);
					top = Some(table);
				} else {
					log::trace!(target: "parity-db", "Opened stale index {}", table.id);
					reindexing.push_front(ReindexEntry::Index(table));
				}
			}
		}
		let table = match top {
			Some(table) => table,
			None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS)),
		};
		Ok((table, reindexing, stats))
	}

	fn open_ref_count(
		path: &std::path::Path,
		col: ColId,
		reindexing: &mut VecDeque<ReindexEntry>,
	) -> Result<RefCountTable> {
		let mut top = None;
		for bits in (MIN_REF_COUNT_BITS..65).rev() {
			let id = RefCountTableId::new(col, bits);
			if let Some(table) = RefCountTable::open_existing(path, id)? {
				if top.is_none() {
					log::trace!(target: "parity-db", "Opened main ref count {}", table.id);
					top = Some(table);
				} else {
					log::trace!(target: "parity-db", "Opened stale ref count {}", table.id);
					reindexing.push_front(ReindexEntry::RefCount(table));
				}
			}
		}
		let table = match top {
			Some(table) => table,
			None => RefCountTable::create_new(path, RefCountTableId::new(col, MIN_REF_COUNT_BITS)),
		};
		Ok(table)
	}

	fn trigger_reindex<'a, 'b>(
		tables: RwLockUpgradableReadGuard<'a, Tables>,
		reindex: RwLockUpgradableReadGuard<'b, Reindex>,
		path: &std::path::Path,
	) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
		let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
		let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
		log::info!(
			target: "parity-db",
			"Started reindex for {}",
			tables.index.id,
		);
		// Start reindex
		let new_index_id =
			IndexTableId::new(tables.index.id.col(), tables.index.id.index_bits() + 1);
		let new_table = IndexTable::create_new(path, new_index_id);
		let old_table = std::mem::replace(&mut tables.index, new_table);
		reindex.queue.push_back(ReindexEntry::Index(old_table));
		(
			RwLockWriteGuard::downgrade_to_upgradable(tables),
			RwLockWriteGuard::downgrade_to_upgradable(reindex),
		)
	}
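	// Note: each reindex bumps `index_bits` by one, i.e. it doubles the chunk
	// capacity of the live index. The old table stays in `reindex.queue` and
	// keeps serving reads until `reindex()` has migrated all of its entries.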

	pub fn write_reindex_plan(
		&self,
		key: &Key,
		address: Address,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		self.write_reindex_plan_locked(tables, reindex, key, address, log)
	}

	fn write_reindex_plan_locked(
		&self,
		mut tables: RwLockUpgradableReadGuard<Tables>,
		mut reindex: RwLockUpgradableReadGuard<Reindex>,
		key: &Key,
		address: Address,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		if Self::contains_partial_key_with_address(key, address, &tables.index, log)? {
			log::trace!(target: "parity-db", "{}: Skipped reindex entry {} when reindexing", tables.index.id, hex(key));
			return Ok(PlanOutcome::Skipped)
		}
		let mut outcome = PlanOutcome::Written;
		while let PlanOutcome::NeedReindex =
			tables.index.write_insert_plan(key, address, None, log)?
		{
			log::debug!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
			outcome = PlanOutcome::NeedReindex;
		}
		Ok(outcome)
	}

	fn search_index<'a>(
		key: &Key,
		index: &'a IndexTable,
		tables: &'a Tables,
		log: &LogWriter,
	) -> Result<Option<(&'a IndexTable, usize, Address)>> {
		let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
		while !existing_entry.is_empty() {
			let existing_address = existing_entry.address(index.id.index_bits());
			let existing_tier = existing_address.size_tier();
			let table_key = TableKey::Partial(*key);
			if tables.value[existing_tier as usize].has_key_at(
				existing_address.offset(),
				&table_key,
				log,
			)? {
				return Ok(Some((index, sub_index, existing_address)))
			}

			let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
			existing_entry = next_entry;
			sub_index = next_index;
		}
		Ok(None)
	}

	fn contains_partial_key_with_address(
		key: &Key,
		address: Address,
		index: &IndexTable,
		log: &LogWriter,
	) -> Result<bool> {
		let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
		while !existing_entry.is_empty() {
			let existing_address = existing_entry.address(index.id.index_bits());
			if existing_address == address {
				return Ok(true)
			}
			let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
			existing_entry = next_entry;
			sub_index = next_index;
		}
		Ok(false)
	}

	fn search_all_indexes<'a>(
		key: &Key,
		tables: &'a Tables,
		reindex: &'a Reindex,
		log: &LogWriter,
	) -> Result<Option<(&'a IndexTable, usize, Address)>> {
		if let Some(r) = Self::search_index(key, &tables.index, tables, log)? {
			return Ok(Some(r))
		}
		// Check old indexes
		// TODO: don't search if index precedes reindex progress
		for entry in &reindex.queue {
			if let ReindexEntry::Index(index) = entry {
				if let Some(r) = Self::search_index(key, index, tables, log)? {
					return Ok(Some(r))
				}
			}
		}
		Ok(None)
	}
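	// While a reindex is in flight a key may still live in any of the queued
	// old indexes, so plan writes probe the live index first and then every
	// queued one, per the TODO above about skipping already-migrated tables.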

	pub fn write_plan(
		&self,
		change: &Operation<Key, RcValue>,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		let existing = Self::search_all_indexes(change.key(), &tables, &reindex, log)?;
		if let Some((table, sub_index, existing_address)) = existing {
			self.write_plan_existing(&tables, change, log, table, sub_index, existing_address)
		} else {
			match change {
				Operation::Set(key, value) => {
					let (r, _, _) =
						self.write_plan_new(tables, reindex, key, value.as_ref(), log)?;
					Ok(r)
				},
				Operation::Dereference(key) => {
					log::trace!(target: "parity-db", "{}: Deleting missing key {}", tables.index.id, hex(key));
					if self.collect_stats {
						self.stats.remove_miss();
					}
					Ok(PlanOutcome::Skipped)
				},
				Operation::Reference(key) => {
					log::trace!(target: "parity-db", "{}: Ignoring increase rc, missing key {}", tables.index.id, hex(key));
					if self.collect_stats {
						self.stats.reference_increase_miss();
					}
					Ok(PlanOutcome::Skipped)
				},
				Operation::InsertTree(..) |
				Operation::ReferenceTree(..) |
				Operation::DereferenceTree(..) =>
					Err(Error::InvalidConfiguration("Unsupported operation on hash column".into())),
			}
		}
	}

	#[allow(clippy::too_many_arguments)]
	fn write_plan_existing(
		&self,
		tables: &Tables,
		change: &Operation<Key, RcValue>,
		log: &mut LogWriter,
		index: &IndexTable,
		sub_index: usize,
		existing_address: Address,
	) -> Result<PlanOutcome> {
		let stats = if self.collect_stats { Some(&self.stats) } else { None };

		let key = change.key();
		let table_key = TableKey::Partial(*key);
		match Column::write_existing_value_plan(
			&table_key,
			self.as_ref(&tables.value),
			existing_address,
			change,
			log,
			stats,
			self.ref_counted,
		)? {
			(Some(outcome), _) => Ok(outcome),
			(None, Some(value_address)) => {
				// If it was found in an older index we just insert a new entry. Reindex won't
				// overwrite it.
				let sub_index = if index.id == tables.index.id { Some(sub_index) } else { None };
				tables.index.write_insert_plan(key, value_address, sub_index, log)
			},
			(None, None) => {
				log::trace!(target: "parity-db", "{}: Removing from index {}", tables.index.id, hex(key));
				index.write_remove_plan(key, sub_index, log)?;
				Ok(PlanOutcome::Written)
			},
		}
	}

	fn write_plan_new<'a, 'b>(
		&self,
		mut tables: RwLockUpgradableReadGuard<'a, Tables>,
		mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
		key: &Key,
		value: &[u8],
		log: &mut LogWriter,
	) -> Result<(
		PlanOutcome,
		RwLockUpgradableReadGuard<'a, Tables>,
		RwLockUpgradableReadGuard<'b, Reindex>,
	)> {
		let stats = self.collect_stats.then_some(&self.stats);
		let table_key = TableKey::Partial(*key);
		let address = Column::write_new_value_plan(
			&table_key,
			self.as_ref(&tables.value),
			value,
			log,
			stats,
		)?;
		let mut outcome = PlanOutcome::Written;
		while let PlanOutcome::NeedReindex =
			tables.index.write_insert_plan(key, address, None, log)?
		{
			log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
			outcome = PlanOutcome::NeedReindex;
		}
		Ok((outcome, tables, reindex))
	}
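	// The value is appended to its size tier first; only then is the index
	// entry inserted. If the target chunk is full the index is grown via
	// `trigger_reindex` and the insert retried against the larger table, which
	// is why the loop can end up reporting `PlanOutcome::NeedReindex`.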

	fn trigger_ref_count_reindex<'a, 'b>(
		tables: RwLockUpgradableReadGuard<'a, Tables>,
		reindex: RwLockUpgradableReadGuard<'b, Reindex>,
		path: &std::path::Path,
	) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
		let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
		let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
		log::info!(
			target: "parity-db",
			"Started reindex for ref count {}",
			tables.get_ref_count().id,
		);
		// Start reindex
		let new_id = RefCountTableId::new(
			tables.get_ref_count().id.col(),
			tables.get_ref_count().id.index_bits() + 1,
		);
		let new_table = Some(RefCountTable::create_new(path, new_id));
		let old_table = std::mem::replace(&mut tables.ref_count, new_table);
		reindex.queue.push_back(ReindexEntry::RefCount(old_table.unwrap()));
		(
			RwLockWriteGuard::downgrade_to_upgradable(tables),
			RwLockWriteGuard::downgrade_to_upgradable(reindex),
		)
	}

	pub fn write_ref_count_reindex_plan(
		&self,
		address: Address,
		ref_count: u64,
		source: RefCountTableId,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		self.write_ref_count_reindex_plan_locked(tables, reindex, address, ref_count, source, log)
	}

	fn write_ref_count_reindex_plan_locked(
		&self,
		mut tables: RwLockUpgradableReadGuard<Tables>,
		mut reindex: RwLockUpgradableReadGuard<Reindex>,
		address: Address,
		ref_count: u64,
		source: RefCountTableId,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		if let Some((_ref_count, _sub_index)) = tables.get_ref_count().get(address, log)? {
			log::trace!(target: "parity-db", "{}: Skipped ref count reindex entry {} when reindexing", tables.get_ref_count().id, address);
			return Ok(PlanOutcome::Skipped)
		}
		// An intermediate reindex table might contain a more recent value for the
		// ref count, so we need to check for that and skip the entry if found.
		for entry in reindex.queue.iter().rev() {
			if let ReindexEntry::RefCount(ref_count_table) = entry {
				if ref_count_table.id == source {
					break
				}
				if let Some(_r) = Self::search_ref_count(address, ref_count_table, log)? {
					log::trace!(target: "parity-db", "{}: Skipped ref count reindex entry {} when reindexing", ref_count_table.id, address);
					return Ok(PlanOutcome::Skipped)
				}
			}
		}
		let mut outcome = PlanOutcome::Written;
		while let PlanOutcome::NeedReindex =
			tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
		{
			log::debug!(target: "parity-db", "{}: Ref count chunk full {} when reindexing", tables.get_ref_count().id, address);
			(tables, reindex) =
				Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
			outcome = PlanOutcome::NeedReindex;
		}
		Ok(outcome)
	}

	fn search_ref_count<'a>(
		address: Address,
		ref_count_table: &'a RefCountTable,
		log: &LogWriter,
	) -> Result<Option<(&'a RefCountTable, usize, u64)>> {
		if let Some((ref_count, sub_index)) = ref_count_table.get(address, log)? {
			return Ok(Some((ref_count_table, sub_index, ref_count)))
		}
		Ok(None)
	}

	fn search_all_ref_count<'a>(
		address: Address,
		tables: &'a Tables,
		reindex: &'a Reindex,
		log: &LogWriter,
	) -> Result<Option<(&'a RefCountTable, usize, u64)>> {
		if let Some(r) = Self::search_ref_count(address, tables.get_ref_count(), log)? {
			return Ok(Some(r))
		}
		// Check old tables
		// TODO: don't search if table precedes reindex progress
		for entry in reindex.queue.iter().rev() {
			if let ReindexEntry::RefCount(ref_count_table) = entry {
				if let Some(r) = Self::search_ref_count(address, ref_count_table, log)? {
					return Ok(Some(r))
				}
			}
		}
		Ok(None)
	}

	fn write_ref_count_plan_existing<'a>(
		&self,
		tables: &Tables,
		reindex: &'a Reindex,
		change: (Address, Option<u64>),
		log: &mut LogWriter,
		ref_count_table: &RefCountTable,
		sub_index: usize,
	) -> Result<PlanOutcome> {
		let (address, ref_count) = change;
		if let Some(ref_count) = ref_count {
			// Replacing
			assert!(ref_count_table.id == tables.get_ref_count().id);
			tables
				.get_ref_count()
				.write_insert_plan(address, ref_count, Some(sub_index), log)
		} else {
			// Removing
			let result = ref_count_table.write_remove_plan(address, sub_index, log);
			// The entry also needs to be removed from all older tables in the reindex
			// queue, otherwise it would appear to still exist and could be
			// reintroduced during reindexing.
			{
				if ref_count_table.id != tables.get_ref_count().id {
					if let Some((table, sub_index, _ref_count)) =
						Self::search_ref_count(address, tables.get_ref_count(), log)?
					{
						table.write_remove_plan(address, sub_index, log)?;
					}
				}
				for entry in &reindex.queue {
					if let ReindexEntry::RefCount(table) = entry {
						if table.id != ref_count_table.id {
							if let Some((table, sub_index, _ref_count)) =
								Self::search_ref_count(address, table, log)?
							{
								table.write_remove_plan(address, sub_index, log)?;
							}
						}
					}
				}
			}
			result
		}
	}

	fn write_ref_count_plan_new<'a, 'b>(
		&self,
		mut tables: RwLockUpgradableReadGuard<'a, Tables>,
		mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
		address: Address,
		ref_count: u64,
		log: &mut LogWriter,
	) -> Result<(
		PlanOutcome,
		RwLockUpgradableReadGuard<'a, Tables>,
		RwLockUpgradableReadGuard<'b, Reindex>,
	)> {
		let mut outcome = PlanOutcome::Written;
		while let PlanOutcome::NeedReindex =
			tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
		{
			log::debug!(target: "parity-db", "{}: Ref count chunk full {}", tables.get_ref_count().id, address);
			(tables, reindex) =
				Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
			outcome = PlanOutcome::NeedReindex;
		}
		let (test_ref_count, _test_sub_index) = tables.get_ref_count().get(address, log)?.unwrap();
		assert!(test_ref_count == ref_count);
		Ok((outcome, tables, reindex))
	}

	fn prepare_children(
		&self,
		children: &[NodeRef],
		tables: TablesRef,
		tier_count: &mut HashMap<usize, usize>,
	) -> Result<()> {
		for child in children {
			match child {
				NodeRef::New(node) => self.prepare_node(node, tables, tier_count)?,
				NodeRef::Existing(_address) => {},
			};
		}
		Ok(())
	}

	fn prepare_node(
		&self,
		node: &NewNode,
		tables: TablesRef,
		tier_count: &mut HashMap<usize, usize>,
	) -> Result<()> {
		let data_size = packed_node_size(&node.data, node.children.len() as u8);

		let table_key = TableKey::NoHash;

		let target_tier = tables
			.tables
			.iter()
			.position(|t| t.value_size(&table_key).map_or(false, |s| data_size <= s as usize));
		let target_tier = target_tier.unwrap_or_else(|| tables.tables.len() - 1);

		// Check it isn't multipart
		//assert!(target_tier < (SIZE_TIERS - 1));

		match tier_count.entry(target_tier) {
			std::collections::hash_map::Entry::Occupied(mut entry) => {
				*entry.get_mut() += 1;
			},
			std::collections::hash_map::Entry::Vacant(entry) => {
				entry.insert(1);
			},
		}

		self.prepare_children(&node.children, tables, tier_count)?;

		Ok(())
	}

	fn claim_children_to_data(
		&self,
		children: &[NodeRef],
		tables: TablesRef,
		tier_addresses: &HashMap<usize, Vec<u64>>,
		tier_index: &mut HashMap<usize, usize>,
		node_values: &mut Vec<NodeChange>,
		data: &mut Vec<u8>,
	) -> Result<()> {
		for child in children {
			let address = match child {
				NodeRef::New(node) =>
					self.claim_node(node, tables, tier_addresses, tier_index, node_values)?,
				NodeRef::Existing(address) => {
					if !self.append_only {
						node_values.push(NodeChange::IncrementReference(*address));
					}
					*address
				},
			};
			data.extend_from_slice(&address.to_le_bytes());
		}
		Ok(())
	}

	fn claim_node(
		&self,
		node: &NewNode,
		tables: TablesRef,
		tier_addresses: &HashMap<usize, Vec<u64>>,
		tier_index: &mut HashMap<usize, usize>,
		node_values: &mut Vec<NodeChange>,
	) -> Result<NodeAddress> {
		let num_children = node.children.len();

		let data_size = packed_node_size(&node.data, num_children as u8);

		let table_key = TableKey::NoHash;

		let target_tier = tables
			.tables
			.iter()
			.position(|t| t.value_size(&table_key).map_or(false, |s| data_size <= s as usize));
		let target_tier = target_tier.unwrap_or_else(|| tables.tables.len() - 1);

		let index = *tier_index.get(&target_tier).unwrap();
		tier_index.insert(target_tier, index + 1);

		let offset = tier_addresses.get(&target_tier).unwrap()[index];

		let mut data: Vec<u8> = Vec::with_capacity(data_size);
		data.extend_from_slice(&node.data);
		self.claim_children_to_data(
			&node.children,
			tables,
			tier_addresses,
			tier_index,
			node_values,
			&mut data,
		)?;
		data.push(num_children as u8);

		// Compression is not supported here: the value size must be known up front
		// in order to pick the tier.
		let val: RcValue = data.into();

		let address = Address::new(offset, target_tier as u8);

		node_values.push(NodeChange::NewValue(address.as_u64(), val));

		Ok(address.as_u64())
	}

	/// Returns the packed value for the root node and a vector of `NodeChange` for
	/// the remaining nodes.
	pub fn claim_tree_values<K, V>(
		&self,
		change: &Operation<K, V>,
	) -> Result<(Vec<u8>, Vec<NodeChange>)> {
		match change {
			Operation::InsertTree(_key, node) => {
				let tables = self.tables.upgradable_read();

				let values = self.as_ref(&tables.value);

				let mut tier_count: HashMap<usize, usize> = Default::default();
				self.prepare_children(&node.children, values, &mut tier_count)?;

				let mut tier_addresses: HashMap<usize, Vec<u64>> = Default::default();
				let mut tier_index: HashMap<usize, usize> = Default::default();
				for (tier, count) in tier_count {
					let offsets = values.tables[tier].claim_entries(count)?;
					tier_addresses.insert(tier, offsets);
					tier_index.insert(tier, 0);
				}

				let mut node_values: Vec<NodeChange> = Default::default();

				let num_children = node.children.len();
				let data_size = packed_node_size(&node.data, num_children as u8);
				let mut data: Vec<u8> = Vec::with_capacity(data_size);
				data.extend_from_slice(&node.data);
				self.claim_children_to_data(
					&node.children,
					values,
					&tier_addresses,
					&mut tier_index,
					&mut node_values,
					&mut data,
				)?;
				data.push(num_children as u8);

				return Ok((data, node_values))
			},
			Operation::ReferenceTree(..) | Operation::DereferenceTree(..) =>
				return Err(Error::InvalidInput(
					"claim_tree_values should not be called for ReferenceTree or DereferenceTree"
						.into(),
				)),
			_ =>
				return Err(Error::InvalidInput(format!(
					"Invalid operation for column {}",
					self.col
				))),
		}
	}
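	// Note: `claim_tree_values` reserves value-table slots for every new node up
	// front (`claim_entries`) so that child addresses can be embedded into their
	// parent's packed data before anything is written; the returned `NodeChange`
	// list is then applied through `write_address_value_plan` and the
	// ref-count plans below.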

	pub fn write_address_value_plan(
		&self,
		address: u64,
		cval: RcValue,
		compressed: bool,
		val_len: u32,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		let tables = self.tables.upgradable_read();
		let tables = self.as_ref(&tables.value);
		let address = Address::from_u64(address);
		let target_tier = address.size_tier();
		let offset = address.offset();
		tables.tables[target_tier as usize].write_claimed_plan(
			offset,
			&TableKey::NoHash,
			cval.as_ref(),
			log,
			compressed,
		)?;

		let stats = self.collect_stats.then_some(&self.stats);
		if let Some(stats) = stats {
			stats.insert_val(val_len, cval.as_ref().len() as u32);
		}

		Ok(PlanOutcome::Written)
	}

	pub fn write_address_inc_ref_plan(
		&self,
		address: u64,
		log: &mut LogWriter,
	) -> Result<PlanOutcome> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		let address = Address::from_u64(address);
		let cached = self
			.ref_count_cache
			.as_ref()
			.and_then(|c| c.read().get(&address.as_u64()).cloned());
		if let Some(cached_ref_count) = cached {
			let existing: Option<(&RefCountTable, usize, u64)> =
				Self::search_all_ref_count(address, &tables, &reindex, log)?;
			let (table, sub_index, table_ref_count) = existing.unwrap();
			assert!(cached_ref_count > 1);
			assert!(table_ref_count == cached_ref_count);
			let new_ref_count = cached_ref_count + 1;
			self.ref_count_cache
				.as_ref()
				.map(|c| c.write().insert(address.as_u64(), new_ref_count));
			if table.id == tables.get_ref_count().id {
				self.write_ref_count_plan_existing(
					&tables,
					&reindex,
					(address, Some(new_ref_count)),
					log,
					table,
					sub_index,
				)
			} else {
				let (r, _, _) =
					self.write_ref_count_plan_new(tables, reindex, address, new_ref_count, log)?;
				Ok(r)
			}
		} else {
			// inc ref is only called on addresses that already exist, and the cache
			// only tracks counts greater than one, so a cache miss means the current
			// ref count is exactly 1.
			self.ref_count_cache.as_ref().map(|c| c.write().insert(address.as_u64(), 2));
			let (r, _, _) = self.write_ref_count_plan_new(tables, reindex, address, 2, log)?;
			Ok(r)
		}
	}

	pub fn write_address_dec_ref_plan(
		&self,
		address: u64,
		log: &mut LogWriter,
	) -> Result<(bool, PlanOutcome)> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		let address = Address::from_u64(address);
		let cached = self
			.ref_count_cache
			.as_ref()
			.and_then(|c| c.read().get(&address.as_u64()).cloned());
		if let Some(cached_ref_count) = cached {
			let existing: Option<(&RefCountTable, usize, u64)> =
				Self::search_all_ref_count(address, &tables, &reindex, log)?;
			let (table, sub_index, table_ref_count) = existing.unwrap();
			assert!(cached_ref_count > 1);
			assert!(table_ref_count == cached_ref_count);
			let new_ref_count = cached_ref_count - 1;
			self.ref_count_cache.as_ref().map(|c| {
				if new_ref_count > 1 {
					c.write().insert(address.as_u64(), new_ref_count);
				} else {
					c.write().remove(&address.as_u64());
				}
			});
			let new_ref_count = if new_ref_count > 1 { Some(new_ref_count) } else { None };
			let outcome = if new_ref_count.is_some() && table.id != tables.get_ref_count().id {
				let (r, _, _) = self.write_ref_count_plan_new(
					tables,
					reindex,
					address,
					new_ref_count.unwrap(),
					log,
				)?;
				r
			} else {
				self.write_ref_count_plan_existing(
					&tables,
					&reindex,
					(address, new_ref_count),
					log,
					table,
					sub_index,
				)?
			};
			Ok((true, outcome))
		} else {
			// dec ref on an uncached address means the current ref count is 1 (the
			// cache only tracks counts greater than one), so the value itself can be
			// removed.
			let tables = self.as_ref(&tables.value);
			let target_tier = address.size_tier();
			let offset = address.offset();
			tables.tables[target_tier as usize].write_remove_plan(offset, log)?;
			Ok((false, PlanOutcome::Written))
		}
	}
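	// Ref-count bookkeeping sketch: a count of exactly 1 is implicit (no cache
	// or ref-count-table entry), so incrementing an uncached address writes 2,
	// and decrementing an uncached address deletes the value outright. Cached
	// counts are mirrored into the on-disk ref-count table by the plans above.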

	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.read();
		let reindex = self.reindex.read();
		match action {
			LogAction::InsertIndex(record) => {
				if tables.index.id == record.table {
					tables.index.enact_plan(record.index, log)?;
				} else if let Some(table) = reindex
					.queue
					.iter()
					.filter_map(|s| if let ReindexEntry::Index(t) = s { Some(t) } else { None })
					.find(|r| r.id == record.table)
				{
					table.enact_plan(record.index, log)?;
				} else {
					// This may happen when removal is planned for an old index when reindexing.
					// We can safely skip the removal since the new index does not have the entry
					// anyway and the old index is already dropped.
					log::debug!(
						target: "parity-db",
						"Missing index {}. Skipped",
						record.table,
					);
					IndexTable::skip_plan(log)?;
				}
			},
			LogAction::InsertValue(record) => {
				tables.value[record.table.size_tier() as usize].enact_plan(record.index, log)?;
			},
			LogAction::InsertRefCount(record) => {
				if tables.get_ref_count().id == record.table {
					tables.get_ref_count().enact_plan(record.index, log)?;
				} else if let Some(table) = reindex
					.queue
					.iter()
					.filter_map(|s| if let ReindexEntry::RefCount(t) = s { Some(t) } else { None })
					.find(|r| r.id == record.table)
				{
					table.enact_plan(record.index, log)?;
				} else {
					// This may happen when removal is planned for an old ref count when reindexing.
					// We can safely skip the removal since the new ref count does not have the
					// entry anyway and the old ref count is already dropped.
					log::debug!(
						target: "parity-db",
						"Missing ref count {}. Skipped",
						record.table,
					);
					RefCountTable::skip_plan(log)?;
				}
			},
			// This should never happen, unless something has modified the log file while the
			// database is running. Existing logs should be validated with `validate_plan` on
			// startup.
			_ => return Err(Error::Corruption("Unexpected log action".into())),
		}
		Ok(())
	}

	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
		let tables = self.tables.upgradable_read();
		let reindex = self.reindex.upgradable_read();
		match action {
			LogAction::InsertIndex(record) => {
				if tables.index.id == record.table {
					tables.index.validate_plan(record.index, log)?;
				} else if let Some(table) = reindex
					.queue
					.iter()
					.filter_map(|s| if let ReindexEntry::Index(t) = s { Some(t) } else { None })
					.find(|r| r.id == record.table)
				{
					table.validate_plan(record.index, log)?;
				} else {
					if record.table.index_bits() < tables.index.id.index_bits() {
						// Insertion into a previously dropped index.
						log::warn!(target: "parity-db", "Index {} is too old. Current is {}", record.table, tables.index.id);
						return Err(Error::Corruption("Unexpected log index id".to_string()))
					}
					// Re-launch previously started reindex
					// TODO: add explicit log records for reindexing events.
					log::warn!(
						target: "parity-db",
						"Missing table {}, starting reindex",
						record.table,
					);
					let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
					std::mem::drop(lock);
					return self.validate_plan(LogAction::InsertIndex(record), log)
				}
			},
			LogAction::InsertValue(record) => {
				tables.value[record.table.size_tier() as usize].validate_plan(record.index, log)?;
			},
			LogAction::InsertRefCount(record) => {
				if tables.get_ref_count().id == record.table {
					tables.get_ref_count().validate_plan(record.index, log)?;
				} else if let Some(table) = reindex
					.queue
					.iter()
					.filter_map(|s| if let ReindexEntry::RefCount(t) = s { Some(t) } else { None })
					.find(|r| r.id == record.table)
				{
					table.validate_plan(record.index, log)?;
				} else {
					if record.table.index_bits() < tables.get_ref_count().id.index_bits() {
						// Insertion into a previously dropped ref count.
						log::warn!(target: "parity-db", "Ref count {} is too old. Current is {}", record.table, tables.get_ref_count().id);
						return Err(Error::Corruption("Unexpected log ref count id".to_string()))
					}
					// Re-launch previously started reindex
					// TODO: add explicit log records for reindexing events.
					log::warn!(
						target: "parity-db",
						"Missing ref count {}, starting reindex",
						record.table,
					);
					let lock =
						Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
					std::mem::drop(lock);
					return self.validate_plan(LogAction::InsertRefCount(record), log)
				}
			},
			_ => {
				log::error!(target: "parity-db", "Unexpected log action");
				return Err(Error::Corruption("Unexpected log action".to_string()))
			},
		}
		Ok(())
	}

	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.value.iter() {
			t.complete_plan(log)?;
		}
		if self.collect_stats {
			self.stats.commit()
		}
		Ok(())
	}

	pub fn refresh_metadata(&self) -> Result<()> {
		let tables = self.tables.read();
		for t in tables.value.iter() {
			t.refresh_metadata()?;
		}
		Ok(())
	}

	pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
		let tables = self.tables.read();
		tables.index.write_stats(&self.stats)?;
		self.stats.write_stats_text(writer, tables.index.id.col()).map_err(Error::Io)
	}

	fn stat_summary(&self) -> ColumnStatSummary {
		self.stats.summary()
	}

	fn clear_stats(&self) -> Result<()> {
		let tables = self.tables.read();
		self.stats.clear();
		tables.index.write_stats(&self.stats)
	}

	pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		let tables = self.tables.read();
		for table in &tables.value {
			log::debug!(target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
			table.iter_while(log.overlays(), |_, rc, value, compressed| {
				let value = if compressed {
					if let Ok(value) = self.compression.decompress(&value) {
						value
					} else {
						return false
					}
				} else {
					value
				};
				let state = ValueIterState { rc, value };
				f(state)
			})?;
			log::debug!(target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
		}
		Ok(())
	}

	pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
		let action = |state| match state {
			IterStateOrCorrupted::Item(item) => Ok(f(item)),
			IterStateOrCorrupted::Corrupted(..) =>
				Err(Error::Corruption("Missing indexed value".into())),
		};
		self.iter_index_internal(log, action, 0)
	}

	fn iter_index_internal(
		&self,
		log: &Log,
		mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
		start_chunk: u64,
	) -> Result<()> {
		let tables = self.tables.read();
		let source = &tables.index;
		let total_chunks = source.id.total_chunks();

		for c in start_chunk..total_chunks {
			let entries = source.entries(c, log.overlays())?;
			for (sub_index, entry) in entries.iter().enumerate() {
				if entry.is_empty() {
					continue
				}
				let (size_tier, offset) = {
					let address = entry.address(source.id.index_bits());
					(address.size_tier(), address.offset())
				};

				let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
				let (value, rc, pk, compressed) = match value {
					Ok(Some(v)) => v,
					Ok(None) => {
						let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
							chunk_index: c,
							sub_index: sub_index as u32,
							value_entry,
							entry: *entry,
							error: None,
						}))? {
							return Ok(())
						}
						continue
					},
					Err(e) => {
						let value_entry = if let Error::Corruption(_) = &e {
							tables.value[size_tier as usize].dump_entry(offset).ok()
						} else {
							None
						};
						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
							chunk_index: c,
							sub_index: sub_index as u32,
							value_entry,
							entry: *entry,
							error: Some(e),
						}))? {
							return Ok(())
						}
						continue
					},
				};
				let mut key = source.recover_key_prefix(c, *entry);
				key[6..].copy_from_slice(&pk);
				let value = if compressed { self.compression.decompress(&value)? } else { value };
				log::debug!(
					target: "parity-db",
					"{}: Iterating at {}/{}, key={:?}, pk={:?}",
					source.id,
					c,
					source.id.total_chunks(),
					hex(&key),
					hex(&pk),
				);
				let state = IterStateOrCorrupted::Item(IterState {
					item_index: c,
					total_items: total_chunks,
					key,
					rc,
					value,
				});
				if !f(state)? {
					return Ok(())
				}
			}
		}
		Ok(())
	}
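	// Key reconstruction note: only a partial key is stored on disk, so the
	// first 6 bytes are recovered from the chunk position via
	// `recover_key_prefix` and the remainder is filled from the partial key
	// (`pk`) stored alongside the value entry.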

	fn iter_index_fast(
		&self,
		log: &Log,
		mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
		_start_chunk: u64,
	) -> Result<()> {
		let tables = self.tables.read();
		let index = &tables.index;

		let entries = index.sorted_entries()?;
		let total = entries.len();
		for (sub_index, entry) in entries.into_iter().enumerate() {
			let (size_tier, offset) = {
				let address = entry.address(index.id.index_bits());
				(address.size_tier(), address.offset())
			};

			let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
			let (value, rc, pk, compressed) = match value {
				Ok(Some(v)) => v,
				Ok(None) => {
					let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
					if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
						chunk_index: sub_index as u64,
						sub_index: sub_index as u32,
						value_entry,
						entry,
						error: None,
					}))? {
						return Ok(())
					}
					continue
				},
				Err(e) => {
					let value_entry = if let Error::Corruption(_) = &e {
						tables.value[size_tier as usize].dump_entry(offset).ok()
					} else {
						None
					};
					if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
						chunk_index: sub_index as u64,
						sub_index: sub_index as u32,
						value_entry,
						entry,
						error: Some(e),
					}))? {
						return Ok(())
					}
					continue
				},
			};
			let value = if compressed { self.compression.decompress(&value)? } else { value };
			log::debug!(
				target: "parity-db",
				"{}: Iterating at {}/{}, pk={:?}",
				index.id,
				sub_index,
				total,
				hex(&pk),
			);
			let state = IterStateOrCorrupted::Item(IterState {
				item_index: sub_index as u64,
				total_items: total as u64,
				key: Default::default(),
				rc,
				value,
			});
			if !f(state)? {
				return Ok(())
			}
		}
		Ok(())
	}

	fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
		let start_chunk = check_params.from.unwrap_or(0);
		let end_chunk = check_params.bound;

		let step = if check_params.fast { 1_000_000 } else { 10_000 };
		let (denom, suffix) = if check_params.fast { (1_000_000, "m") } else { (1_000, "k") };
		let mut next_info_at = step;
		let start_time = std::time::Instant::now();
		let index_id = self.tables.read().index.id;
		log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
		let iter_fn =
			if check_params.fast { Self::iter_index_fast } else { Self::iter_index_internal };
		iter_fn(
			self,
			log,
			|state| match state {
				IterStateOrCorrupted::Item(IterState {
					item_index,
					total_items,
					key,
					rc,
					value,
				}) => {
					if Some(item_index) == end_chunk {
						return Ok(false)
					}
					if item_index >= next_info_at {
						next_info_at += step;
						log::info!(target: "parity-db", "Validated {}{} / {}{} entries", item_index / denom, suffix, total_items / denom, suffix);
					}

					match check_params.display {
						CheckDisplay::Full => {
							log::info!(
								"Index key: {:x?}\n \
							\tRc: {}",
								&key,
								rc,
							);
							log::info!("Value: {}", hex(&value));
						},
						CheckDisplay::Short(t) => {
							log::info!("Index key: {:x?}", &key);
							log::info!("Rc: {}, Value len: {}", rc, value.len());
							log::info!(
								"Value: {}",
								hex(&value[..std::cmp::min(t as usize, value.len())])
							);
						},
						CheckDisplay::None => (),
					}
					Ok(true)
				},
				IterStateOrCorrupted::Corrupted(c) => {
					log::error!(
						"Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
						c.chunk_index,
						c.sub_index,
						c.entry.address(index_id.index_bits()),
						hex(&c.entry.as_u64().to_le_bytes()),
						c.error,
					);
					if let Some(v) = c.value_entry {
						log::error!("Value entry: {:?}", hex(v.as_slice()));
					}
					Ok(true)
				},
			},
			start_chunk,
		)?;

1739		log::info!(target: "parity-db", "Index validation completed successfully, elapsed {:?}", start_time.elapsed());
1740		if check_params.validate_free_refs {
1741			log::info!(target: "parity-db", "Validating free refs");
1742			let tables = self.tables.read();
1743			let mut total = 0;
1744			for t in &tables.value {
1745				match t.check_free_refs() {
1746					Err(e) => log::warn!(target: "parity-db", "{}: Error: {:?}", t.id, e),
1747					Ok(n) => total += n,
1748				}
1749			}
1750			log::info!(target: "parity-db", "Total free refs: {}", total);
1751		}
1752		Ok(())
1753	}
1754
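	/// Build the next reindexing batch from the table at the front of the reindex
	/// queue, collecting at most `MAX_REINDEX_BATCH` entries per call. When the
	/// source table has been fully drained, its id is returned as `drop_index` (or
	/// `drop_ref_count`) so that the caller can remove the file.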
1755	pub fn reindex(&self, log: &Log) -> Result<ReindexBatch> {
1756		let tables = self.tables.read();
1757		let reindex = self.reindex.read();
1758		let mut plan = Vec::new();
1759		let mut drop_index = None;
1760		let mut ref_count_plan = Vec::new();
1761		let mut ref_count_source = None;
1762		let mut drop_ref_count = None;
1763		if let Some(source) = reindex.queue.front() {
1764			let progress = reindex.progress.load(Ordering::Relaxed);
1765			match source {
1766				ReindexEntry::Index(source) => {
1767					if progress != source.id.total_chunks() {
1768						let mut source_index = progress;
1769						if source_index % 500 == 0 {
1770							log::debug!(target: "parity-db", "{}: Reindexing at {}/{}", tables.index.id, source_index, source.id.total_chunks());
1771						}
1772						log::debug!(target: "parity-db", "{}: Continue reindex at {}/{}", tables.index.id, source_index, source.id.total_chunks());
1773						while source_index < source.id.total_chunks() &&
1774							plan.len() < MAX_REINDEX_BATCH
1775						{
1776							log::trace!(target: "parity-db", "{}: Reindexing {}", source.id, source_index);
1777							let entries = source.entries(source_index, log.overlays())?;
1778							for entry in entries.iter() {
1779								if entry.is_empty() {
1780									continue
1781								}
1782								// We only need the key prefix to reindex.
1783								let key = source.recover_key_prefix(source_index, *entry);
1784								plan.push((key, entry.address(source.id.index_bits())))
1785							}
1786							source_index += 1;
1787						}
1788						log::trace!(target: "parity-db", "{}: End reindex batch {} ({})", tables.index.id, source_index, plan.len());
1789						reindex.progress.store(source_index, Ordering::Relaxed);
1790						if source_index == source.id.total_chunks() {
1791							log::info!(target: "parity-db", "Completed reindex {} into {}", source.id, tables.index.id);
1792							drop_index = Some(source.id);
1793						}
1794					}
1795				},
1796				ReindexEntry::RefCount(source) =>
1797					if progress != source.id.total_chunks() {
1798						let mut source_index = progress;
1799						if source_index % 500 == 0 {
1800							log::debug!(target: "parity-db", "{}: Reindexing ref count at {}/{}", tables.get_ref_count().id, source_index, source.id.total_chunks());
1801						}
1802						ref_count_source = Some(source.id);
1803						log::debug!(target: "parity-db", "{}: Continue reindex ref count at {}/{}", tables.get_ref_count().id, source_index, source.id.total_chunks());
1804						while source_index < source.id.total_chunks() &&
1805							ref_count_plan.len() < MAX_REINDEX_BATCH
1806						{
1807							log::trace!(target: "parity-db", "{}: Reindexing ref count {}", source.id, source_index);
1808							let entries = source.entries(source_index, log.overlays())?;
1809							for entry in entries.iter() {
1810								if entry.is_empty() {
1811									continue
1812								}
1813								ref_count_plan.push((entry.address(), entry.ref_count()));
1814							}
1815							source_index += 1;
1816						}
1817						log::trace!(target: "parity-db", "{}: End reindex ref count batch {} ({})", tables.get_ref_count().id, source_index, ref_count_plan.len());
1818						reindex.progress.store(source_index, Ordering::Relaxed);
1819						if source_index == source.id.total_chunks() {
1820							log::info!(target: "parity-db", "Completed reindex ref count {} into {}", source.id, tables.get_ref_count().id);
1821							drop_ref_count = Some(source.id);
1822						}
1823					},
1824			}
1825		}
1826		Ok(ReindexBatch {
1827			drop_index,
1828			batch: plan,
1829			drop_ref_count,
1830			ref_count_batch: ref_count_plan,
1831			ref_count_batch_source: ref_count_source,
1832		})
1833	}
1834
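	/// Drop a fully reindexed index table and delete its file. Only the table at
	/// the front of the reindex queue can be dropped; other ids are ignored with a
	/// warning.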
1835	pub fn drop_index(&self, id: IndexTableId) -> Result<()> {
1836		log::debug!(target: "parity-db", "Dropping {}", id);
1837		let mut reindex = self.reindex.write();
1838		if matches!(reindex.queue.front(), Some(ReindexEntry::Index(t)) if t.id == id) {
1845			reindex.progress.store(0, Ordering::Relaxed);
1846			let table = reindex.queue.pop_front().unwrap();
1847			let table = if let ReindexEntry::Index(table) = table {
1848				table
1849			} else {
1850				return Err(Error::Corruption("Incorrect reindex type".to_string()))
1851			};
1852			table.drop_file()?;
1853		} else {
1854			log::warn!(target: "parity-db", "Dropping invalid index {}", id);
1855			return Ok(())
1856		}
1857		log::debug!(target: "parity-db", "Dropped {}", id);
1858		Ok(())
1859	}
1860
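	/// Drop a fully reindexed ref count table and delete its file. As with
	/// `drop_index`, only the table at the front of the reindex queue qualifies.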
1861	pub fn drop_ref_count(&self, id: RefCountTableId) -> Result<()> {
1862		log::debug!(target: "parity-db", "Dropping ref count {}", id);
1863		let mut reindex = self.reindex.write();
1864		if matches!(reindex.queue.front(), Some(ReindexEntry::RefCount(t)) if t.id == id) {
1871			reindex.progress.store(0, Ordering::Relaxed);
1872			let table = reindex.queue.pop_front().unwrap();
1873			let table = if let ReindexEntry::RefCount(table) = table {
1874				table
1875			} else {
1876				return Err(Error::Corruption("Incorrect reindex type".to_string()))
1877			};
1878			table.drop_file()?;
1879		} else {
1880			log::warn!(target: "parity-db", "Dropping invalid ref count {}", id);
1881			return Ok(())
1882		}
1883		log::debug!(target: "parity-db", "Dropped ref count {}", id);
1884		Ok(())
1885	}
1886
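	/// Total number of entries across all value tables of this column.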
1887	pub fn get_num_value_entries(&self) -> Result<u64> {
1888		let tables = self.tables.read();
1889		let mut num_entries = 0;
1890		for value_table in &tables.value {
1891			num_entries += value_table.get_num_entries()?;
1892		}
1893		Ok(num_entries)
1894	}
1895}
1896
1897impl Column {
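	/// Plan an operation for a value that already exists at `address`. Returns the
	/// plan outcome and, when a replacement no longer fits the current size tier,
	/// the new address the value was moved to.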
1898	pub fn write_existing_value_plan<K, V: AsRef<[u8]>>(
1899		key: &TableKey,
1900		tables: TablesRef,
1901		address: Address,
1902		change: &Operation<K, V>,
1903		log: &mut LogWriter,
1904		stats: Option<&ColumnStats>,
1905		ref_counted: bool,
1906	) -> Result<(Option<PlanOutcome>, Option<Address>)> {
1907		let tier = address.size_tier() as usize;
1908
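		// Current (stored, uncompressed) sizes of the entry. When the entry is
		// compressed this reads and decompresses the whole value, hence the note
		// below.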
1909		let fetch_size = || -> Result<(u32, u32)> {
1910			let (cur_size, compressed) =
1911				tables.tables[tier].size(key, address.offset(), log)?.unwrap_or((0, false));
1912			Ok(if compressed {
1913				// This is very costly.
1914				let compressed = tables.tables[tier]
1915					.get(key, address.offset(), log)?
1916					.expect("Same query as size")
1917					.0;
1918				let uncompressed = tables.compression.decompress(compressed.as_slice())?;
1919
1920				(cur_size, uncompressed.len() as u32)
1921			} else {
1922				(cur_size, cur_size)
1923			})
1924		};
1925
1926		match change {
1927			Operation::Reference(_) =>
1928				if ref_counted {
1929					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
1930					tables.tables[tier].write_inc_ref(address.offset(), log)?;
1931					if let Some(stats) = stats {
1932						stats.reference_increase();
1933					}
1934					Ok((Some(PlanOutcome::Written), None))
1935				} else {
1936					Ok((Some(PlanOutcome::Skipped), None))
1937				},
1938			Operation::Set(_, val) => {
1939				if ref_counted {
1940					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
1941					tables.tables[tier].write_inc_ref(address.offset(), log)?;
1942					return Ok((Some(PlanOutcome::Written), None))
1943				}
1944				if tables.preimage {
1945					// Replace is not supported: for a preimage column the key is a hash of the value, so the stored value can never change.
1946					return Ok((Some(PlanOutcome::Skipped), None))
1947				}
1948
1949				let (cval, target_tier) =
1950					Column::compress(tables.compression, key, val.as_ref(), tables.tables);
1951				let (cval, compressed) = cval
1952					.as_ref()
1953					.map(|cval| (cval.as_slice(), true))
1954					.unwrap_or((val.as_ref(), false));
1955
1956				if let Some(stats) = stats {
1957					let (cur_size, uncompressed) = fetch_size()?;
1958					stats.replace_val(
1959						cur_size,
1960						uncompressed,
1961						val.as_ref().len() as u32,
1962						cval.len() as u32,
1963					);
1964				}
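				// If the new value still fits the current size tier, replace it in place;
				// otherwise remove the old entry, insert into the target tier and hand
				// the new address back to the caller.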
1965				if tier == target_tier {
1966					log::trace!(target: "parity-db", "{}: Replacing {}", tables.col, key);
1967					tables.tables[target_tier].write_replace_plan(
1968						address.offset(),
1969						key,
1970						cval,
1971						log,
1972						compressed,
1973					)?;
1974					Ok((Some(PlanOutcome::Written), None))
1975				} else {
1976					log::trace!(target: "parity-db", "{}: Replacing in a new table {}", tables.col, key);
1977					tables.tables[tier].write_remove_plan(address.offset(), log)?;
1978					let new_offset =
1979						tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
1980					let new_address = Address::new(new_offset, target_tier as u8);
1981					Ok((None, Some(new_address)))
1982				}
1983			},
1984			Operation::Dereference(_) => {
1985				// Deletion. For ref-counted columns the entry is only removed once its reference count drops to zero.
1986				let cur_size = if stats.is_some() { Some(fetch_size()?) } else { None };
1987				let remove = if ref_counted {
1988					let removed = !tables.tables[tier].write_dec_ref(address.offset(), log)?;
1989					log::trace!(target: "parity-db", "{}: Dereference {}, deleted={}", tables.col, key, removed);
1990					removed
1991				} else {
1992					log::trace!(target: "parity-db", "{}: Deleting {}", tables.col, key);
1993					tables.tables[tier].write_remove_plan(address.offset(), log)?;
1994					true
1995				};
1996				if remove {
1997					if let Some((compressed_size, uncompressed_size)) = cur_size {
1998						if let Some(stats) = stats {
1999							stats.remove_val(uncompressed_size, compressed_size);
2000						}
2001					}
2002					Ok((None, None))
2003				} else {
2004					Ok((Some(PlanOutcome::Written), None))
2005				}
2006			},
2007			Operation::InsertTree(..) |
2008			Operation::ReferenceTree(..) |
2009			Operation::DereferenceTree(..) =>
2010				Err(Error::InvalidInput(format!("Invalid operation for column {}", tables.col))),
2011		}
2012	}
2013
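	/// Plan the insertion of a new value: compress it when worthwhile, pick the
	/// target size tier and write an insert plan, returning the new entry's
	/// address.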
2014	pub fn write_new_value_plan(
2015		key: &TableKey,
2016		tables: TablesRef,
2017		val: &[u8],
2018		log: &mut LogWriter,
2019		stats: Option<&ColumnStats>,
2020	) -> Result<Address> {
2021		let (cval, target_tier) = Column::compress(tables.compression, key, val, tables.tables);
2022		let (cval, compressed) =
2023			cval.as_ref().map(|cval| (cval.as_slice(), true)).unwrap_or((val, false));
2024
2025		log::trace!(target: "parity-db", "{}: Inserting new {}, size = {}", tables.col, key, cval.len());
2026		let offset = tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
2027		let address = Address::new(offset, target_tier as u8);
2028
2029		if let Some(stats) = stats {
2030			stats.insert_val(val.len() as u32, cval.len() as u32);
2031		}
2032		Ok(address)
2033	}
2034
2035	pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
2036		match self {
2037			Column::Hash(column) => column.complete_plan(log),
2038			Column::Tree(column) => column.complete_plan(log),
2039		}
2040	}
2041
2042	pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
2043		match self {
2044			Column::Hash(column) => column.validate_plan(action, log),
2045			Column::Tree(column) => column.validate_plan(action, log),
2046		}
2047	}
2048
2049	pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
2050		match self {
2051			Column::Hash(column) => column.enact_plan(action, log),
2052			Column::Tree(column) => column.enact_plan(action, log),
2053		}
2054	}
2055
2056	pub fn init_table_data(&mut self) -> Result<()> {
2057		match self {
2058			Column::Hash(column) => column.init_table_data(),
2059			Column::Tree(_column) => Ok(()),
2060		}
2061	}
2062
2063	pub fn flush(&self) -> Result<()> {
2064		match self {
2065			Column::Hash(column) => column.flush(),
2066			Column::Tree(column) => column.flush(),
2067		}
2068	}
2069
2070	pub fn refresh_metadata(&self) -> Result<()> {
2071		match self {
2072			Column::Hash(column) => column.refresh_metadata(),
2073			Column::Tree(column) => column.refresh_metadata(),
2074		}
2075	}
2076
2077	pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
2078		match self {
2079			Column::Hash(column) => column.write_stats_text(writer),
2080			Column::Tree(_column) => Ok(()),
2081		}
2082	}
2083
2084	pub fn clear_stats(&self) -> Result<()> {
2085		match self {
2086			Column::Hash(column) => column.clear_stats(),
2087			Column::Tree(_column) => Ok(()),
2088		}
2089	}
2090
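	/// Column statistics are only maintained for hash columns; `None` is returned
	/// for tree columns.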
2091	pub fn stats(&self) -> Option<ColumnStatSummary> {
2092		match self {
2093			Column::Hash(column) => Some(column.stat_summary()),
2094			Column::Tree(_column) => None,
2095		}
2096	}
2097
2098	pub fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
2099		match self {
2100			Column::Hash(column) => column.dump(log, check_params, col),
2101			Column::Tree(_column) => Ok(()),
2102		}
2103	}
2104
2105	#[cfg(test)]
2106	#[cfg(feature = "instrumentation")]
2107	pub fn index_bits(&self) -> Option<u8> {
2108		match self {
2109			Column::Hash(column) => Some(column.tables.read().index.id.index_bits()),
2110			Column::Tree(_column) => None,
2111		}
2112	}
2113}