1use crate::{
5 btree::BTreeTable,
6 compress::Compress,
7 db::{check::CheckDisplay, NodeChange, Operation, RcValue},
8 display::hex,
9 error::{try_io, Error, Result},
10 index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
11 log::{Log, LogAction, LogOverlays, LogQuery, LogReader, LogWriter},
12 multitree::{Children, NewNode, NodeAddress, NodeRef},
13 options::{ColumnOptions, Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
14 parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
15 ref_count::{RefCountTable, RefCountTableId},
16 stats::{ColumnStatSummary, ColumnStats},
17 table::{
18 key::{TableKey, TableKeyQuery},
19 TableId as ValueTableId, Value, ValueTable, SIZE_TIERS,
20 },
21 Key,
22};
23use std::{
24 collections::{HashMap, VecDeque},
25 path::PathBuf,
26 sync::{
27 atomic::{AtomicU64, Ordering},
28 Arc,
29 },
30};
31
32pub const MIN_INDEX_BITS: u8 = 16;
33pub const MIN_REF_COUNT_BITS: u8 = 16;
34const MAX_REINDEX_BATCH: usize = 8192;
36
37pub type ColId = u8;
38pub type Salt = [u8; 32];
39
40const SIZES: [u16; SIZE_TIERS - 1] = [
60 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 50, 51, 52, 54, 55, 57, 58, 60,
61 62, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 88, 90, 93, 95, 98, 101, 103, 106, 109,
62 112, 115, 119, 122, 125, 129, 132, 136, 140, 144, 148, 152, 156, 160, 165, 169, 174, 179, 183,
63 189, 194, 199, 205, 210, 216, 222, 228, 235, 241, 248, 255, 262, 269, 276, 284, 292, 300, 308,
64 317, 325, 334, 344, 353, 363, 373, 383, 394, 405, 416, 428, 439, 452, 464, 477, 490, 504, 518,
65 532, 547, 562, 577, 593, 610, 627, 644, 662, 680, 699, 718, 738, 758, 779, 801, 823, 846, 869,
66 893, 918, 943, 969, 996, 1024, 1052, 1081, 1111, 1142, 1174, 1206, 1239, 1274, 1309, 1345,
67 1382, 1421, 1460, 1500, 1542, 1584, 1628, 1673, 1720, 1767, 1816, 1866, 1918, 1971, 2025, 2082,
68 2139, 2198, 2259, 2322, 2386, 2452, 2520, 2589, 2661, 2735, 2810, 2888, 2968, 3050, 3134, 3221,
69 3310, 3402, 3496, 3593, 3692, 3794, 3899, 4007, 4118, 4232, 4349, 4469, 4593, 4720, 4850, 4984,
70 5122, 5264, 5410, 5559, 5713, 5871, 6034, 6200, 6372, 6548, 6729, 6916, 7107, 7303, 7506, 7713,
71 7927, 8146, 8371, 8603, 8841, 9085, 9337, 9595, 9860, 10133, 10413, 10702, 10998, 11302, 11614,
72 11936, 12266, 12605, 12954, 13312, 13681, 14059, 14448, 14848, 15258, 15681, 16114, 16560,
73 17018, 17489, 17973, 18470, 18981, 19506, 20046, 20600, 21170, 21756, 22358, 22976, 23612,
74 24265, 24936, 25626, 26335, 27064, 27812, 28582, 29372, 30185, 31020, 31878, 32760,
75];
76
77#[derive(Debug)]
78struct Tables {
79 index: IndexTable,
80 value: Vec<ValueTable>,
81 ref_count: Option<RefCountTable>,
82}
83
84impl Tables {
85 fn get_ref_count(&self) -> &RefCountTable {
86 self.ref_count.as_ref().unwrap()
87 }
88}
89
90#[derive(Debug)]
91enum ReindexEntry {
92 Index(IndexTable),
93 RefCount(RefCountTable),
94}
95
96#[derive(Debug)]
97struct Reindex {
98 queue: VecDeque<ReindexEntry>,
99 progress: AtomicU64,
100}
101
102#[allow(clippy::large_enum_variant)]
103#[derive(Debug)]
104pub enum Column {
105 Hash(HashColumn),
106 Tree(BTreeTable),
107}
108
109#[derive(Debug)]
110pub struct HashColumn {
111 col: ColId,
112 tables: RwLock<Tables>,
113 reindex: RwLock<Reindex>,
114 ref_count_cache: Option<RwLock<HashMap<u64, u64>>>,
115 path: PathBuf,
116 preimage: bool,
117 uniform_keys: bool,
118 collect_stats: bool,
119 ref_counted: bool,
120 append_only: bool,
121 salt: Salt,
122 stats: ColumnStats,
123 compression: Compress,
124 db_version: u32,
125}
126
127#[derive(Clone, Copy)]
128pub struct TablesRef<'a> {
129 pub tables: &'a [ValueTable],
130 pub compression: &'a Compress,
131 pub col: ColId,
132 pub preimage: bool,
133 pub ref_counted: bool,
134}
135
136pub struct ValueIterState {
138 pub rc: u32,
140 pub value: Vec<u8>,
142}
143
144pub struct CorruptedIndexEntryInfo {
146 pub chunk_index: u64,
147 pub sub_index: u32,
148 pub entry: crate::index::Entry,
149 pub value_entry: Option<Vec<u8>>,
150 pub error: Option<Error>,
151}
152
153pub struct IterState {
155 pub item_index: u64,
156 pub total_items: u64,
157 pub key: Key,
158 pub rc: u32,
159 pub value: Vec<u8>,
160}
161
162enum IterStateOrCorrupted {
164 Item(IterState),
165 Corrupted(CorruptedIndexEntryInfo),
166}
167
168#[inline]
169pub fn hash_key(key: &[u8], salt: &Salt, uniform: bool, db_version: u32) -> Key {
170 use blake2::{
171 digest::{typenum::U32, FixedOutput, Update},
172 Blake2bMac,
173 };
174
175 let mut k = Key::default();
176 if uniform {
177 if db_version <= 5 {
178 k.copy_from_slice(&key[0..32]);
179 } else if db_version <= 7 {
180 let key = &key[0..32];
182 for i in 0..32 {
183 k[i] = key[i] ^ salt[i];
184 }
185 } else {
186 #[cfg(any(test, feature = "instrumentation"))]
187 if salt == &Salt::default() {
189 k.copy_from_slice(&key);
190 return k
191 }
192 use siphasher::sip128::Hasher128;
194 use std::hash::Hasher;
195 let mut hasher = siphasher::sip128::SipHasher13::new_with_key(
196 salt[..16].try_into().expect("Salt length is 32"),
197 );
198 hasher.write(&key);
199 let hash = hasher.finish128();
200 k[0..8].copy_from_slice(&hash.h1.to_le_bytes());
201 k[8..16].copy_from_slice(&hash.h2.to_le_bytes());
202 k[16..].copy_from_slice(&key[16..]);
203 }
204 } else {
205 let mut ctx = Blake2bMac::<U32>::new_with_salt_and_personal(salt, &[], &[])
206 .expect("Salt length (32) is a valid key length (<= 64)");
207 ctx.update(key);
208 let hash = ctx.finalize_fixed();
209 k.copy_from_slice(&hash);
210 }
211 k
212}
213
214pub struct ReindexBatch {
215 pub drop_index: Option<IndexTableId>,
216 pub batch: Vec<(Key, Address)>,
217 pub drop_ref_count: Option<RefCountTableId>,
218 pub ref_count_batch: Vec<(Address, u64)>,
219 pub ref_count_batch_source: Option<RefCountTableId>,
220}
221
222impl HashColumn {
223 pub fn get(&self, key: &Key, log: &impl LogQuery) -> Result<Option<(Value, u32)>> {
224 let tables = self.tables.read();
225 let values = self.as_ref(&tables.value);
226 if let Some((tier, rc, value)) = self.get_in_index(key, &tables.index, values, log)? {
227 if self.collect_stats {
228 self.stats.query_hit(tier);
229 }
230 return Ok(Some((value, rc)))
231 }
232 for entry in &self.reindex.read().queue {
233 if let ReindexEntry::Index(r) = entry {
234 if let Some((tier, rc, value)) = self.get_in_index(key, r, values, log)? {
235 if self.collect_stats {
236 self.stats.query_hit(tier);
237 }
238 return Ok(Some((value, rc)))
239 }
240 }
241 }
242 if self.collect_stats {
243 self.stats.query_miss();
244 }
245 Ok(None)
246 }
247
248 pub fn get_size(&self, key: &Key, log: &RwLock<LogOverlays>) -> Result<Option<u32>> {
249 Ok(self.get(key, log)?.map(|(v, _rc)| v.len() as u32))
250 }
251
252 pub fn get_value(&self, address: Address, log: &impl LogQuery) -> Result<Option<Value>> {
253 let tables = self.tables.read();
254 let values = self.as_ref(&tables.value);
255 if let Some((tier, _rc, value)) =
256 Column::get_value(TableKeyQuery::Check(&TableKey::NoHash), address, values, log)?
257 {
258 if self.collect_stats {
259 self.stats.query_hit(tier);
260 }
261 return Ok(Some(value))
262 }
263 if self.collect_stats {
264 self.stats.query_miss();
265 }
266 Ok(None)
267 }
268
269 fn get_in_index(
270 &self,
271 key: &Key,
272 index: &IndexTable,
273 tables: TablesRef,
274 log: &impl LogQuery,
275 ) -> Result<Option<(u8, u32, Value)>> {
276 let (mut entry, mut sub_index) = index.get(key, 0, log)?;
277 while !entry.is_empty() {
278 let address = entry.address(index.id.index_bits());
279 let value = Column::get_value(
280 TableKeyQuery::Check(&TableKey::Partial(*key)),
281 address,
282 tables,
283 log,
284 )?;
285 match value {
286 Some(result) => return Ok(Some(result)),
287 None => {
288 let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
289 entry = next_entry;
290 sub_index = next_index;
291 },
292 }
293 }
294 Ok(None)
295 }
296
297 pub fn as_ref<'a>(&'a self, tables: &'a [ValueTable]) -> TablesRef<'a> {
298 TablesRef {
299 tables,
300 preimage: self.preimage,
301 col: self.col,
302 ref_counted: self.ref_counted,
303 compression: &self.compression,
304 }
305 }
306}
307
308impl Column {
309 pub fn get_value(
310 mut key: TableKeyQuery,
311 address: Address,
312 tables: TablesRef,
313 log: &impl LogQuery,
314 ) -> Result<Option<(u8, u32, Value)>> {
315 let size_tier = address.size_tier() as usize;
316 if let Some((value, compressed, rc)) =
317 tables.tables[size_tier].query(&mut key, address.offset(), log)?
318 {
319 let value = if compressed { tables.compression.decompress(&value)? } else { value };
320 return Ok(Some((size_tier as u8, rc, value)))
321 }
322 Ok(None)
323 }
324
325 pub fn compress(
326 compression: &Compress,
327 key: &TableKey,
328 value: &[u8],
329 tables: &[ValueTable],
330 ) -> (Option<Vec<u8>>, usize) {
331 let (len, result) = if value.len() > compression.threshold as usize {
332 let cvalue = compression.compress(value);
333 if cvalue.len() < value.len() {
334 (cvalue.len(), Some(cvalue))
335 } else {
336 (value.len(), None)
337 }
338 } else {
339 (value.len(), None)
340 };
341 let target_tier = tables
342 .iter()
343 .position(|t| t.value_size(key).map_or(false, |s| len <= s as usize));
344 let target_tier = target_tier.unwrap_or_else(|| {
345 log::trace!(target: "parity-db", "Using blob {}", key);
346 tables.len() - 1
347 });
348
349 (result, target_tier)
350 }
351
352 pub fn open(col: ColId, options: &Options, metadata: &Metadata) -> Result<Column> {
353 let path = &options.path;
354 let arc_path = Arc::new(path.clone());
355 let column_options = &metadata.columns[col as usize];
356 let db_version = metadata.version;
357 let value = (0..SIZE_TIERS)
358 .map(|i| Self::open_table(arc_path.clone(), col, i as u8, column_options, db_version))
359 .collect::<Result<_>>()?;
360
361 if column_options.btree_index {
362 Ok(Column::Tree(BTreeTable::open(col, value, options, metadata)?))
363 } else {
364 Ok(Column::Hash(HashColumn::open(col, value, options, metadata)?))
365 }
366 }
367
368 fn open_table(
369 path: Arc<PathBuf>,
370 col: ColId,
371 tier: u8,
372 options: &ColumnOptions,
373 db_version: u32,
374 ) -> Result<ValueTable> {
375 let id = ValueTableId::new(col, tier);
376 let entry_size = SIZES.get(tier as usize).cloned();
377 ValueTable::open(path, id, entry_size, options, db_version)
378 }
379
380 pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
381 let mut to_delete = Vec::new();
384 for entry in try_io!(std::fs::read_dir(&path)) {
385 let entry = try_io!(entry);
386 if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
387 if crate::index::TableId::is_file_name(column, file) ||
388 crate::table::TableId::is_file_name(column, file) ||
389 crate::ref_count::RefCountTableId::is_file_name(column, file)
390 {
391 to_delete.push(PathBuf::from(file));
392 }
393 }
394 }
395
396 for file in to_delete {
397 let mut path = path.clone();
398 path.push(file);
399 try_io!(std::fs::remove_file(path));
400 }
401 Ok(())
402 }
403}
404
405pub fn packed_node_size(data: &Vec<u8>, num_children: u8) -> usize {
406 data.len() + num_children as usize * 8 + 1
407}
408
409pub fn unpack_node_data(data: Vec<u8>) -> Result<(Vec<u8>, Children)> {
410 if data.len() == 0 {
411 return Err(Error::InvalidValueData)
412 }
413 let num_children = data[data.len() - 1] as usize;
414 let child_buf_len = num_children * 8;
415 if data.len() < (child_buf_len + 1) {
416 return Err(Error::InvalidValueData)
417 }
418 let data_len = data.len() - (child_buf_len + 1);
419 let mut children = Children::with_capacity(num_children);
420 for i in 0..num_children {
421 let node_address =
422 u64::from_le_bytes(data[data_len + i * 8..data_len + (i + 1) * 8].try_into().unwrap());
423 children.push(node_address);
424 }
425 let data = data.split_at(data_len).0.to_vec();
426 Ok((data, children))
427}
428
429pub fn unpack_node_children(data: &[u8]) -> Result<Children> {
430 if data.len() == 0 {
431 return Err(Error::InvalidValueData)
432 }
433 let num_children = data[data.len() - 1] as usize;
434 let child_buf_len = num_children * 8;
435 if data.len() < (child_buf_len + 1) {
436 return Err(Error::InvalidValueData)
437 }
438 let data_len = data.len() - (child_buf_len + 1);
439 let mut children = Children::with_capacity(num_children);
440 for i in 0..num_children {
441 let node_address =
442 u64::from_le_bytes(data[data_len + i * 8..data_len + (i + 1) * 8].try_into().unwrap());
443 children.push(node_address);
444 }
445 Ok(children)
446}
447
448impl HashColumn {
449 fn open(
450 col: ColId,
451 value: Vec<ValueTable>,
452 options: &Options,
453 metadata: &Metadata,
454 ) -> Result<HashColumn> {
455 let (index, mut reindexing, stats) = Self::open_index(&options.path, col)?;
456 let collect_stats = options.stats;
457 let path = &options.path;
458 let col_options = &metadata.columns[col as usize];
459 let db_version = metadata.version;
460 let (ref_count, ref_count_cache) = if col_options.multitree && !col_options.append_only {
461 (
462 Some(Self::open_ref_count(&options.path, col, &mut reindexing)?),
463 Some(RwLock::new(Default::default())),
464 )
465 } else {
466 (None, None)
467 };
468 Ok(HashColumn {
469 col,
470 tables: RwLock::new(Tables { index, value, ref_count }),
471 reindex: RwLock::new(Reindex { queue: reindexing, progress: AtomicU64::new(0) }),
472 ref_count_cache,
473 path: path.into(),
474 preimage: col_options.preimage,
475 uniform_keys: col_options.uniform,
476 ref_counted: col_options.ref_counted,
477 append_only: col_options.append_only,
478 collect_stats,
479 salt: metadata.salt,
480 stats,
481 compression: Compress::new(
482 col_options.compression,
483 options
484 .compression_threshold
485 .get(&col)
486 .copied()
487 .unwrap_or(DEFAULT_COMPRESSION_THRESHOLD),
488 ),
489 db_version,
490 })
491 }
492
493 pub fn init_table_data(&mut self) -> Result<()> {
494 let mut tables = self.tables.write();
495 for table in &mut tables.value {
496 table.init_table_data()?;
497 }
498
499 if let Some(cache) = &self.ref_count_cache {
500 let mut cache = cache.write();
501
502 for entry in &self.reindex.read().queue {
503 if let ReindexEntry::RefCount(table) = entry {
504 for index in 0..table.id.total_chunks() {
505 let entries = table.table_entries(index)?;
506 for entry in entries.iter() {
507 if !entry.is_empty() {
508 cache.insert(entry.address().as_u64(), entry.ref_count());
509 }
510 }
511 }
512 }
513 }
514
515 let table = &tables.ref_count;
516 if let Some(table) = table {
517 for index in 0..table.id.total_chunks() {
518 let entries = table.table_entries(index)?;
519 for entry in entries.iter() {
520 if !entry.is_empty() {
521 cache.insert(entry.address().as_u64(), entry.ref_count());
522 }
523 }
524 }
525 }
526 }
527
528 Ok(())
529 }
530
531 pub fn hash_key(&self, key: &[u8]) -> Key {
532 hash_key(key, &self.salt, self.uniform_keys, self.db_version)
533 }
534
535 pub fn flush(&self) -> Result<()> {
536 let tables = self.tables.read();
537 tables.index.flush()?;
538 for t in tables.value.iter() {
539 t.flush()?;
540 }
541 if tables.ref_count.is_some() {
542 tables.get_ref_count().flush()?;
543 }
544 Ok(())
545 }
546
547 fn open_index(
548 path: &std::path::Path,
549 col: ColId,
550 ) -> Result<(IndexTable, VecDeque<ReindexEntry>, ColumnStats)> {
551 let mut reindexing = VecDeque::new();
552 let mut top = None;
553 let mut stats = ColumnStats::empty();
554 for bits in (MIN_INDEX_BITS..65).rev() {
555 let id = IndexTableId::new(col, bits);
556 if let Some(table) = IndexTable::open_existing(path, id)? {
557 if top.is_none() {
558 stats = table.load_stats()?;
559 log::trace!(target: "parity-db", "Opened main index {}", table.id);
560 top = Some(table);
561 } else {
562 log::trace!(target: "parity-db", "Opened stale index {}", table.id);
563 reindexing.push_front(ReindexEntry::Index(table));
564 }
565 }
566 }
567 let table = match top {
568 Some(table) => table,
569 None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS)),
570 };
571 Ok((table, reindexing, stats))
572 }
573
574 fn open_ref_count(
575 path: &std::path::Path,
576 col: ColId,
577 reindexing: &mut VecDeque<ReindexEntry>,
578 ) -> Result<RefCountTable> {
579 let mut top = None;
580 for bits in (MIN_REF_COUNT_BITS..65).rev() {
581 let id = RefCountTableId::new(col, bits);
582 if let Some(table) = RefCountTable::open_existing(path, id)? {
583 if top.is_none() {
584 log::trace!(target: "parity-db", "Opened main ref count {}", table.id);
585 top = Some(table);
586 } else {
587 log::trace!(target: "parity-db", "Opened stale ref count {}", table.id);
588 reindexing.push_front(ReindexEntry::RefCount(table));
589 }
590 }
591 }
592 let table = match top {
593 Some(table) => table,
594 None => RefCountTable::create_new(path, RefCountTableId::new(col, MIN_REF_COUNT_BITS)),
595 };
596 Ok(table)
597 }
598
599 fn trigger_reindex<'a, 'b>(
600 tables: RwLockUpgradableReadGuard<'a, Tables>,
601 reindex: RwLockUpgradableReadGuard<'b, Reindex>,
602 path: &std::path::Path,
603 ) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
604 let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
605 let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
606 log::info!(
607 target: "parity-db",
608 "Started reindex for {}",
609 tables.index.id,
610 );
611 let new_index_id =
613 IndexTableId::new(tables.index.id.col(), tables.index.id.index_bits() + 1);
614 let new_table = IndexTable::create_new(path, new_index_id);
615 let old_table = std::mem::replace(&mut tables.index, new_table);
616 reindex.queue.push_back(ReindexEntry::Index(old_table));
617 (
618 RwLockWriteGuard::downgrade_to_upgradable(tables),
619 RwLockWriteGuard::downgrade_to_upgradable(reindex),
620 )
621 }
622
623 pub fn write_reindex_plan(
624 &self,
625 key: &Key,
626 address: Address,
627 log: &mut LogWriter,
628 ) -> Result<PlanOutcome> {
629 let tables = self.tables.upgradable_read();
630 let reindex = self.reindex.upgradable_read();
631 self.write_reindex_plan_locked(tables, reindex, key, address, log)
632 }
633
634 fn write_reindex_plan_locked(
635 &self,
636 mut tables: RwLockUpgradableReadGuard<Tables>,
637 mut reindex: RwLockUpgradableReadGuard<Reindex>,
638 key: &Key,
639 address: Address,
640 log: &mut LogWriter,
641 ) -> Result<PlanOutcome> {
642 if Self::contains_partial_key_with_address(key, address, &tables.index, log)? {
643 log::trace!(target: "parity-db", "{}: Skipped reindex entry {} when reindexing", tables.index.id, hex(key));
644 return Ok(PlanOutcome::Skipped)
645 }
646 let mut outcome = PlanOutcome::Written;
647 while let PlanOutcome::NeedReindex =
648 tables.index.write_insert_plan(key, address, None, log)?
649 {
650 log::debug!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
651 (tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
652 outcome = PlanOutcome::NeedReindex;
653 }
654 Ok(outcome)
655 }
656
657 fn search_index<'a>(
658 key: &Key,
659 index: &'a IndexTable,
660 tables: &'a Tables,
661 log: &LogWriter,
662 ) -> Result<Option<(&'a IndexTable, usize, Address)>> {
663 let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
664 while !existing_entry.is_empty() {
665 let existing_address = existing_entry.address(index.id.index_bits());
666 let existing_tier = existing_address.size_tier();
667 let table_key = TableKey::Partial(*key);
668 if tables.value[existing_tier as usize].has_key_at(
669 existing_address.offset(),
670 &table_key,
671 log,
672 )? {
673 return Ok(Some((index, sub_index, existing_address)))
674 }
675
676 let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
677 existing_entry = next_entry;
678 sub_index = next_index;
679 }
680 Ok(None)
681 }
682
683 fn contains_partial_key_with_address(
684 key: &Key,
685 address: Address,
686 index: &IndexTable,
687 log: &LogWriter,
688 ) -> Result<bool> {
689 let (mut existing_entry, mut sub_index) = index.get(key, 0, log)?;
690 while !existing_entry.is_empty() {
691 let existing_address = existing_entry.address(index.id.index_bits());
692 if existing_address == address {
693 return Ok(true)
694 }
695 let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
696 existing_entry = next_entry;
697 sub_index = next_index;
698 }
699 Ok(false)
700 }
701
702 fn search_all_indexes<'a>(
703 key: &Key,
704 tables: &'a Tables,
705 reindex: &'a Reindex,
706 log: &LogWriter,
707 ) -> Result<Option<(&'a IndexTable, usize, Address)>> {
708 if let Some(r) = Self::search_index(key, &tables.index, tables, log)? {
709 return Ok(Some(r))
710 }
711 for entry in &reindex.queue {
714 if let ReindexEntry::Index(index) = entry {
715 if let Some(r) = Self::search_index(key, index, tables, log)? {
716 return Ok(Some(r))
717 }
718 }
719 }
720 Ok(None)
721 }
722
723 pub fn write_plan(
724 &self,
725 change: &Operation<Key, RcValue>,
726 log: &mut LogWriter,
727 ) -> Result<PlanOutcome> {
728 let tables = self.tables.upgradable_read();
729 let reindex = self.reindex.upgradable_read();
730 let existing = Self::search_all_indexes(change.key(), &tables, &reindex, log)?;
731 if let Some((table, sub_index, existing_address)) = existing {
732 self.write_plan_existing(&tables, change, log, table, sub_index, existing_address)
733 } else {
734 match change {
735 Operation::Set(key, value) => {
736 let (r, _, _) =
737 self.write_plan_new(tables, reindex, key, value.as_ref(), log)?;
738 Ok(r)
739 },
740 Operation::Dereference(key) => {
741 log::trace!(target: "parity-db", "{}: Deleting missing key {}", tables.index.id, hex(key));
742 if self.collect_stats {
743 self.stats.remove_miss();
744 }
745 Ok(PlanOutcome::Skipped)
746 },
747 Operation::Reference(key) => {
748 log::trace!(target: "parity-db", "{}: Ignoring increase rc, missing key {}", tables.index.id, hex(key));
749 if self.collect_stats {
750 self.stats.reference_increase_miss();
751 }
752 Ok(PlanOutcome::Skipped)
753 },
754 Operation::InsertTree(..) |
755 Operation::ReferenceTree(..) |
756 Operation::DereferenceTree(..) =>
757 Err(Error::InvalidConfiguration("Unsupported operation on hash column".into())),
758 }
759 }
760 }
761
762 #[allow(clippy::too_many_arguments)]
763 fn write_plan_existing(
764 &self,
765 tables: &Tables,
766 change: &Operation<Key, RcValue>,
767 log: &mut LogWriter,
768 index: &IndexTable,
769 sub_index: usize,
770 existing_address: Address,
771 ) -> Result<PlanOutcome> {
772 let stats = if self.collect_stats { Some(&self.stats) } else { None };
773
774 let key = change.key();
775 let table_key = TableKey::Partial(*key);
776 match Column::write_existing_value_plan(
777 &table_key,
778 self.as_ref(&tables.value),
779 existing_address,
780 change,
781 log,
782 stats,
783 self.ref_counted,
784 )? {
785 (Some(outcome), _) => Ok(outcome),
786 (None, Some(value_address)) => {
787 let sub_index = if index.id == tables.index.id { Some(sub_index) } else { None };
790 tables.index.write_insert_plan(key, value_address, sub_index, log)
791 },
792 (None, None) => {
793 log::trace!(target: "parity-db", "{}: Removing from index {}", tables.index.id, hex(key));
794 index.write_remove_plan(key, sub_index, log)?;
795 Ok(PlanOutcome::Written)
796 },
797 }
798 }
799
800 fn write_plan_new<'a, 'b>(
801 &self,
802 mut tables: RwLockUpgradableReadGuard<'a, Tables>,
803 mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
804 key: &Key,
805 value: &[u8],
806 log: &mut LogWriter,
807 ) -> Result<(
808 PlanOutcome,
809 RwLockUpgradableReadGuard<'a, Tables>,
810 RwLockUpgradableReadGuard<'b, Reindex>,
811 )> {
812 let stats = self.collect_stats.then_some(&self.stats);
813 let table_key = TableKey::Partial(*key);
814 let address = Column::write_new_value_plan(
815 &table_key,
816 self.as_ref(&tables.value),
817 value,
818 log,
819 stats,
820 )?;
821 let mut outcome = PlanOutcome::Written;
822 while let PlanOutcome::NeedReindex =
823 tables.index.write_insert_plan(key, address, None, log)?
824 {
825 log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
826 (tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
827 outcome = PlanOutcome::NeedReindex;
828 }
829 Ok((outcome, tables, reindex))
830 }
831
832 fn trigger_ref_count_reindex<'a, 'b>(
833 tables: RwLockUpgradableReadGuard<'a, Tables>,
834 reindex: RwLockUpgradableReadGuard<'b, Reindex>,
835 path: &std::path::Path,
836 ) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
837 let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
838 let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
839 log::info!(
840 target: "parity-db",
841 "Started reindex for ref count {}",
842 tables.get_ref_count().id,
843 );
844 let new_id = RefCountTableId::new(
846 tables.get_ref_count().id.col(),
847 tables.get_ref_count().id.index_bits() + 1,
848 );
849 let new_table = Some(RefCountTable::create_new(path, new_id));
850 let old_table = std::mem::replace(&mut tables.ref_count, new_table);
851 reindex.queue.push_back(ReindexEntry::RefCount(old_table.unwrap()));
852 (
853 RwLockWriteGuard::downgrade_to_upgradable(tables),
854 RwLockWriteGuard::downgrade_to_upgradable(reindex),
855 )
856 }
857
858 pub fn write_ref_count_reindex_plan(
859 &self,
860 address: Address,
861 ref_count: u64,
862 source: RefCountTableId,
863 log: &mut LogWriter,
864 ) -> Result<PlanOutcome> {
865 let tables = self.tables.upgradable_read();
866 let reindex = self.reindex.upgradable_read();
867 self.write_ref_count_reindex_plan_locked(tables, reindex, address, ref_count, source, log)
868 }
869
870 fn write_ref_count_reindex_plan_locked(
871 &self,
872 mut tables: RwLockUpgradableReadGuard<Tables>,
873 mut reindex: RwLockUpgradableReadGuard<Reindex>,
874 address: Address,
875 ref_count: u64,
876 source: RefCountTableId,
877 log: &mut LogWriter,
878 ) -> Result<PlanOutcome> {
879 if let Some((_ref_count, _sub_index)) = tables.get_ref_count().get(address, log)? {
880 log::trace!(target: "parity-db", "{}: Skipped ref count reindex entry {} when reindexing", tables.get_ref_count().id, address);
881 return Ok(PlanOutcome::Skipped)
882 }
883 for entry in reindex.queue.iter().rev() {
886 if let ReindexEntry::RefCount(ref_count_table) = entry {
887 if ref_count_table.id == source {
888 break
889 }
890 if let Some(_r) = Self::search_ref_count(address, ref_count_table, log)? {
891 log::trace!(target: "parity-db", "{}: Skipped ref count reindex entry {} when reindexing", ref_count_table.id, address);
892 return Ok(PlanOutcome::Skipped)
893 }
894 }
895 }
896 let mut outcome = PlanOutcome::Written;
897 while let PlanOutcome::NeedReindex =
898 tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
899 {
900 log::debug!(target: "parity-db", "{}: Ref count chunk full {} when reindexing", tables.get_ref_count().id, address);
901 (tables, reindex) =
902 Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
903 outcome = PlanOutcome::NeedReindex;
904 }
905 Ok(outcome)
906 }
907
908 fn search_ref_count<'a>(
909 address: Address,
910 ref_count_table: &'a RefCountTable,
911 log: &LogWriter,
912 ) -> Result<Option<(&'a RefCountTable, usize, u64)>> {
913 if let Some((ref_count, sub_index)) = ref_count_table.get(address, log)? {
914 return Ok(Some((ref_count_table, sub_index, ref_count)))
915 }
916 Ok(None)
917 }
918
919 fn search_all_ref_count<'a>(
920 address: Address,
921 tables: &'a Tables,
922 reindex: &'a Reindex,
923 log: &LogWriter,
924 ) -> Result<Option<(&'a RefCountTable, usize, u64)>> {
925 if let Some(r) = Self::search_ref_count(address, tables.get_ref_count(), log)? {
926 return Ok(Some(r))
927 }
928 for entry in reindex.queue.iter().rev() {
931 if let ReindexEntry::RefCount(ref_count_table) = entry {
932 if let Some(r) = Self::search_ref_count(address, ref_count_table, log)? {
933 return Ok(Some(r))
934 }
935 }
936 }
937 Ok(None)
938 }
939
940 fn write_ref_count_plan_existing<'a>(
941 &self,
942 tables: &Tables,
943 reindex: &'a Reindex,
944 change: (Address, Option<u64>),
945 log: &mut LogWriter,
946 ref_count_table: &RefCountTable,
947 sub_index: usize,
948 ) -> Result<PlanOutcome> {
949 let (address, ref_count) = change;
950 if let Some(ref_count) = ref_count {
951 assert!(ref_count_table.id == tables.get_ref_count().id);
953 tables
954 .get_ref_count()
955 .write_insert_plan(address, ref_count, Some(sub_index), log)
956 } else {
957 let result = ref_count_table.write_remove_plan(address, sub_index, log);
959 {
962 if ref_count_table.id != tables.get_ref_count().id {
963 if let Some((table, sub_index, _ref_count)) =
964 Self::search_ref_count(address, tables.get_ref_count(), log)?
965 {
966 table.write_remove_plan(address, sub_index, log)?;
967 }
968 }
969 for entry in &reindex.queue {
970 if let ReindexEntry::RefCount(table) = entry {
971 if table.id != ref_count_table.id {
972 if let Some((table, sub_index, _ref_count)) =
973 Self::search_ref_count(address, table, log)?
974 {
975 table.write_remove_plan(address, sub_index, log)?;
976 }
977 }
978 }
979 }
980 }
981 result
982 }
983 }
984
985 fn write_ref_count_plan_new<'a, 'b>(
986 &self,
987 mut tables: RwLockUpgradableReadGuard<'a, Tables>,
988 mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
989 address: Address,
990 ref_count: u64,
991 log: &mut LogWriter,
992 ) -> Result<(
993 PlanOutcome,
994 RwLockUpgradableReadGuard<'a, Tables>,
995 RwLockUpgradableReadGuard<'b, Reindex>,
996 )> {
997 let mut outcome = PlanOutcome::Written;
998 while let PlanOutcome::NeedReindex =
999 tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
1000 {
1001 log::debug!(target: "parity-db", "{}: Ref count chunk full {}", tables.get_ref_count().id, address);
1002 (tables, reindex) =
1003 Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
1004 outcome = PlanOutcome::NeedReindex;
1005 }
1006 let (test_ref_count, _test_sub_index) = tables.get_ref_count().get(address, log)?.unwrap();
1007 assert!(test_ref_count == ref_count);
1008 Ok((outcome, tables, reindex))
1009 }
1010
1011 fn prepare_children(
1012 &self,
1013 children: &Vec<NodeRef>,
1014 tables: TablesRef,
1015 tier_count: &mut HashMap<usize, usize>,
1016 ) -> Result<()> {
1017 for child in children {
1018 match child {
1019 NodeRef::New(node) => self.prepare_node(node, tables, tier_count)?,
1020 NodeRef::Existing(_address) => {},
1021 };
1022 }
1023 Ok(())
1024 }
1025
1026 fn prepare_node(
1027 &self,
1028 node: &NewNode,
1029 tables: TablesRef,
1030 tier_count: &mut HashMap<usize, usize>,
1031 ) -> Result<()> {
1032 let data_size = packed_node_size(&node.data, node.children.len() as u8);
1033
1034 let table_key = TableKey::NoHash;
1035
1036 let target_tier = tables
1037 .tables
1038 .iter()
1039 .position(|t| t.value_size(&table_key).map_or(false, |s| data_size <= s as usize));
1040 let target_tier = target_tier.unwrap_or_else(|| tables.tables.len() - 1);
1041
1042 match tier_count.entry(target_tier) {
1046 std::collections::hash_map::Entry::Occupied(mut entry) => {
1047 *entry.get_mut() += 1;
1048 },
1049 std::collections::hash_map::Entry::Vacant(entry) => {
1050 entry.insert(1);
1051 },
1052 }
1053
1054 self.prepare_children(&node.children, tables, tier_count)?;
1055
1056 Ok(())
1057 }
1058
1059 fn claim_children_to_data(
1060 &self,
1061 children: &Vec<NodeRef>,
1062 tables: TablesRef,
1063 tier_addresses: &HashMap<usize, Vec<u64>>,
1064 tier_index: &mut HashMap<usize, usize>,
1065 node_values: &mut Vec<NodeChange>,
1066 data: &mut Vec<u8>,
1067 ) -> Result<()> {
1068 for child in children {
1069 let address = match child {
1070 NodeRef::New(node) =>
1071 self.claim_node(node, tables, tier_addresses, tier_index, node_values)?,
1072 NodeRef::Existing(address) => {
1073 if !self.append_only {
1074 node_values.push(NodeChange::IncrementReference(*address));
1075 }
1076 *address
1077 },
1078 };
1079 data.extend_from_slice(&address.to_le_bytes());
1080 }
1081 Ok(())
1082 }
1083
1084 fn claim_node(
1085 &self,
1086 node: &NewNode,
1087 tables: TablesRef,
1088 tier_addresses: &HashMap<usize, Vec<u64>>,
1089 tier_index: &mut HashMap<usize, usize>,
1090 node_values: &mut Vec<NodeChange>,
1091 ) -> Result<NodeAddress> {
1092 let num_children = node.children.len();
1093
1094 let data_size = packed_node_size(&node.data, num_children as u8);
1095
1096 let table_key = TableKey::NoHash;
1097
1098 let target_tier = tables
1099 .tables
1100 .iter()
1101 .position(|t| t.value_size(&table_key).map_or(false, |s| data_size <= s as usize));
1102 let target_tier = target_tier.unwrap_or_else(|| tables.tables.len() - 1);
1103
1104 let index = *tier_index.get(&target_tier).unwrap();
1105 tier_index.insert(target_tier, index + 1);
1106
1107 let offset = tier_addresses.get(&target_tier).unwrap()[index];
1108
1109 let mut data: Vec<u8> = Vec::with_capacity(data_size);
1110 data.extend_from_slice(&node.data);
1111 self.claim_children_to_data(
1112 &node.children,
1113 tables,
1114 tier_addresses,
1115 tier_index,
1116 node_values,
1117 &mut data,
1118 )?;
1119 data.push(num_children as u8);
1120
1121 let val: RcValue = data.into();
1123
1124 let address = Address::new(offset, target_tier as u8);
1125
1126 node_values.push(NodeChange::NewValue(address.as_u64(), val));
1127
1128 Ok(address.as_u64())
1129 }
1130
1131 pub fn claim_tree_values<K, V>(
1133 &self,
1134 change: &Operation<K, V>,
1135 ) -> Result<(Vec<u8>, Vec<NodeChange>)> {
1136 match change {
1137 Operation::InsertTree(_key, node) => {
1138 let tables = self.tables.upgradable_read();
1139
1140 let values = self.as_ref(&tables.value);
1141
1142 let mut tier_count: HashMap<usize, usize> = Default::default();
1143 self.prepare_children(&node.children, values, &mut tier_count)?;
1144
1145 let mut tier_addresses: HashMap<usize, Vec<u64>> = Default::default();
1146 let mut tier_index: HashMap<usize, usize> = Default::default();
1147 for (tier, count) in tier_count {
1148 let offsets = values.tables[tier].claim_entries(count)?;
1149 tier_addresses.insert(tier, offsets);
1150 tier_index.insert(tier, 0);
1151 }
1152
1153 let mut node_values: Vec<NodeChange> = Default::default();
1154
1155 let num_children = node.children.len();
1156 let data_size = packed_node_size(&node.data, num_children as u8);
1157 let mut data: Vec<u8> = Vec::with_capacity(data_size);
1158 data.extend_from_slice(&node.data);
1159 self.claim_children_to_data(
1160 &node.children,
1161 values,
1162 &tier_addresses,
1163 &mut tier_index,
1164 &mut node_values,
1165 &mut data,
1166 )?;
1167 data.push(num_children as u8);
1168
1169 return Ok((data, node_values))
1170 },
1171 Operation::ReferenceTree(..) | Operation::DereferenceTree(..) =>
1172 return Err(Error::InvalidInput(format!(
1173 "claim_tree_values should not be called from ReferenceTree or DereferenceTree"
1174 ))),
1175 _ =>
1176 return Err(Error::InvalidInput(format!(
1177 "Invalid operation for column {}",
1178 self.col
1179 ))),
1180 }
1181 }
1182
1183 pub fn write_address_value_plan(
1184 &self,
1185 address: u64,
1186 cval: RcValue,
1187 compressed: bool,
1188 val_len: u32,
1189 log: &mut LogWriter,
1190 ) -> Result<PlanOutcome> {
1191 let tables = self.tables.upgradable_read();
1192 let tables = self.as_ref(&tables.value);
1193 let address = Address::from_u64(address);
1194 let target_tier = address.size_tier();
1195 let offset = address.offset();
1196 tables.tables[target_tier as usize].write_claimed_plan(
1197 offset,
1198 &TableKey::NoHash,
1199 cval.as_ref(),
1200 log,
1201 compressed,
1202 )?;
1203
1204 let stats = self.collect_stats.then_some(&self.stats);
1205 if let Some(stats) = stats {
1206 stats.insert_val(val_len, cval.as_ref().len() as u32);
1207 }
1208
1209 Ok(PlanOutcome::Written)
1210 }
1211
1212 pub fn write_address_inc_ref_plan(
1213 &self,
1214 address: u64,
1215 log: &mut LogWriter,
1216 ) -> Result<PlanOutcome> {
1217 let tables = self.tables.upgradable_read();
1218 let reindex = self.reindex.upgradable_read();
1219 let address = Address::from_u64(address);
1220 let cached = self
1221 .ref_count_cache
1222 .as_ref()
1223 .map_or(None, |c| c.read().get(&address.as_u64()).cloned());
1224 if let Some(cached_ref_count) = cached {
1225 let existing: Option<(&RefCountTable, usize, u64)> =
1226 Self::search_all_ref_count(address, &tables, &reindex, log)?;
1227 let (table, sub_index, table_ref_count) = existing.unwrap();
1228 assert!(cached_ref_count > 1);
1229 assert!(table_ref_count == cached_ref_count);
1230 let new_ref_count = cached_ref_count + 1;
1231 self.ref_count_cache
1232 .as_ref()
1233 .map(|c| c.write().insert(address.as_u64(), new_ref_count));
1234 if table.id == tables.get_ref_count().id {
1235 self.write_ref_count_plan_existing(
1236 &tables,
1237 &reindex,
1238 (address, Some(new_ref_count)),
1239 log,
1240 table,
1241 sub_index,
1242 )
1243 } else {
1244 let (r, _, _) =
1245 self.write_ref_count_plan_new(tables, reindex, address, new_ref_count, log)?;
1246 Ok(r)
1247 }
1248 } else {
1249 self.ref_count_cache.as_ref().map(|c| c.write().insert(address.as_u64(), 2));
1252 let (r, _, _) = self.write_ref_count_plan_new(tables, reindex, address, 2, log)?;
1253 Ok(r)
1254 }
1255 }
1256
1257 pub fn write_address_dec_ref_plan(
1258 &self,
1259 address: u64,
1260 log: &mut LogWriter,
1261 ) -> Result<(bool, PlanOutcome)> {
1262 let tables = self.tables.upgradable_read();
1263 let reindex = self.reindex.upgradable_read();
1264 let address = Address::from_u64(address);
1265 let cached = self
1266 .ref_count_cache
1267 .as_ref()
1268 .map_or(None, |c| c.read().get(&address.as_u64()).cloned());
1269 if let Some(cached_ref_count) = cached {
1270 let existing: Option<(&RefCountTable, usize, u64)> =
1271 Self::search_all_ref_count(address, &tables, &reindex, log)?;
1272 let (table, sub_index, table_ref_count) = existing.unwrap();
1273 assert!(cached_ref_count > 1);
1274 assert!(table_ref_count == cached_ref_count);
1275 let new_ref_count = cached_ref_count - 1;
1276 self.ref_count_cache.as_ref().map(|c| {
1277 if new_ref_count > 1 {
1278 c.write().insert(address.as_u64(), new_ref_count);
1279 } else {
1280 c.write().remove(&address.as_u64());
1281 }
1282 });
1283 let new_ref_count = if new_ref_count > 1 { Some(new_ref_count) } else { None };
1284 let outcome = if new_ref_count.is_some() && table.id != tables.get_ref_count().id {
1285 let (r, _, _) = self.write_ref_count_plan_new(
1286 tables,
1287 reindex,
1288 address,
1289 new_ref_count.unwrap(),
1290 log,
1291 )?;
1292 r
1293 } else {
1294 self.write_ref_count_plan_existing(
1295 &tables,
1296 &reindex,
1297 (address, new_ref_count),
1298 log,
1299 table,
1300 sub_index,
1301 )?
1302 };
1303 Ok((true, outcome))
1304 } else {
1305 let tables = self.as_ref(&tables.value);
1308 let target_tier = address.size_tier();
1309 let offset = address.offset();
1310 tables.tables[target_tier as usize].write_remove_plan(offset, log)?;
1311 Ok((false, PlanOutcome::Written))
1312 }
1313 }
1314
1315 pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
1316 let tables = self.tables.read();
1317 let reindex = self.reindex.read();
1318 match action {
1319 LogAction::InsertIndex(record) => {
1320 if tables.index.id == record.table {
1321 tables.index.enact_plan(record.index, log)?;
1322 } else if let Some(table) = reindex
1323 .queue
1324 .iter()
1325 .filter_map(|s| if let ReindexEntry::Index(t) = s { Some(t) } else { None })
1326 .find(|r| r.id == record.table)
1327 {
1328 table.enact_plan(record.index, log)?;
1329 } else {
1330 log::debug!(
1334 target: "parity-db",
1335 "Missing index {}. Skipped",
1336 record.table,
1337 );
1338 IndexTable::skip_plan(log)?;
1339 }
1340 },
1341 LogAction::InsertValue(record) => {
1342 tables.value[record.table.size_tier() as usize].enact_plan(record.index, log)?;
1343 },
1344 LogAction::InsertRefCount(record) => {
1345 if tables.get_ref_count().id == record.table {
1346 tables.get_ref_count().enact_plan(record.index, log)?;
1347 } else if let Some(table) = reindex
1348 .queue
1349 .iter()
1350 .filter_map(|s| if let ReindexEntry::RefCount(t) = s { Some(t) } else { None })
1351 .find(|r| r.id == record.table)
1352 {
1353 table.enact_plan(record.index, log)?;
1354 } else {
1355 log::debug!(
1359 target: "parity-db",
1360 "Missing ref count {}. Skipped",
1361 record.table,
1362 );
1363 RefCountTable::skip_plan(log)?;
1364 }
1365 },
1366 _ => return Err(Error::Corruption("Unexpected log action".into())),
1370 }
1371 Ok(())
1372 }
1373
1374 pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
1375 let tables = self.tables.upgradable_read();
1376 let reindex = self.reindex.upgradable_read();
1377 match action {
1378 LogAction::InsertIndex(record) => {
1379 if tables.index.id == record.table {
1380 tables.index.validate_plan(record.index, log)?;
1381 } else if let Some(table) = reindex
1382 .queue
1383 .iter()
1384 .filter_map(|s| if let ReindexEntry::Index(t) = s { Some(t) } else { None })
1385 .find(|r| r.id == record.table)
1386 {
1387 table.validate_plan(record.index, log)?;
1388 } else {
1389 if record.table.index_bits() < tables.index.id.index_bits() {
1390 log::warn!( target: "parity-db", "Index {} is too old. Current is {}", record.table, tables.index.id);
1392 return Err(Error::Corruption("Unexpected log index id".to_string()))
1393 }
1394 log::warn!(
1397 target: "parity-db",
1398 "Missing table {}, starting reindex",
1399 record.table,
1400 );
1401 let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
1402 std::mem::drop(lock);
1403 return self.validate_plan(LogAction::InsertIndex(record), log)
1404 }
1405 },
1406 LogAction::InsertValue(record) => {
1407 tables.value[record.table.size_tier() as usize].validate_plan(record.index, log)?;
1408 },
1409 LogAction::InsertRefCount(record) => {
1410 if tables.get_ref_count().id == record.table {
1411 tables.get_ref_count().validate_plan(record.index, log)?;
1412 } else if let Some(table) = reindex
1413 .queue
1414 .iter()
1415 .filter_map(|s| if let ReindexEntry::RefCount(t) = s { Some(t) } else { None })
1416 .find(|r| r.id == record.table)
1417 {
1418 table.validate_plan(record.index, log)?;
1419 } else {
1420 if record.table.index_bits() < tables.get_ref_count().id.index_bits() {
1421 log::warn!( target: "parity-db", "Ref count {} is too old. Current is {}", record.table, tables.get_ref_count().id);
1423 return Err(Error::Corruption("Unexpected log ref count id".to_string()))
1424 }
1425 log::warn!(
1428 target: "parity-db",
1429 "Missing ref count {}, starting reindex",
1430 record.table,
1431 );
1432 let lock =
1433 Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
1434 std::mem::drop(lock);
1435 return self.validate_plan(LogAction::InsertRefCount(record), log)
1436 }
1437 },
1438 _ => {
1439 log::error!(target: "parity-db", "Unexpected log action");
1440 return Err(Error::Corruption("Unexpected log action".to_string()))
1441 },
1442 }
1443 Ok(())
1444 }
1445
1446 pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
1447 let tables = self.tables.read();
1448 for t in tables.value.iter() {
1449 t.complete_plan(log)?;
1450 }
1451 if self.collect_stats {
1452 self.stats.commit()
1453 }
1454 Ok(())
1455 }
1456
1457 pub fn refresh_metadata(&self) -> Result<()> {
1458 let tables = self.tables.read();
1459 for t in tables.value.iter() {
1460 t.refresh_metadata()?;
1461 }
1462 Ok(())
1463 }
1464
1465 pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
1466 let tables = self.tables.read();
1467 tables.index.write_stats(&self.stats)?;
1468 self.stats.write_stats_text(writer, tables.index.id.col()).map_err(Error::Io)
1469 }
1470
1471 fn stat_summary(&self) -> ColumnStatSummary {
1472 self.stats.summary()
1473 }
1474
1475 fn clear_stats(&self) -> Result<()> {
1476 let tables = self.tables.read();
1477 self.stats.clear();
1478 tables.index.write_stats(&self.stats)
1479 }
1480
1481 pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
1482 let tables = self.tables.read();
1483 for table in &tables.value {
1484 log::debug!( target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
1485 table.iter_while(log.overlays(), |_, rc, value, compressed| {
1486 let value = if compressed {
1487 if let Ok(value) = self.compression.decompress(&value) {
1488 value
1489 } else {
1490 return false
1491 }
1492 } else {
1493 value
1494 };
1495 let state = ValueIterState { rc, value };
1496 f(state)
1497 })?;
1498 log::debug!( target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
1499 }
1500 Ok(())
1501 }
1502
1503 pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
1504 let action = |state| match state {
1505 IterStateOrCorrupted::Item(item) => Ok(f(item)),
1506 IterStateOrCorrupted::Corrupted(..) =>
1507 Err(Error::Corruption("Missing indexed value".into())),
1508 };
1509 self.iter_index_internal(log, action, 0)
1510 }
1511
1512 fn iter_index_internal(
1513 &self,
1514 log: &Log,
1515 mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
1516 start_chunk: u64,
1517 ) -> Result<()> {
1518 let tables = self.tables.read();
1519 let source = &tables.index;
1520 let total_chunks = source.id.total_chunks();
1521
1522 for c in start_chunk..total_chunks {
1523 let entries = source.entries(c, log.overlays())?;
1524 for (sub_index, entry) in entries.iter().enumerate() {
1525 if entry.is_empty() {
1526 continue
1527 }
1528 let (size_tier, offset) = {
1529 let address = entry.address(source.id.index_bits());
1530 (address.size_tier(), address.offset())
1531 };
1532
1533 let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
1534 let (value, rc, pk, compressed) = match value {
1535 Ok(Some(v)) => v,
1536 Ok(None) => {
1537 let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
1538 if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
1539 chunk_index: c,
1540 sub_index: sub_index as u32,
1541 value_entry,
1542 entry: *entry,
1543 error: None,
1544 }))? {
1545 return Ok(())
1546 }
1547 continue
1548 },
1549 Err(e) => {
1550 let value_entry = if let Error::Corruption(_) = &e {
1551 tables.value[size_tier as usize].dump_entry(offset).ok()
1552 } else {
1553 None
1554 };
1555 if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
1556 chunk_index: c,
1557 sub_index: sub_index as u32,
1558 value_entry,
1559 entry: *entry,
1560 error: Some(e),
1561 }))? {
1562 return Ok(())
1563 }
1564 continue
1565 },
1566 };
1567 let mut key = source.recover_key_prefix(c, *entry);
1568 key[6..].copy_from_slice(&pk);
1569 let value = if compressed { self.compression.decompress(&value)? } else { value };
1570 log::debug!(
1571 target: "parity-db",
1572 "{}: Iterating at {}/{}, key={:?}, pk={:?}",
1573 source.id,
1574 c,
1575 source.id.total_chunks(),
1576 hex(&key),
1577 hex(&pk),
1578 );
1579 let state = IterStateOrCorrupted::Item(IterState {
1580 item_index: c,
1581 total_items: total_chunks,
1582 key,
1583 rc,
1584 value,
1585 });
1586 if !f(state)? {
1587 return Ok(())
1588 }
1589 }
1590 }
1591 Ok(())
1592 }
1593
1594 fn iter_index_fast(
1595 &self,
1596 log: &Log,
1597 mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
1598 _start_chunk: u64,
1599 ) -> Result<()> {
1600 let tables = self.tables.read();
1601 let index = &tables.index;
1602
1603 let entries = index.sorted_entries()?;
1604 let total = entries.len();
1605 for (sub_index, entry) in entries.into_iter().enumerate() {
1606 let (size_tier, offset) = {
1607 let address = entry.address(index.id.index_bits());
1608 (address.size_tier(), address.offset())
1609 };
1610
1611 let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
1612 let (value, rc, pk, compressed) = match value {
1613 Ok(Some(v)) => v,
1614 Ok(None) => {
1615 let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
1616 if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
1617 chunk_index: sub_index as u64,
1618 sub_index: sub_index as u32,
1619 value_entry,
1620 entry,
1621 error: None,
1622 }))? {
1623 return Ok(())
1624 }
1625 continue
1626 },
1627 Err(e) => {
1628 let value_entry = if let Error::Corruption(_) = &e {
1629 tables.value[size_tier as usize].dump_entry(offset).ok()
1630 } else {
1631 None
1632 };
1633 if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
1634 chunk_index: sub_index as u64,
1635 sub_index: sub_index as u32,
1636 value_entry,
1637 entry,
1638 error: Some(e),
1639 }))? {
1640 return Ok(())
1641 }
1642 continue
1643 },
1644 };
1645 let value = if compressed { self.compression.decompress(&value)? } else { value };
1646 log::debug!(
1647 target: "parity-db",
1648 "{}: Iterating at {}/{}, pk={:?}",
1649 index.id,
1650 sub_index,
1651 total,
1652 hex(&pk),
1653 );
1654 let state = IterStateOrCorrupted::Item(IterState {
1655 item_index: sub_index as u64,
1656 total_items: total as u64,
1657 key: Default::default(),
1658 rc,
1659 value,
1660 });
1661 if !f(state)? {
1662 return Ok(())
1663 }
1664 }
1665 Ok(())
1666 }
1667
1668 fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
1669 let start_chunk = check_params.from.unwrap_or(0);
1670 let end_chunk = check_params.bound;
1671
1672 let step = if check_params.fast { 1_000_000 } else { 10_000 };
1673 let (denom, suffix) = if check_params.fast { (1_000_000, "m") } else { (1_000, "k") };
1674 let mut next_info_at = step;
1675 let start_time = std::time::Instant::now();
1676 let index_id = self.tables.read().index.id;
1677 log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
1678 let iter_fn =
1679 if check_params.fast { Self::iter_index_fast } else { Self::iter_index_internal };
1680 iter_fn(
1681 self,
1682 log,
1683 |state| match state {
1684 IterStateOrCorrupted::Item(IterState {
1685 item_index,
1686 total_items,
1687 key,
1688 rc,
1689 value,
1690 }) => {
1691 if Some(item_index) == end_chunk {
1692 return Ok(false)
1693 }
1694 if item_index >= next_info_at {
1695 next_info_at += step;
1696 log::info!(target: "parity-db", "Validated {}{} / {}{} entries", item_index / denom, suffix, total_items / denom, suffix);
1697 }
1698
1699 match check_params.display {
1700 CheckDisplay::Full => {
1701 log::info!(
1702 "Index key: {:x?}\n \
1703 \tRc: {}",
1704 &key,
1705 rc,
1706 );
1707 log::info!("Value: {}", hex(&value));
1708 },
1709 CheckDisplay::Short(t) => {
1710 log::info!("Index key: {:x?}", &key);
1711 log::info!("Rc: {}, Value len: {}", rc, value.len());
1712 log::info!(
1713 "Value: {}",
1714 hex(&value[..std::cmp::min(t as usize, value.len())])
1715 );
1716 },
1717 CheckDisplay::None => (),
1718 }
1719 Ok(true)
1720 },
1721 IterStateOrCorrupted::Corrupted(c) => {
1722 log::error!(
1723 "Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
1724 c.chunk_index,
1725 c.sub_index,
1726 c.entry.address(index_id.index_bits()),
1727 hex(&c.entry.as_u64().to_le_bytes()),
1728 c.error,
1729 );
1730 if let Some(v) = c.value_entry {
1731 log::error!("Value entry: {:?}", hex(v.as_slice()));
1732 }
1733 Ok(true)
1734 },
1735 },
1736 start_chunk,
1737 )?;
1738
1739 log::info!(target: "parity-db", "Index validation complete successfully, elapsed {:?}", start_time.elapsed());
1740 if check_params.validate_free_refs {
1741 log::info!(target: "parity-db", "Validating free refs");
1742 let tables = self.tables.read();
1743 let mut total = 0;
1744 for t in &tables.value {
1745 match t.check_free_refs() {
1746 Err(e) => log::warn!(target: "parity-db", "{}: Error: {:?}", t.id, e),
1747 Ok(n) => total += n,
1748 }
1749 }
1750 log::info!(target: "parity-db", "{} Total free refs", total);
1751 }
1752 Ok(())
1753 }
1754
1755 pub fn reindex(&self, log: &Log) -> Result<ReindexBatch> {
1756 let tables = self.tables.read();
1757 let reindex = self.reindex.read();
1758 let mut plan = Vec::new();
1759 let mut drop_index = None;
1760 let mut ref_count_plan = Vec::new();
1761 let mut ref_count_source = None;
1762 let mut drop_ref_count = None;
1763 if let Some(source) = reindex.queue.front() {
1764 let progress = reindex.progress.load(Ordering::Relaxed);
1765 match source {
1766 ReindexEntry::Index(source) => {
1767 if progress != source.id.total_chunks() {
1768 let mut source_index = progress;
1769 if source_index % 500 == 0 {
1770 log::debug!(target: "parity-db", "{}: Reindexing at {}/{}", tables.index.id, source_index, source.id.total_chunks());
1771 }
1772 log::debug!(target: "parity-db", "{}: Continue reindex at {}/{}", tables.index.id, source_index, source.id.total_chunks());
1773 while source_index < source.id.total_chunks() &&
1774 plan.len() < MAX_REINDEX_BATCH
1775 {
1776 log::trace!(target: "parity-db", "{}: Reindexing {}", source.id, source_index);
1777 let entries = source.entries(source_index, log.overlays())?;
1778 for entry in entries.iter() {
1779 if entry.is_empty() {
1780 continue
1781 }
1782 let key = source.recover_key_prefix(source_index, *entry);
1784 plan.push((key, entry.address(source.id.index_bits())))
1785 }
1786 source_index += 1;
1787 }
1788 log::trace!(target: "parity-db", "{}: End reindex batch {} ({})", tables.index.id, source_index, plan.len());
1789 reindex.progress.store(source_index, Ordering::Relaxed);
1790 if source_index == source.id.total_chunks() {
1791 log::info!(target: "parity-db", "Completed reindex {} into {}", source.id, tables.index.id);
1792 drop_index = Some(source.id);
1793 }
1794 }
1795 },
1796 ReindexEntry::RefCount(source) =>
1797 if progress != source.id.total_chunks() {
1798 let mut source_index = progress;
1799 if source_index % 500 == 0 {
1800 log::debug!(target: "parity-db", "{}: Reindexing ref count at {}/{}", tables.get_ref_count().id, source_index, source.id.total_chunks());
1801 }
1802 ref_count_source = Some(source.id);
1803 log::debug!(target: "parity-db", "{}: Continue reindex ref count at {}/{}", tables.get_ref_count().id, source_index, source.id.total_chunks());
1804 while source_index < source.id.total_chunks() &&
1805 ref_count_plan.len() < MAX_REINDEX_BATCH
1806 {
1807 log::trace!(target: "parity-db", "{}: Reindexing ref count {}", source.id, source_index);
1808 let entries = source.entries(source_index, log.overlays())?;
1809 for entry in entries.iter() {
1810 if entry.is_empty() {
1811 continue
1812 }
1813 ref_count_plan.push((entry.address(), entry.ref_count()));
1814 }
1815 source_index += 1;
1816 }
1817 log::trace!(target: "parity-db", "{}: End reindex ref count batch {} ({})", tables.get_ref_count().id, source_index, ref_count_plan.len());
1818 reindex.progress.store(source_index, Ordering::Relaxed);
1819 if source_index == source.id.total_chunks() {
1820 log::info!(target: "parity-db", "Completed reindex ref count {} into {}", source.id, tables.get_ref_count().id);
1821 drop_ref_count = Some(source.id);
1822 }
1823 },
1824 }
1825 }
1826 Ok(ReindexBatch {
1827 drop_index,
1828 batch: plan,
1829 drop_ref_count,
1830 ref_count_batch: ref_count_plan,
1831 ref_count_batch_source: ref_count_source,
1832 })
1833 }
1834
1835 pub fn drop_index(&self, id: IndexTableId) -> Result<()> {
1836 log::debug!(target: "parity-db", "Dropping {}", id);
1837 let mut reindex = self.reindex.write();
1838 if reindex.queue.front_mut().map_or(false, |e| {
1839 if let ReindexEntry::Index(t) = e {
1840 t.id == id
1841 } else {
1842 false
1843 }
1844 }) {
1845 reindex.progress.store(0, Ordering::Relaxed);
1846 let table = reindex.queue.pop_front().unwrap();
1847 let table = if let ReindexEntry::Index(table) = table {
1848 table
1849 } else {
1850 return Err(Error::Corruption(format!("Incorrect reindex type")))
1851 };
1852 table.drop_file()?;
1853 } else {
1854 log::warn!(target: "parity-db", "Dropping invalid index {}", id);
1855 return Ok(())
1856 }
1857 log::debug!(target: "parity-db", "Dropped {}", id);
1858 Ok(())
1859 }
1860
1861 pub fn drop_ref_count(&self, id: RefCountTableId) -> Result<()> {
1862 log::debug!(target: "parity-db", "Dropping ref count {}", id);
1863 let mut reindex = self.reindex.write();
1864 if reindex.queue.front_mut().map_or(false, |e| {
1865 if let ReindexEntry::RefCount(t) = e {
1866 t.id == id
1867 } else {
1868 false
1869 }
1870 }) {
1871 reindex.progress.store(0, Ordering::Relaxed);
1872 let table = reindex.queue.pop_front().unwrap();
1873 let table = if let ReindexEntry::RefCount(table) = table {
1874 table
1875 } else {
1876 return Err(Error::Corruption(format!("Incorrect reindex type")))
1877 };
1878 table.drop_file()?;
1879 } else {
1880 log::warn!(target: "parity-db", "Dropping invalid ref count {}", id);
1881 return Ok(())
1882 }
1883 log::debug!(target: "parity-db", "Dropped ref count {}", id);
1884 Ok(())
1885 }
1886
1887 pub fn get_num_value_entries(&self) -> Result<u64> {
1888 let tables = self.tables.read();
1889 let mut num_entries = 0;
1890 for value_table in &tables.value {
1891 num_entries += value_table.get_num_entries()?;
1892 }
1893 Ok(num_entries)
1894 }
1895}
1896
1897impl Column {
1898 pub fn write_existing_value_plan<K, V: AsRef<[u8]>>(
1899 key: &TableKey,
1900 tables: TablesRef,
1901 address: Address,
1902 change: &Operation<K, V>,
1903 log: &mut LogWriter,
1904 stats: Option<&ColumnStats>,
1905 ref_counted: bool,
1906 ) -> Result<(Option<PlanOutcome>, Option<Address>)> {
1907 let tier = address.size_tier() as usize;
1908
1909 let fetch_size = || -> Result<(u32, u32)> {
1910 let (cur_size, compressed) =
1911 tables.tables[tier].size(key, address.offset(), log)?.unwrap_or((0, false));
1912 Ok(if compressed {
1913 let compressed = tables.tables[tier]
1915 .get(key, address.offset(), log)?
1916 .expect("Same query as size")
1917 .0;
1918 let uncompressed = tables.compression.decompress(compressed.as_slice())?;
1919
1920 (cur_size, uncompressed.len() as u32)
1921 } else {
1922 (cur_size, cur_size)
1923 })
1924 };
1925
1926 match change {
1927 Operation::Reference(_) =>
1928 if ref_counted {
1929 log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
1930 tables.tables[tier].write_inc_ref(address.offset(), log)?;
1931 if let Some(stats) = stats {
1932 stats.reference_increase();
1933 }
1934 Ok((Some(PlanOutcome::Written), None))
1935 } else {
1936 Ok((Some(PlanOutcome::Skipped), None))
1937 },
1938 Operation::Set(_, val) => {
1939 if ref_counted {
1940 log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
1941 tables.tables[tier].write_inc_ref(address.offset(), log)?;
1942 return Ok((Some(PlanOutcome::Written), None))
1943 }
1944 if tables.preimage {
1945 return Ok((Some(PlanOutcome::Skipped), None))
1947 }
1948
1949 let (cval, target_tier) =
1950 Column::compress(tables.compression, key, val.as_ref(), tables.tables);
1951 let (cval, compressed) = cval
1952 .as_ref()
1953 .map(|cval| (cval.as_slice(), true))
1954 .unwrap_or((val.as_ref(), false));
1955
1956 if let Some(stats) = stats {
1957 let (cur_size, uncompressed) = fetch_size()?;
1958 stats.replace_val(
1959 cur_size,
1960 uncompressed,
1961 val.as_ref().len() as u32,
1962 cval.len() as u32,
1963 );
1964 }
1965 if tier == target_tier {
1966 log::trace!(target: "parity-db", "{}: Replacing {}", tables.col, key);
1967 tables.tables[target_tier].write_replace_plan(
1968 address.offset(),
1969 key,
1970 cval,
1971 log,
1972 compressed,
1973 )?;
1974 Ok((Some(PlanOutcome::Written), None))
1975 } else {
1976 log::trace!(target: "parity-db", "{}: Replacing in a new table {}", tables.col, key);
1977 tables.tables[tier].write_remove_plan(address.offset(), log)?;
1978 let new_offset =
1979 tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
1980 let new_address = Address::new(new_offset, target_tier as u8);
1981 Ok((None, Some(new_address)))
1982 }
1983 },
1984 Operation::Dereference(_) => {
1985 let cur_size = if stats.is_some() { Some(fetch_size()?) } else { None };
1987 let remove = if ref_counted {
1988 let removed = !tables.tables[tier].write_dec_ref(address.offset(), log)?;
1989 log::trace!(target: "parity-db", "{}: Dereference {}, deleted={}", tables.col, key, removed);
1990 removed
1991 } else {
1992 log::trace!(target: "parity-db", "{}: Deleting {}", tables.col, key);
1993 tables.tables[tier].write_remove_plan(address.offset(), log)?;
1994 true
1995 };
1996 if remove {
1997 if let Some((compressed_size, uncompressed_size)) = cur_size {
1998 if let Some(stats) = stats {
1999 stats.remove_val(uncompressed_size, compressed_size);
2000 }
2001 }
2002 Ok((None, None))
2003 } else {
2004 Ok((Some(PlanOutcome::Written), None))
2005 }
2006 },
2007 Operation::InsertTree(..) |
2008 Operation::ReferenceTree(..) |
2009 Operation::DereferenceTree(..) =>
2010 Err(Error::InvalidInput(format!("Invalid operation for column {}", tables.col))),
2011 }
2012 }
2013
2014 pub fn write_new_value_plan(
2015 key: &TableKey,
2016 tables: TablesRef,
2017 val: &[u8],
2018 log: &mut LogWriter,
2019 stats: Option<&ColumnStats>,
2020 ) -> Result<Address> {
2021 let (cval, target_tier) = Column::compress(tables.compression, key, val, tables.tables);
2022 let (cval, compressed) =
2023 cval.as_ref().map(|cval| (cval.as_slice(), true)).unwrap_or((val, false));
2024
2025 log::trace!(target: "parity-db", "{}: Inserting new {}, size = {}", tables.col, key, cval.len());
2026 let offset = tables.tables[target_tier].write_insert_plan(key, cval, log, compressed)?;
2027 let address = Address::new(offset, target_tier as u8);
2028
2029 if let Some(stats) = stats {
2030 stats.insert_val(val.len() as u32, cval.len() as u32);
2031 }
2032 Ok(address)
2033 }
2034
2035 pub fn complete_plan(&self, log: &mut LogWriter) -> Result<()> {
2036 match self {
2037 Column::Hash(column) => column.complete_plan(log),
2038 Column::Tree(column) => column.complete_plan(log),
2039 }
2040 }
2041
2042 pub fn validate_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
2043 match self {
2044 Column::Hash(column) => column.validate_plan(action, log),
2045 Column::Tree(column) => column.validate_plan(action, log),
2046 }
2047 }
2048
2049 pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
2050 match self {
2051 Column::Hash(column) => column.enact_plan(action, log),
2052 Column::Tree(column) => column.enact_plan(action, log),
2053 }
2054 }
2055
2056 pub fn init_table_data(&mut self) -> Result<()> {
2057 match self {
2058 Column::Hash(column) => column.init_table_data(),
2059 Column::Tree(_column) => Ok(()),
2060 }
2061 }
2062
2063 pub fn flush(&self) -> Result<()> {
2064 match self {
2065 Column::Hash(column) => column.flush(),
2066 Column::Tree(column) => column.flush(),
2067 }
2068 }
2069
2070 pub fn refresh_metadata(&self) -> Result<()> {
2071 match self {
2072 Column::Hash(column) => column.refresh_metadata(),
2073 Column::Tree(column) => column.refresh_metadata(),
2074 }
2075 }
2076
2077 pub fn write_stats_text(&self, writer: &mut impl std::io::Write) -> Result<()> {
2078 match self {
2079 Column::Hash(column) => column.write_stats_text(writer),
2080 Column::Tree(_column) => Ok(()),
2081 }
2082 }
2083
2084 pub fn clear_stats(&self) -> Result<()> {
2085 match self {
2086 Column::Hash(column) => column.clear_stats(),
2087 Column::Tree(_column) => Ok(()),
2088 }
2089 }
2090
2091 pub fn stats(&self) -> Option<ColumnStatSummary> {
2092 match self {
2093 Column::Hash(column) => Some(column.stat_summary()),
2094 Column::Tree(_column) => None,
2095 }
2096 }
2097
2098 pub fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
2099 match self {
2100 Column::Hash(column) => column.dump(log, check_params, col),
2101 Column::Tree(_column) => Ok(()),
2102 }
2103 }
2104
2105 #[cfg(test)]
2106 #[cfg(feature = "instrumentation")]
2107 pub fn index_bits(&self) -> Option<u8> {
2108 match self {
2109 Column::Hash(column) => Some(column.tables.read().index.id.index_bits()),
2110 Column::Tree(_column) => None,
2111 }
2112 }
2113}