// parity-db — db.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
//! The database object is split into `Db` and `DbInner`.
5//! `Db` creates shared `DbInner` instance and manages background
6//! worker threads that all use the inner object.
7//!
8//! There are 4 worker threads:
9//! log_worker: Processes commit queue and reindexing. For each commit
10//! in the queue, log worker creates a write-ahead record using `Log`.
//! Additionally, if there is an active reindex, it creates log records
12//! for batches of relocated index entries.
13//! flush_worker: Flushes log records to disk by calling `fsync` on the
14//! log files.
15//! commit_worker: Reads flushed log records and applies operations to the
16//! index and value tables.
//! cleanup_worker: Flushes tables by calling `fsync`, and cleans up the log.
18//! Each background worker is signalled with a conditional variable once
19//! there is some work to be done.
20
21use crate::{
22	btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
23	column::{
24		hash_key, unpack_node_children, unpack_node_data, ColId, Column, HashColumn, IterState,
25		ReindexBatch, ValueIterState,
26	},
27	error::{try_io, Error, Result},
28	hash::IdentityBuildHasher,
29	index::{Address, PlanOutcome},
30	log::{Log, LogAction},
31	multitree::{Children, NewNode, NodeAddress},
32	options::{Options, CURRENT_VERSION},
33	parking_lot::{
34		Condvar, Mutex, MutexGuard, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard,
35	},
36	stats::StatSummary,
37	ColumnOptions, Key,
38};
39#[cfg(feature = "bytes")]
40use bytes::Bytes;
41use fs2::FileExt;
42use std::{
43	borrow::Borrow,
44	collections::{BTreeMap, HashMap, HashSet, VecDeque},
45	ops::Bound,
46	sync::{
47		atomic::{AtomicBool, AtomicU64, Ordering},
48		Arc, Weak,
49	},
50	thread,
51};
52
// Max size of commit queue. (Keys + Values). If the queue is
// full `commit` will block.
// These are in memory, so we use usize
const MAX_COMMIT_QUEUE_BYTES: usize = 16 * 1024 * 1024;
// Max size of log overlay. If the overlay is full, processing
// of commit queue is blocked.
// Signed because the running byte counter may transiently underflow
// (see `DbInner::log_queue_wait`).
const MAX_LOG_QUEUE_BYTES: i64 = 128 * 1024 * 1024;
// Minimum size of log file before it is considered full.
const MIN_LOG_SIZE_BYTES: u64 = 64 * 1024 * 1024;
// Number of log files to keep after flush when sync mode is disabled. Give the database some chance
// to recover in case of crash.
const KEEP_LOGS: usize = 16;
// Hard limit on the number of log files in sync mode. The number of log may grow while existing
// logs are waiting on fsync. Commits will be throttled if total number of log files exceeds this
// number.
const MAX_LOG_FILES: usize = 4;

/// Value is just a vector of bytes. Value sizes up to 4Gb are allowed.
pub type Value = Vec<u8>;
72
/// Reference-counted, cheaply cloneable value bytes, backed by either
/// `Arc<Value>` or `bytes::Bytes` depending on enabled cargo features.
// NOTE(review): with neither the "arc" nor the "bytes" feature enabled this
// enum has no variants — confirm the supported feature combinations.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum RcValue {
	#[cfg(feature = "arc")]
	Arc(Arc<Value>),
	#[cfg(feature = "bytes")]
	Bytes(Bytes),
}
80
81impl AsRef<[u8]> for RcValue {
82	fn as_ref(&self) -> &[u8] {
83		match self {
84			#[cfg(feature = "arc")]
85			Self::Arc(arc) => arc.as_ref(),
86			#[cfg(feature = "bytes")]
87			Self::Bytes(bytes) => bytes.as_ref(),
88		}
89	}
90}
91
92impl Borrow<[u8]> for RcValue {
93	fn borrow(&self) -> &[u8] {
94		self.as_ref()
95	}
96}
97
/// With the "arc" feature, owned bytes are moved into an `Arc`.
#[cfg(feature = "arc")]
impl From<Value> for RcValue {
	fn from(value: Value) -> Self {
		Self::Arc(value.into())
	}
}
104
/// Without the "arc" feature, owned bytes are converted into `Bytes`.
// NOTE(review): this impl requires the "bytes" feature whenever "arc" is
// disabled, otherwise `Self::Bytes` does not exist — confirm the crate's
// feature matrix enforces that.
#[cfg(not(feature = "arc"))]
impl From<Value> for RcValue {
	fn from(value: Value) -> Self {
		Self::Bytes(value.into())
	}
}
111
/// Wrap an already reference-counted value without copying.
#[cfg(feature = "arc")]
impl From<Arc<Value>> for RcValue {
	fn from(value: Arc<Value>) -> Self {
		Self::Arc(value)
	}
}
118
/// Wrap a `Bytes` buffer without copying.
#[cfg(feature = "bytes")]
impl From<Bytes> for RcValue {
	fn from(value: Bytes) -> Self {
		Self::Bytes(value)
	}
}
125
// Test helper: copy the value out and convert it into a fixed-size array.
#[cfg(test)]
impl<const N: usize> TryFrom<RcValue> for [u8; N] {
	type Error = <[u8; N] as TryFrom<Vec<u8>>>::Error;

	fn try_from(value: RcValue) -> std::result::Result<Self, Self::Error> {
		let bytes: Vec<u8> = value.as_ref().to_vec();
		<[u8; N]>::try_from(bytes)
	}
}
134
/// Reference-counted key bytes; clones are cheap (`Arc` refcount bump).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RcKey(Arc<Vec<u8>>);
137
138impl AsRef<[u8]> for RcKey {
139	fn as_ref(&self) -> &[u8] {
140		self.0.as_ref()
141	}
142}
143
144impl Borrow<[u8]> for RcKey {
145	fn borrow(&self) -> &[u8] {
146		self.as_ref()
147	}
148}
149
150impl From<Vec<u8>> for RcKey {
151	fn from(key: Vec<u8>) -> Self {
152		Self(key.into())
153	}
154}
155
// Commit data passed to `commit`
#[derive(Debug, Default)]
struct Commit {
	// Commit ID. This is not the same as log record id, as some records
	// are originated within the DB. E.g. reindex. A deferred commit may be
	// re-queued under a fresh id (see `defer_commit`).
	id: u64,
	// Size of user data pending insertion (keys + values) or
	// removal (keys)
	bytes: usize,
	// Operations.
	changeset: CommitChangeSet,
}
168
// Pending commits. This may not grow beyond `MAX_COMMIT_QUEUE_BYTES` bytes;
// `commit_raw` blocks while it is over that limit.
#[derive(Debug, Default)]
struct CommitQueue {
	// Log record.
	record_id: u64,
	// Total size of all commits in the queue. Drives commit backpressure.
	bytes: usize,
	// FIFO queue.
	commits: VecDeque<Commit>,
}
179
// Per-column cache of tree readers and pending tree dereferences.
#[derive(Debug)]
struct Trees {
	// Weak handles to live tree readers, keyed by the hashed tree root key.
	readers: HashMap<Key, Weak<RwLock<Box<dyn TreeReader + Send + Sync>>>, IdentityBuildHasher>,
	/// Number of queued dereferences for each tree
	to_dereference: HashMap<Key, usize>,
}
186
/// Shared inner database state. One instance is shared between the public
/// `Db` handle and all background worker threads.
#[derive(Debug)]
struct DbInner {
	columns: Vec<Column>,
	options: Options,
	// Set on shutdown; workers check it before blocking.
	shutdown: AtomicBool,
	log: Log,
	// Pending commits, drained by `process_commits`.
	commit_queue: Mutex<CommitQueue>,
	// Notified when the commit queue drops back below `MAX_COMMIT_QUEUE_BYTES`.
	commit_queue_full_cv: Condvar,
	// Signalled by `commit_raw` when a commit is queued.
	log_worker_wait: WaitCondvar<bool>,
	commit_worker_wait: Arc<WaitCondvar<bool>>,
	// Overlay of most recent values in the commit queue.
	commit_overlay: RwLock<Vec<CommitOverlay>>,
	// Per-column tree reader cache and queued tree dereference counts.
	trees: RwLock<HashMap<ColId, Trees>>,
	// Bytes currently pending in the log queue.
	// This may underflow occasionally, but is bound for 0 eventually.
	log_queue_wait: WaitCondvar<i64>,
	flush_worker_wait: Arc<WaitCondvar<bool>>,
	cleanup_worker_wait: WaitCondvar<bool>,
	cleanup_queue_wait: WaitCondvar<bool>,
	iteration_lock: Mutex<()>,
	last_enacted: AtomicU64,
	// Record id at which the next reindex was scheduled; see `process_reindex`.
	next_reindex: AtomicU64,
	// First error from a background thread; surfaced to callers of `commit`.
	bg_err: Mutex<Option<Arc<Error>>>,
	db_version: u32,
	// Held open for the DB's lifetime to keep the exclusive advisory lock.
	lock_file: std::fs::File,
}
212
/// A condition variable paired with the state it guards; used to signal
/// background workers that there is work to do.
#[derive(Debug)]
struct WaitCondvar<S> {
	cv: Condvar,
	work: Mutex<S>,
}
218
219impl<S: Default> WaitCondvar<S> {
220	fn new() -> Self {
221		WaitCondvar { cv: Condvar::new(), work: Mutex::new(S::default()) }
222	}
223}
224
225impl WaitCondvar<bool> {
226	fn signal(&self) {
227		let mut work = self.work.lock();
228		*work = true;
229		self.cv.notify_one();
230	}
231
232	pub fn wait(&self) {
233		let mut work = self.work.lock();
234		while !*work {
235			self.cv.wait(&mut work)
236		}
237		*work = false;
238	}
239}
240
241impl DbInner {
	/// Open or create the database at `options.path`.
	///
	/// Acquires an exclusive lock file, validates on-disk metadata, opens
	/// every column and the write-ahead log, and derives the last enacted
	/// record id from the log replay position.
	fn open(options: &Options, opening_mode: OpeningMode) -> Result<DbInner> {
		if opening_mode == OpeningMode::Create {
			try_io!(std::fs::create_dir_all(&options.path));
		} else if !options.path.is_dir() {
			return Err(Error::DatabaseNotFound)
		}

		// Take an exclusive advisory lock on `<path>/lock` so only one
		// process can have the database open.
		let mut lock_path: std::path::PathBuf = options.path.clone();
		lock_path.push("lock");
		let lock_file = try_io!(std::fs::OpenOptions::new()
			.create(true)
			.read(true)
			.write(true)
			.open(lock_path.as_path()));
		lock_file.try_lock_exclusive().map_err(Error::Locked)?;

		let metadata = options.load_and_validate_metadata(opening_mode == OpeningMode::Create)?;
		let mut columns = Vec::with_capacity(metadata.columns.len());
		let mut commit_overlay = Vec::with_capacity(metadata.columns.len());
		let log = Log::open(options)?;
		// `replay_record_id` is the first record to replay; the last enacted
		// record precedes it (defaults to 1 when there is nothing to replay).
		let last_enacted = log.replay_record_id().unwrap_or(2) - 1;
		for c in 0..metadata.columns.len() {
			let column = Column::open(c as ColId, options, &metadata)?;
			commit_overlay.push(CommitOverlay::new());
			columns.push(column);
		}
		log::debug!(target: "parity-db", "Opened db {:?}, metadata={:?}", options, metadata);
		// Adopt the salt from the on-disk metadata if the caller did not set one.
		let mut options = options.clone();
		if options.salt.is_none() {
			options.salt = Some(metadata.salt);
		}

		Ok(DbInner {
			columns,
			options,
			shutdown: AtomicBool::new(false),
			log,
			commit_queue: Mutex::new(Default::default()),
			commit_queue_full_cv: Condvar::new(),
			log_worker_wait: WaitCondvar::new(),
			commit_worker_wait: Arc::new(WaitCondvar::new()),
			commit_overlay: RwLock::new(commit_overlay),
			trees: RwLock::new(Default::default()),
			log_queue_wait: WaitCondvar::new(),
			flush_worker_wait: Arc::new(WaitCondvar::new()),
			cleanup_worker_wait: WaitCondvar::new(),
			cleanup_queue_wait: WaitCondvar::new(),
			iteration_lock: Mutex::new(()),
			next_reindex: AtomicU64::new(1),
			last_enacted: AtomicU64::new(last_enacted),
			bg_err: Mutex::new(None),
			db_version: metadata.version,
			lock_file,
		})
	}
297
298	fn init_table_data(&mut self) -> Result<()> {
299		for column in &mut self.columns {
300			column.init_table_data()?;
301		}
302		Ok(())
303	}
304
	/// Look up `key` in column `col`.
	///
	/// Consults the in-memory commit overlay first, then the tables together
	/// with the log overlay. `external_call` marks calls from the public API;
	/// those are rejected for multitree columns.
	fn get(&self, col: ColId, key: &[u8], external_call: bool) -> Result<Option<Value>> {
		if self.options.columns[col as usize].multitree && external_call {
			return Err(Error::InvalidConfiguration(
				"get not supported for multitree columns.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let key = column.hash_key(key);
				let overlay = self.commit_overlay.read();
				// Check commit overlay first
				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
					return Ok(v.map(|i| i.as_ref().to_vec()))
				}
				// Release the overlay read lock before touching the tables.
				std::mem::drop(overlay);
				// Go into tables and log overlay.
				let log = self.log.overlays();
				Ok(column.get(&key, log)?.map(|(v, _rc)| v))
			},
			Column::Tree(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
					return Ok(l.map(|i| i.as_ref().to_vec()))
				}
				std::mem::drop(overlay);
				// We lock log, if btree structure changed while reading that would be an issue.
				let log = self.log.overlays().read();
				column.with_locked(|btree| BTreeTable::get(key, &*log, btree))
			},
		}
	}
336
	/// Return the stored size in bytes of the value for `key`, without
	/// copying the value itself. Not supported for multitree columns.
	fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
		if self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration(
				"get_size not supported for multitree columns.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let key = column.hash_key(key);
				let overlay = self.commit_overlay.read();
				// Check commit overlay first
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.get_size(&key)) {
					return Ok(l)
				}
				// Go into tables and log overlay.
				// NOTE(review): unlike `get`, the commit overlay read guard is
				// still held across the table lookup here — confirm intentional.
				let log = self.log.overlays();
				column.get_size(&key, log)
			},
			Column::Tree(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
					return Ok(l.map(|v| v.as_ref().len() as u32))
				}
				let log = self.log.overlays().read();
				let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
				Ok(l.map(|v| v.len() as u32))
			},
		}
	}
366
367	fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
368		if !self.options.columns[col as usize].multitree {
369			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
370		}
371		if !self.options.columns[col as usize].append_only &&
372			!self.options.columns[col as usize].allow_direct_node_access
373		{
374			return Err(Error::InvalidConfiguration(
375				"get_root can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
376			))
377		}
378		let value = self.get(col, key, false)?;
379		if let Some(data) = value {
380			return Ok(Some(unpack_node_data(data)?))
381		}
382		Ok(None)
383	}
384
	/// Fetch a tree node's data and children by its raw `node_address` in a
	/// multitree hash column.
	///
	/// External calls are only allowed when the column has the `append_only`
	/// or `allow_direct_node_access` option.
	fn get_node(
		&self,
		col: ColId,
		node_address: NodeAddress,
		external_call: bool,
	) -> Result<Option<(Vec<u8>, Children)>> {
		if !self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
		}
		if !self.options.columns[col as usize].append_only &&
			!self.options.columns[col as usize].allow_direct_node_access &&
			external_call
		{
			return Err(Error::InvalidConfiguration(
				"get_node can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let overlay = self.commit_overlay.read();
				// Check commit overlay first
				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
				{
					return Ok(Some(unpack_node_data(v.as_ref().to_vec())?))
				}
				// Fall back to the value tables via the log overlay.
				let log = self.log.overlays();
				let value = column.get_value(Address::from_u64(node_address), log)?;
				if let Some(data) = value {
					return Ok(Some(unpack_node_data(data)?))
				}
				Ok(None)
			},
			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
		}
	}
420
	/// Like `get_node`, but unpacks and returns only the node's child list,
	/// skipping the node data itself.
	fn get_node_children(
		&self,
		col: ColId,
		node_address: NodeAddress,
		external_call: bool,
	) -> Result<Option<Children>> {
		if !self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
		}
		if !self.options.columns[col as usize].append_only &&
			!self.options.columns[col as usize].allow_direct_node_access &&
			external_call
		{
			return Err(Error::InvalidConfiguration(
				"get_node_children can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let overlay = self.commit_overlay.read();
				// Check commit overlay first
				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
				{
					return Ok(Some(unpack_node_children(v.as_ref())?))
				}
				// Fall back to the value tables via the log overlay.
				let log = self.log.overlays();
				let value = column.get_value(Address::from_u64(node_address), log)?;
				if let Some(data) = value {
					return Ok(Some(unpack_node_children(&data)?))
				}
				Ok(None)
			},
			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
		}
	}
456
457	fn get_tree(
458		&self,
459		db: &Arc<DbInner>,
460		col: ColId,
461		key: &[u8],
462		check_existence: bool,
463	) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
464		if !self.options.columns[col as usize].multitree {
465			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
466		}
467		match &self.columns[col as usize] {
468			Column::Hash(column) => {
469				// Check if the tree actually exists. We can't return the data from this function as
470				// TreeReader is not locked. That is done by the client.
471				if check_existence {
472					let root = self.get(col, key, false).unwrap();
473					if root.is_none() {
474						return Ok(None)
475					}
476				}
477
478				let hash_key = column.hash_key(key);
479
480				let trees = self.trees.upgradable_read();
481
482				if let Some(column_trees) = trees.get(&col) {
483					if let Some(reader) = column_trees.readers.get(&hash_key) {
484						let reader = reader.upgrade();
485						if let Some(reader) = reader {
486							return Ok(Some(reader))
487						}
488					}
489				}
490
491				let mut trees = RwLockUpgradableReadGuard::upgrade(trees);
492
493				let column_trees = trees.entry(col).or_insert_with(|| Trees {
494					readers: Default::default(),
495					to_dereference: Default::default(),
496				});
497
498				let reader: Box<dyn TreeReader + Send + Sync> =
499					Box::new(DbTreeReader { db: db.clone(), col, key: hash_key });
500				let reader = Arc::new(RwLock::new(reader));
501
502				column_trees.readers.insert(hash_key, Arc::downgrade(&reader));
503
504				Ok(Some(reader))
505			},
506			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
507		}
508	}
509
510	fn btree_iter(&self, col: ColId) -> Result<BTreeIterator> {
511		match &self.columns[col as usize] {
512			Column::Hash(_column) =>
513				Err(Error::InvalidConfiguration("Not an indexed column.".to_string())),
514			Column::Tree(column) => {
515				let log = self.log.overlays();
516				BTreeIterator::new(column, col, log, &self.commit_overlay)
517			},
518		}
519	}
520
521	// Commit simply adds the data to the queue and to the overlay and
522	// exits as early as possible.
523	fn commit<I, K>(&self, tx: I) -> Result<()>
524	where
525		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
526		K: AsRef<[u8]>,
527	{
528		self.commit_changes(tx.into_iter().map(|(c, k, v)| {
529			(
530				c,
531				match v {
532					Some(v) => Operation::Set(k.as_ref().to_vec(), v),
533					None => Operation::Dereference(k.as_ref().to_vec()),
534				},
535			)
536		}))
537	}
538
539	fn commit_changes<I, V>(&self, tx: I) -> Result<()>
540	where
541		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, V>)>,
542		V: Into<RcValue>,
543	{
544		let mut commit: CommitChangeSet = Default::default();
545		for (col, change) in tx.into_iter() {
546			if self.options.columns[col as usize].btree_index {
547				commit
548					.btree_indexed
549					.entry(col)
550					.or_insert_with(|| BTreeChangeSet::new(col))
551					.push(change)?
552			} else if self.options.columns[col as usize].multitree {
553				match &self.columns[col as usize] {
554					Column::Hash(column) =>
555						match change {
556							Operation::Set(..) |
557							Operation::Reference(..) |
558							Operation::Dereference(..) =>
559								return Err(Error::InvalidConfiguration(
560									"Invalid operation for multitree column".to_string(),
561								)),
562							Operation::InsertTree(..) => {
563								let (root_data, node_values) = column.claim_tree_values(&change)?;
564
565								let trees = self.trees.read();
566								if let Some(column_trees) = trees.get(&col) {
567									for (hash, count) in &column_trees.to_dereference {
568										assert!(*count > 0);
569
570										// Check if TreeReader is active for this tree
571										let mut tree_active = false;
572										if let Some(reader) = column_trees.readers.get(hash) {
573											let reader = reader.upgrade();
574											if let Some(reader) = reader {
575												if reader.is_locked() {
576													tree_active = true;
577												}
578											}
579										}
580										if tree_active {
581											commit
582												.indexed
583												.entry(col)
584												.or_insert_with(|| IndexedChangeSet::new(col))
585												.used_trees
586												.insert(*hash);
587										}
588									}
589								}
590								drop(trees);
591
592								let root_operation = Operation::Set(change.key(), root_data);
593								commit
594									.indexed
595									.entry(col)
596									.or_insert_with(|| IndexedChangeSet::new(col))
597									.push(root_operation, &self.options, self.db_version)?;
598
599								for node_change in node_values {
600									commit
601										.indexed
602										.entry(col)
603										.or_insert_with(|| IndexedChangeSet::new(col))
604										.push_node_change(node_change);
605								}
606							},
607							Operation::ReferenceTree(..) => {
608								if !self.options.columns[col as usize].append_only {
609									let root_operation =
610										Operation::<&_, V>::Reference(change.key());
611									commit
612										.indexed
613										.entry(col)
614										.or_insert_with(|| IndexedChangeSet::new(col))
615										.push(root_operation, &self.options, self.db_version)?;
616								}
617							},
618							Operation::DereferenceTree(key) => {
619								if self.options.columns[col as usize].append_only {
620									return Err(Error::InvalidConfiguration("Attempting to dereference a tree from an append_only column.".to_string()))
621								}
622								let value = self.get(col, &key, false)?;
623								if let Some(data) = value {
624									let root_data = unpack_node_data(data)?;
625									let children = root_data.1;
626									let salt = self.options.salt.unwrap_or_default();
627									let hash = hash_key(
628										&key,
629										&salt,
630										self.options.columns[col as usize].uniform,
631										self.db_version,
632									);
633
634									let mut trees = self.trees.write();
635
636									let column_trees = trees.entry(col).or_insert_with(|| Trees {
637										readers: Default::default(),
638										to_dereference: Default::default(),
639									});
640									let count =
641										column_trees.to_dereference.get(&hash).unwrap_or(&0) + 1;
642									column_trees.to_dereference.insert(hash, count);
643
644									drop(trees);
645
646									commit.check_for_deferral = true;
647
648									let node_change =
649										NodeChange::DereferenceChildren(key, hash, children);
650
651									commit
652										.indexed
653										.entry(col)
654										.or_insert_with(|| IndexedChangeSet::new(col))
655										.push_node_change(node_change);
656								} else {
657									return Err(Error::InvalidConfiguration(
658										"No entry for tree root".to_string(),
659									))
660								}
661							},
662						},
663					Column::Tree(_) =>
664						return Err(Error::InvalidConfiguration("Not a HashColumn".to_string())),
665				}
666			} else {
667				commit.indexed.entry(col).or_insert_with(|| IndexedChangeSet::new(col)).push(
668					change,
669					&self.options,
670					self.db_version,
671				)?
672			}
673		}
674
675		self.commit_raw(commit)
676	}
677
	/// Queue a prepared [`CommitChangeSet`].
	///
	/// Blocks while the commit queue is over `MAX_COMMIT_QUEUE_BYTES`,
	/// surfaces any pending background-thread error, copies the changes into
	/// the commit overlay (so readers see them immediately) and signals the
	/// log worker.
	fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
		let mut queue = self.commit_queue.lock();

		#[cfg(any(test, feature = "instrumentation"))]
		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let might_wait_because_the_queue_is_full = true;
		if might_wait_because_the_queue_is_full && queue.bytes > MAX_COMMIT_QUEUE_BYTES {
			log::debug!(target: "parity-db", "Waiting, queue size={}", queue.bytes);
			self.commit_queue_full_cv.wait(&mut queue);
		}

		{
			// Fail fast if a background worker has already reported an error.
			let bg_err = self.bg_err.lock();
			if let Some(err) = &*bg_err {
				return Err(Error::Background(err.clone()))
			}
		}

		let mut overlay = self.commit_overlay.write();

		queue.record_id += 1;
		let record_id = queue.record_id;

		// Copy the changes into the overlay, accumulating their size.
		let mut bytes = 0;
		for (c, indexed) in &commit.indexed {
			indexed.copy_to_overlay(
				&mut overlay[*c as usize],
				record_id,
				&mut bytes,
				&self.options,
			)?;
		}

		for (c, iterset) in &commit.btree_indexed {
			iterset.copy_to_overlay(
				&mut overlay[*c as usize].btree_indexed,
				record_id,
				&mut bytes,
				&self.options,
			)?;
		}

		let commit = Commit { id: record_id, changeset: commit, bytes };

		log::debug!(
			target: "parity-db",
			"Queued commit {}, {} bytes",
			commit.id,
			bytes,
		);
		queue.commits.push_back(commit);
		queue.bytes += bytes;
		// Wake the log worker to process the new commit.
		self.log_worker_wait.signal();
		Ok(())
	}
734
	/// Push a previously dequeued commit back onto the queue.
	///
	/// If a fresh record id is used (`new_id` is `None`, or the given id
	/// differs from `old_id`), the commit's data is re-copied into the
	/// commit overlay under the new id and the copies made under `old_id`
	/// are removed; otherwise the overlay is left untouched.
	fn defer_commit(
		&self,
		mut queue: MutexGuard<CommitQueue>,
		mut commit: CommitChangeSet,
		old_bytes: usize,
		old_id: u64,
		new_id: Option<u64>,
	) -> Result<()> {
		let record_id = if let Some(id) = new_id {
			id
		} else {
			queue.record_id += 1;
			queue.record_id
		};

		let bytes = if record_id != old_id {
			let mut overlay = self.commit_overlay.write();

			let mut bytes = 0;

			for (c, indexed) in &commit.indexed {
				indexed.copy_to_overlay(
					&mut overlay[*c as usize],
					record_id,
					&mut bytes,
					&self.options,
				)?;
			}

			for (c, iterset) in &commit.btree_indexed {
				iterset.copy_to_overlay(
					&mut overlay[*c as usize].btree_indexed,
					record_id,
					&mut bytes,
					&self.options,
				)?;
			}

			{
				// Cleanup the commit overlay with old id.
				for (c, key_values) in commit.indexed.iter() {
					key_values.clean_overlay(&mut overlay[*c as usize], old_id);
				}
				for (c, iterset) in commit.btree_indexed.iter_mut() {
					iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, old_id);
				}
			}

			bytes
		} else {
			// Same id: the overlay copies made for `old_id` are still valid.
			old_bytes
		};

		let commit = Commit { id: record_id, changeset: commit, bytes };

		log::debug!(
			target: "parity-db",
			"Deferred commit, old id: {}, new id: {}",
			old_id,
			record_id,
		);
		queue.commits.push_back(commit);
		queue.bytes += bytes;
		Ok(())
	}
800
	/// Take one commit off the queue and turn it into a write-ahead-log
	/// record. Returns `Ok(true)` if a commit was processed, `Ok(false)` if
	/// the queue was empty.
	///
	/// Commits flagged with `check_for_deferral` (tree dereferences) are
	/// re-queued via `defer_commit` while the affected tree has an active
	/// reader or is still used by a later queued commit.
	fn process_commits(&self, db: &Arc<DbInner>) -> Result<bool> {
		#[cfg(any(test, feature = "instrumentation"))]
		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let might_wait_because_the_queue_is_full = true;
		if might_wait_because_the_queue_is_full {
			// Wait if the queue is full.
			let mut queue = self.log_queue_wait.work.lock();
			if !self.shutdown.load(Ordering::Relaxed) && *queue > MAX_LOG_QUEUE_BYTES {
				log::debug!(target: "parity-db", "Waiting, log_bytes={}", queue);
				self.log_queue_wait.cv.wait(&mut queue);
			}
		}
		let commit = {
			let mut queue = self.commit_queue.lock();
			if let Some(commit) = queue.commits.pop_front() {
				queue.bytes -= commit.bytes;
				log::debug!(
					target: "parity-db",
					"Removed {}. Still queued commits {} bytes",
					commit.bytes,
					queue.bytes,
				);
				if queue.bytes <= MAX_COMMIT_QUEUE_BYTES &&
					(queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
				{
					// Past the waiting threshold.
					log::debug!(
						target: "parity-db",
						"Waking up commit queue worker",
					);
					self.commit_queue_full_cv.notify_all();
				}
				Some(commit)
			} else {
				None
			}
		};

		if let Some(mut commit) = commit {
			if commit.changeset.check_for_deferral {
				let mut defer = false;
				'outer: for (col, key_values) in commit.changeset.indexed.iter() {
					for change in &key_values.node_changes {
						if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
							// Check if there are currently any locks on the tree. Will need to
							// defer if there are.
							let trees = self.trees.read();
							if let Some(column_trees) = trees.get(&col) {
								let mut tree_active = false;
								if let Some(reader) = column_trees.readers.get(hash) {
									let reader = reader.upgrade();
									if let Some(reader) = reader {
										if reader.is_locked() {
											tree_active = true;
										}
									}
								}
								if tree_active {
									defer = true;
									break 'outer
								}
							}
							drop(trees);

							// Also check if there are any later commits in the queue that use this
							// tree. Will need to defer if there are.
							let queue = self.commit_queue.lock();
							for commit in &queue.commits {
								for (_col, change_set) in &commit.changeset.indexed {
									for tree in &change_set.used_trees {
										if tree == hash {
											defer = true;
											break 'outer
										}
									}
								}
							}
						}
					}
				}
				if defer {
					let queue = self.commit_queue.lock();
					let new_id = if queue.commits.len() > 0 {
						// Generate a new id
						None
					} else {
						// Nothing else in the queue so can reuse same id
						Some(commit.id)
					};
					self.defer_commit(queue, commit.changeset, commit.bytes, commit.id, new_id)?;

					return Ok(true)
				} else {
					// The dereference will proceed now: drop the queued-dereference
					// counts that `commit_changes` added for it.
					for (col, key_values) in commit.changeset.indexed.iter() {
						for change in &key_values.node_changes {
							if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
								let mut trees = self.trees.write();
								if let Some(column_trees) = trees.get_mut(&col) {
									let count = column_trees.to_dereference.get(hash).unwrap_or(&0);
									assert!(*count > 0);
									if *count == 1 {
										column_trees.to_dereference.remove(hash);
									} else {
										column_trees.to_dereference.insert(*hash, count - 1);
									}
								}
							}
						}
					}
				}
			}

			let mut reindex = false;
			let mut writer = self.log.begin_record();
			log::debug!(
				target: "parity-db",
				"Processing commit {}, record {}, {} bytes",
				commit.id,
				writer.record_id(),
				commit.bytes,
			);
			let mut ops: u64 = 0;
			for (c, key_values) in commit.changeset.indexed.iter() {
				key_values.write_plan(
					db,
					*c,
					&self.columns[*c as usize],
					&mut writer,
					&mut ops,
					&mut reindex,
				)?;
			}

			for (c, btree) in commit.changeset.btree_indexed.iter_mut() {
				match &self.columns[*c as usize] {
					Column::Hash(_column) =>
						return Err(Error::InvalidConfiguration(
							"Not an indexed column.".to_string(),
						)),
					Column::Tree(column) => {
						btree.write_plan(column, &mut writer, &mut ops)?;
					},
				}
			}

			// Collect final changes to value tables
			for c in self.columns.iter() {
				c.complete_plan(&mut writer)?;
			}
			let record_id = writer.record_id();
			let l = writer.drain();

			// Finalize the log record, account its size and wake the flush worker.
			let bytes = {
				let bytes = self.log.end_record(l)?;
				let mut logged_bytes = self.log_queue_wait.work.lock();
				*logged_bytes += bytes as i64;
				self.flush_worker_wait.signal();
				bytes
			};

			{
				// Cleanup the commit overlay.
				let mut overlay = self.commit_overlay.write();
				for (c, key_values) in commit.changeset.indexed.iter() {
					key_values.clean_overlay(&mut overlay[*c as usize], commit.id);
				}
				for (c, iterset) in commit.changeset.btree_indexed.iter_mut() {
					iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, commit.id);
				}
			}

			if reindex {
				self.start_reindex(record_id);
			}

			log::debug!(
				target: "parity-db",
				"Processed commit {} (record {}), {} ops, {} bytes written",
				commit.id,
				record_id,
				ops,
				bytes,
			);
			Ok(true)
		} else {
			Ok(false)
		}
	}
990
	/// Record that a reindex is required at `record_id`; picked up later by
	/// `process_reindex`.
	fn start_reindex(&self, record_id: u64) {
		log::trace!(target: "parity-db", "Scheduled reindex at record {}", record_id);
		self.next_reindex.store(record_id, Ordering::SeqCst);
	}
995
	// Write a log record containing the next batch of reindex work, if any.
	// Returns `Ok(true)` when a record was produced, so the caller loops.
	// At most one record (for one column) is written per call.
	fn process_reindex(&self) -> Result<bool> {
		let next_reindex = self.next_reindex.load(Ordering::SeqCst);
		// Only start once the record that scheduled the reindex is enacted.
		if next_reindex == 0 || next_reindex > self.last_enacted.load(Ordering::SeqCst) {
			return Ok(false)
		}
		// Process any pending reindexes
		for column in self.columns.iter() {
			// Only hash-indexed columns reindex; tree columns are skipped.
			let column = if let Column::Hash(c) = column { c } else { continue };
			let ReindexBatch {
				drop_index,
				batch,
				drop_ref_count,
				ref_count_batch,
				ref_count_batch_source,
			} = column.reindex(&self.log)?;
			if !batch.is_empty() || drop_index.is_some() {
				// A batch carries either index entries or ref-count entries,
				// never both (asserted below).
				debug_assert!(
					ref_count_batch.is_empty() &&
						ref_count_batch_source.is_none() &&
						drop_ref_count.is_none()
				);
				let mut next_reindex = false;
				let mut writer = self.log.begin_record();
				log::debug!(
					target: "parity-db",
					"Creating reindex record {}",
					writer.record_id(),
				);
				for (key, address) in batch.into_iter() {
					// Relocating entries can itself overflow the target index,
					// in which case yet another reindex round is needed.
					if let PlanOutcome::NeedReindex =
						column.write_reindex_plan(&key, address, &mut writer)?
					{
						next_reindex = true
					}
				}
				if let Some(table) = drop_index {
					// Old index table fully migrated; schedule its removal.
					writer.drop_table(table);
				}
				let record_id = writer.record_id();
				let l = writer.drain();

				// The queue-size lock is taken before `end_record` so the
				// byte count grows together with the record being queued.
				let mut logged_bytes = self.log_queue_wait.work.lock();
				let bytes = self.log.end_record(l)?;
				log::debug!(
					target: "parity-db",
					"Created reindex record {}, {} bytes",
					record_id,
					bytes,
				);
				*logged_bytes += bytes as i64;
				if next_reindex {
					self.start_reindex(record_id);
				}
				self.flush_worker_wait.signal();
				return Ok(true)
			}
			if !ref_count_batch.is_empty() || drop_ref_count.is_some() {
				// Mirror of the branch above for ref-count table migration.
				debug_assert!(batch.is_empty() && drop_index.is_none());
				debug_assert!(ref_count_batch_source.is_some());
				let ref_count_source = ref_count_batch_source.unwrap();
				let mut next_reindex = false;
				let mut writer = self.log.begin_record();
				log::debug!(
					target: "parity-db",
					"Creating ref count reindex record {}",
					writer.record_id(),
				);
				for (address, ref_count) in ref_count_batch.into_iter() {
					if let PlanOutcome::NeedReindex = column.write_ref_count_reindex_plan(
						address,
						ref_count,
						ref_count_source,
						&mut writer,
					)? {
						next_reindex = true
					}
				}
				if let Some(table) = drop_ref_count {
					writer.drop_ref_count_table(table);
				}
				let record_id = writer.record_id();
				let l = writer.drain();

				let mut logged_bytes = self.log_queue_wait.work.lock();
				let bytes = self.log.end_record(l)?;
				log::debug!(
					target: "parity-db",
					"Created ref count reindex record {}, {} bytes",
					record_id,
					bytes,
				);
				*logged_bytes += bytes as i64;
				if next_reindex {
					self.start_reindex(record_id);
				}
				self.flush_worker_wait.signal();
				return Ok(true)
			}
		}
		// No column had pending reindex work: clear the schedule flag.
		self.next_reindex.store(0, Ordering::SeqCst);
		Ok(false)
	}
1098
	// Read the next flushed log record and apply its operations to the
	// index and value tables. In `validation_mode` (startup replay) the
	// whole record is validated first; any corruption discards the
	// remaining replay logs (returning `Ok(false)`) instead of failing,
	// since a torn record at the tail of the WAL is expected after a crash.
	// Returns `Ok(true)` when a record was enacted.
	fn enact_logs(&self, validation_mode: bool) -> Result<bool> {
		// Keep column iteration out while tables are being mutated.
		let _iteration_lock = self.iteration_lock.lock();
		let cleared = {
			let reader = match self.log.read_next(validation_mode) {
				Ok(reader) => reader,
				Err(Error::Corruption(_)) if validation_mode => {
					// Bad header during replay: drop the rest of the logs.
					log::debug!(target: "parity-db", "Bad log header");
					self.log.clear_replay_logs();
					return Ok(false)
				},
				Err(e) => return Err(e),
			};
			if let Some(mut reader) = reader {
				log::debug!(
					target: "parity-db",
					"Enacting log record {}",
					reader.record_id(),
				);
				if validation_mode {
					// Replay must be strictly sequential; a gap means the
					// remaining records cannot be trusted.
					if reader.record_id() != self.last_enacted.load(Ordering::Relaxed) + 1 {
						log::warn!(
							target: "parity-db",
							"Log sequence error. Expected record {}, got {}",
							self.last_enacted.load(Ordering::Relaxed) + 1,
							reader.record_id(),
						);
						drop(reader);
						self.log.clear_replay_logs();
						return Ok(false)
					}
					// Validate all records before applying anything
					loop {
						let next = match reader.next() {
							Ok(next) => next,
							Err(e) => {
								log::debug!(target: "parity-db", "Error reading log: {:?}", e);
								return Ok(false)
							},
						};
						match next {
							LogAction::BeginRecord => {
								// A nested header means the record is malformed.
								log::debug!(target: "parity-db", "Unexpected log header");
								drop(reader);
								self.log.clear_replay_logs();
								return Ok(false)
							},
							LogAction::EndRecord => break,
							LogAction::InsertIndex(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertIndex(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::InsertValue(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertValue(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::InsertRefCount(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertRefCount(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::DropTable(_) | LogAction::DropRefCountTable(_) => continue,
						}
					}
					// Rewind past the header so the enactment loop below
					// re-reads the now-validated record from the start.
					reader.reset()?;
					reader.next()?;
				}
				loop {
					match reader.next()? {
						LogAction::BeginRecord =>
							return Err(Error::Corruption("Bad log record".into())),
						LogAction::EndRecord => break,
						LogAction::InsertIndex(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertIndex(insertion), &mut reader)?;
						},
						LogAction::InsertValue(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertValue(insertion), &mut reader)?;
						},
						LogAction::InsertRefCount(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertRefCount(insertion), &mut reader)?;
						},
						LogAction::DropTable(id) => {
							log::debug!(
								target: "parity-db",
								"Dropping index {}",
								id,
							);
							match &self.columns[id.col() as usize] {
								Column::Hash(col) => {
									col.drop_index(id)?;
									// Check if there's another reindex on the next iteration
									self.start_reindex(reader.record_id());
								},
								Column::Tree(_) => (),
							}
						},
						LogAction::DropRefCountTable(id) => {
							log::debug!(
								target: "parity-db",
								"Dropping ref count {}",
								id,
							);
							match &self.columns[id.col() as usize] {
								Column::Hash(col) => {
									col.drop_ref_count(id)?;
									// Check if there's another reindex on the next iteration
									self.start_reindex(reader.record_id());
								},
								Column::Tree(_) => (),
							}
						},
					}
				}
				log::debug!(
					target: "parity-db",
					"Enacted log record {}, {} bytes",
					reader.record_id(),
					reader.read_bytes(),
				);
				let record_id = reader.record_id();
				let bytes = reader.read_bytes();
				let cleared = reader.drain();
				self.last_enacted.store(record_id, Ordering::SeqCst);
				Some((record_id, cleared, bytes))
			} else {
				log::debug!(target: "parity-db", "End of log");
				None
			}
		};

		if let Some((record_id, cleared, bytes)) = cleared {
			self.log.end_read(cleared, record_id);
			{
				if !validation_mode {
					// Account the enacted bytes against the commit queue and
					// wake a committer if we just crossed back under the cap.
					let mut queue = self.log_queue_wait.work.lock();
					if *queue < bytes as i64 {
						log::warn!(
							target: "parity-db",
							"Detected log underflow record {}, {} bytes, {} queued, reindex = {}",
							record_id,
							bytes,
							*queue,
							self.next_reindex.load(Ordering::SeqCst),
						);
					}
					*queue -= bytes as i64;
					if *queue <= MAX_LOG_QUEUE_BYTES &&
						(*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
					{
						self.log_queue_wait.cv.notify_one();
					}
					log::debug!(target: "parity-db", "Log queue size: {} bytes", *queue);
				}

				// Throttle: wait for the cleanup worker if too many dirty
				// log files have accumulated.
				let max_logs = if self.options.sync_data { MAX_LOG_FILES } else { KEEP_LOGS };
				let dirty_logs = self.log.num_dirty_logs();
				if !validation_mode {
					while self.log.num_dirty_logs() > max_logs {
						// NOTE(review): `dirty_logs` is captured once above, so
						// this message can print a stale count while waiting.
						log::debug!(target: "parity-db", "Waiting for log cleanup. Queued: {}", dirty_logs);
						self.cleanup_queue_wait.wait();
					}
				}
			}
			Ok(true)
		} else {
			Ok(false)
		}
	}
1307
1308	fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
1309		let has_flushed = self.log.flush_one(min_log_size)?;
1310		if has_flushed {
1311			self.commit_worker_wait.signal();
1312		}
1313		Ok(has_flushed)
1314	}
1315
1316	fn clean_logs(&self) -> Result<bool> {
1317		let keep_logs = if self.options.sync_data { 0 } else { KEEP_LOGS };
1318		let num_cleanup = self.log.num_dirty_logs();
1319		let result = if num_cleanup > keep_logs {
1320			if self.options.sync_data {
1321				for c in self.columns.iter() {
1322					c.flush()?;
1323				}
1324			}
1325			self.log.clean_logs(num_cleanup - keep_logs)?
1326		} else {
1327			false
1328		};
1329		self.cleanup_queue_wait.signal();
1330		Ok(result)
1331	}
1332
1333	fn clean_all_logs(&self) -> Result<()> {
1334		for c in self.columns.iter() {
1335			c.flush()?;
1336		}
1337		let num_cleanup = self.log.num_dirty_logs();
1338		self.log.clean_logs(num_cleanup)?;
1339		Ok(())
1340	}
1341
1342	fn replay_all_logs(&self) -> Result<()> {
1343		while let Some(id) = self.log.replay_next()? {
1344			log::debug!(target: "parity-db", "Replaying database log {}", id);
1345			while self.enact_logs(true)? {}
1346		}
1347
1348		// Re-read any cached metadata
1349		for c in self.columns.iter() {
1350			c.refresh_metadata()?;
1351		}
1352		log::debug!(target: "parity-db", "Replay is complete.");
1353		Ok(())
1354	}
1355
	// Request shutdown: set the flag first, then wake every worker and any
	// committer blocked on the log queue so each loop observes the flag.
	fn shutdown(&self) {
		self.shutdown.store(true, Ordering::SeqCst);
		self.log_queue_wait.cv.notify_one();
		self.flush_worker_wait.signal();
		self.log_worker_wait.signal();
		self.commit_worker_wait.signal();
		self.cleanup_worker_wait.signal();
	}
1364
	// Final shutdown: drain queued commits and logs, delete log files, and
	// optionally persist statistics. Called after worker threads have been
	// joined, with `db` being the same instance as `self` (needed by
	// `process_commits`).
	fn kill_logs(&self, db: &Arc<DbInner>) -> Result<()> {
		{
			if let Some(err) = self.bg_err.lock().as_ref() {
				// On error the log reader may be left in inconsistent state. So it is important
				// to not attempt any further log enactment.
				log::debug!(target: "parity-db", "Shutdown with error state {}", err);
				self.log.clean_logs(self.log.num_dirty_logs())?;
				return Ok(())
			}
		}
		log::debug!(target: "parity-db", "Processing leftover commits");
		// Finish logged records and proceed to log and enact queued commits.
		while self.enact_logs(false)? {}
		self.flush_logs(0)?;
		while self.process_commits(db)? {}
		while self.enact_logs(false)? {}
		self.flush_logs(0)?;
		while self.enact_logs(false)? {}
		self.clean_all_logs()?;
		self.log.kill_logs()?;
		if self.options.stats {
			// Best-effort: stats writing failures are logged, not returned.
			let mut path = self.options.path.clone();
			path.push("stats.txt");
			match std::fs::File::create(path) {
				Ok(file) => {
					let mut writer = std::io::BufWriter::new(file);
					if let Err(e) = self.write_stats_text(&mut writer, None) {
						log::warn!(target: "parity-db", "Error writing stats file: {:?}", e)
					}
				},
				Err(e) => log::warn!(target: "parity-db", "Error creating stats file: {:?}", e),
			}
		}
		Ok(())
	}
1400
1401	fn write_stats_text(&self, writer: &mut impl std::io::Write, column: Option<u8>) -> Result<()> {
1402		if let Some(col) = column {
1403			self.columns[col as usize].write_stats_text(writer)
1404		} else {
1405			for c in self.columns.iter() {
1406				c.write_stats_text(writer)?;
1407			}
1408			Ok(())
1409		}
1410	}
1411
1412	fn clear_stats(&self, column: Option<u8>) -> Result<()> {
1413		if let Some(col) = column {
1414			self.columns[col as usize].clear_stats()
1415		} else {
1416			for c in self.columns.iter() {
1417				c.clear_stats()?;
1418			}
1419			Ok(())
1420		}
1421	}
1422
1423	fn stats(&self) -> StatSummary {
1424		StatSummary { columns: self.columns.iter().map(|c| c.stats()).collect() }
1425	}
1426
1427	fn store_err(&self, result: Result<()>) {
1428		if let Err(e) = result {
1429			log::warn!(target: "parity-db", "Background worker error: {}", e);
1430			let mut err = self.bg_err.lock();
1431			if err.is_none() {
1432				*err = Some(Arc::new(e));
1433				self.shutdown();
1434			}
1435			self.commit_queue_full_cv.notify_all();
1436		}
1437	}
1438
	// Iterate all values of a hash-indexed column under the iteration lock,
	// calling `f` until it returns `false`. Panics for btree columns.
	fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		let _lock = self.iteration_lock.lock();
		match &self.columns[c as usize] {
			Column::Hash(column) => column.iter_values(&self.log, f),
			Column::Tree(_) => unimplemented!(),
		}
	}
1446
	// Iterate a hash-indexed column's index entries under the iteration
	// lock, calling `f` until it returns `false`. Panics for btree columns.
	fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
		let _lock = self.iteration_lock.lock();
		match &self.columns[c as usize] {
			Column::Hash(column) => column.iter_index(&self.log, f),
			Column::Tree(_) => unimplemented!(),
		}
	}
1454}
1455
/// Database instance.
pub struct Db {
	inner: Arc<DbInner>,
	// Background worker threads. `None` when the database is opened
	// read-only or with background threads disabled; joined on drop.
	commit_thread: Option<thread::JoinHandle<()>>,
	flush_thread: Option<thread::JoinHandle<()>>,
	log_thread: Option<thread::JoinHandle<()>>,
	cleanup_thread: Option<thread::JoinHandle<()>>,
}
1464
1465impl Db {
	#[cfg(test)]
	/// Test helper: create a database at `path` with `num_columns` columns
	/// using default column options.
	pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
		let options = Options::with_columns(path, num_columns);
		Self::open_inner(&options, OpeningMode::Create)
	}
1471
	/// Open the database with given options. An error will be returned if the database does not
	/// exist.
	pub fn open(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Write)
	}
1477
	/// Open the database using given options. If the database does not exist it will be created
	/// empty.
	pub fn open_or_create(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Create)
	}
1483
	/// Open an existing database in read-only mode. No background threads
	/// are started in this mode.
	pub fn open_read_only(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::ReadOnly)
	}
1488
	// Shared open path: open the inner database, replay and discard any
	// leftover WAL, then (unless read-only or disabled) spawn the four
	// background worker threads.
	fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
		assert!(options.is_valid());
		let mut db = DbInner::open(options, opening_mode)?;
		// This needs to be called before log thread: so first reindexing
		// will run in correct state.
		if let Err(e) = db.replay_all_logs() {
			log::debug!(target: "parity-db", "Error during log replay.");
			return Err(e)
		} else {
			db.log.clear_replay_logs();
			db.clean_all_logs()?;
			db.log.kill_logs()?;
		}
		db.init_table_data()?;
		let db = Arc::new(db);
		#[cfg(any(test, feature = "instrumentation"))]
		let start_threads = opening_mode != OpeningMode::ReadOnly && options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let start_threads = opening_mode != OpeningMode::ReadOnly;
		// Each worker gets its own Arc clone; failures are funnelled into
		// `store_err`, which records the first error and triggers shutdown.
		let commit_thread = if start_threads {
			let commit_worker_db = db.clone();
			Some(thread::spawn(move || {
				commit_worker_db.store_err(Self::commit_worker(commit_worker_db.clone()))
			}))
		} else {
			None
		};
		let flush_thread = if start_threads {
			let flush_worker_db = db.clone();
			#[cfg(any(test, feature = "instrumentation"))]
			let min_log_size = if options.always_flush { 0 } else { MIN_LOG_SIZE_BYTES };
			#[cfg(not(any(test, feature = "instrumentation")))]
			let min_log_size = MIN_LOG_SIZE_BYTES;
			Some(thread::spawn(move || {
				flush_worker_db.store_err(Self::flush_worker(flush_worker_db.clone(), min_log_size))
			}))
		} else {
			None
		};
		let log_thread = if start_threads {
			let log_worker_db = db.clone();
			Some(thread::spawn(move || {
				log_worker_db.store_err(Self::log_worker(log_worker_db.clone()))
			}))
		} else {
			None
		};
		let cleanup_thread = if start_threads {
			let cleanup_worker_db = db.clone();
			Some(thread::spawn(move || {
				cleanup_worker_db.store_err(Self::cleanup_worker(cleanup_worker_db.clone()))
			}))
		} else {
			None
		};
		Ok(Db { inner: db, commit_thread, flush_thread, log_thread, cleanup_thread })
	}
1546
	/// Get a value in a specified column by key. Returns `None` if the key does not exist.
	pub fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
		self.inner.get(col, key, true)
	}
1551
	/// Get value size by key. Returns `None` if the key does not exist.
	pub fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
		self.inner.get_size(col, key)
	}
1556
	/// Iterate over all ordered key-value pairs. Only supported for columns configured with
	/// `btree_indexed`.
	pub fn iter(&self, col: ColId) -> Result<BTreeIterator> {
		self.inner.btree_iter(col)
	}
1562
	/// Get a shared reader for the tree stored under `key`.
	/// Returns `None` when no such tree exists.
	// NOTE(review): the trailing `true` flag's meaning is defined by
	// `DbInner::get_tree` (not visible here) — confirm before documenting.
	pub fn get_tree(
		&self,
		col: ColId,
		key: &[u8],
	) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
		self.inner.get_tree(&self.inner, col, key, true)
	}
1570
	/// Get a tree's root node data and child references by the tree's key.
	pub fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
		self.inner.get_root(col, key)
	}
1574
	/// Get a tree node's data and child references by its address.
	pub fn get_node(
		&self,
		col: ColId,
		node_address: NodeAddress,
	) -> Result<Option<(Vec<u8>, Children)>> {
		self.inner.get_node(col, node_address, true)
	}
1582
	/// Get only the child references of a tree node by its address.
	pub fn get_node_children(
		&self,
		col: ColId,
		node_address: NodeAddress,
	) -> Result<Option<Children>> {
		self.inner.get_node_children(col, node_address, true)
	}
1590
	/// Commit a set of changes to the database. Each item is
	/// `(column, key, Some(value))` to insert or `(column, key, None)` to remove.
	pub fn commit<I, K>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
		K: AsRef<[u8]>,
	{
		self.inner.commit(tx)
	}
1599
	/// Commit a set of changes to the database, expressed as per-column
	/// `Operation`s.
	pub fn commit_changes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
	{
		self.inner.commit_changes(tx)
	}
1607
	/// Commit a set of changes to the database.
	///
	/// This method passes values as `Arc<Vec<u8>>` potentially eliminating an extra copy.
	#[cfg(feature = "arc")]
	#[deprecated(note = "This method will be removed in future versions. Use `commit_changes_bytes` instead")]
	pub fn commit_changes_shared<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Arc<Vec<u8>>>)>,
	{
		self.inner.commit_changes(tx)
	}
1619
	/// Commit a set of changes to the database.
	///
	/// This method passes values as `Bytes` potentially eliminating an extra copy.
	#[cfg(feature = "bytes")]
	pub fn commit_changes_bytes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Bytes>)>,
	{
		self.inner.commit_changes(tx)
	}
1630
	/// Commit a pre-built changeset directly (crate-internal path).
	pub(crate) fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
		self.inner.commit_raw(commit)
	}
1634
	/// Returns the number of columns in the database.
	pub fn num_columns(&self) -> u8 {
		self.inner.columns.len() as u8
	}
1639
	/// Iterate a column and call a function for each value. This is only supported for columns with
	/// `btree_index` set to `false`. Iteration order is unspecified.
	/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
	pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		self.inner.iter_column_while(c, f)
	}
1646
	/// Iterate a column and call a function for each value. This is only supported for columns with
	/// `btree_index` set to `false`. Iteration order is unspecified. Note that the
	/// `key` field in the state is the hash of the original key.
	/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
	pub(crate) fn iter_column_index_while(
		&self,
		c: ColId,
		f: impl FnMut(IterState) -> bool,
	) -> Result<()> {
		self.inner.iter_column_index_while(c, f)
	}
1658
	// Background loop: enact flushed log records. Keeps draining after
	// shutdown is requested as long as `enact_logs` reports more work.
	fn commit_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				// Nudge the cleanup worker before going to sleep.
				db.cleanup_worker_wait.signal();
				if !db.log.has_log_files_to_read() {
					db.commit_worker_wait.wait();
				}
			}

			more_work = db.enact_logs(false)?;
		}
		log::debug!(target: "parity-db", "Commit worker shutdown");
		Ok(())
	}
1674
	// Background loop: turn queued commits and reindex batches into log
	// records. After shutdown is requested, keeps going while commits
	// remain (but reindexing no longer extends the loop).
	fn log_worker(db: Arc<DbInner>) -> Result<()> {
		// Start with pending reindex.
		let mut more_reindex = db.process_reindex()?;
		let mut more_commits = false;
		// Process all commits but allow reindex to be interrupted.
		while !db.shutdown.load(Ordering::SeqCst) || more_commits {
			if !more_commits && !more_reindex {
				db.log_worker_wait.wait();
			}

			more_commits = db.process_commits(&db)?;
			more_reindex = db.process_reindex()?;
		}
		log::debug!(target: "parity-db", "Log worker shutdown");
		Ok(())
	}
1691
	// Background loop: fsync log files. Exits as soon as shutdown is
	// requested; any remaining flushes are performed by `kill_logs`.
	fn flush_worker(db: Arc<DbInner>, min_log_size: u64) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) {
			if !more_work {
				db.flush_worker_wait.wait();
			}
			more_work = db.flush_logs(min_log_size)?;
		}
		log::debug!(target: "parity-db", "Flush worker shutdown");
		Ok(())
	}
1703
	// Background loop: flush tables and delete enacted log files. Starts
	// with `more_work = true` to clean anything left from a previous run.
	fn cleanup_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = true;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				db.cleanup_worker_wait.wait();
			}
			more_work = db.clean_logs()?;
		}
		log::debug!(target: "parity-db", "Cleanup worker shutdown");
		Ok(())
	}
1715
	/// Dump full database stats to the text output.
	/// Pass `Some(col)` to restrict the dump to a single column.
	pub fn write_stats_text(
		&self,
		writer: &mut impl std::io::Write,
		column: Option<u8>,
	) -> Result<()> {
		self.inner.write_stats_text(writer, column)
	}
1724
	/// Reset internal database statistics for the database or specified column.
	pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
		self.inner.clear_stats(column)
	}
1729
	/// Print database contents in text form to stderr.
	/// Dumps a single column when `check_param.column` is set, otherwise all.
	pub fn dump(&self, check_param: check::CheckOptions) -> Result<()> {
		if let Some(col) = check_param.column {
			self.inner.columns[col as usize].dump(&self.inner.log, &check_param, col)?;
		} else {
			for (ix, c) in self.inner.columns.iter().enumerate() {
				c.dump(&self.inner.log, &check_param, ix as ColId)?;
			}
		}
		Ok(())
	}
1741
	/// Get database statistics.
	pub fn stats(&self) -> StatSummary {
		self.inner.stats()
	}
1746
1747	pub fn get_num_column_value_entries(&self, col: ColId) -> Result<u64> {
1748		let column = &self.inner.columns[col as usize];
1749		match column {
1750			Column::Hash(column) => return column.get_num_value_entries(),
1751			Column::Tree(..) =>
1752				return Err(Error::InvalidConfiguration(
1753					"get_num_column_value_entries not implemented for tree columns.".to_string(),
1754				)),
1755		}
1756	}
1757
	// We open the DB before to check metadata validity and make sure there are no pending WAL
	// logs.
	// Returns the database salt, which is needed to rewrite the metadata.
	fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
		let db = Db::open(options)?;
		let salt = db.inner.options.salt;
		// Close the database again before the caller mutates column files.
		drop(db);
		Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
	}
1766
	/// Add a new column with options specified by `new_column_options`.
	/// The database must be closed when called.
	pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;

		options.columns.push(new_column_options);
		options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

		Ok(())
	}
1776
1777	/// Remove last column from the database.
1778	/// Db must be close when called.
1779	pub fn drop_last_column(options: &mut Options) -> Result<()> {
1780		let salt = Self::precheck_column_operation(options)?;
1781		let nb_column = options.columns.len();
1782		if nb_column == 0 {
1783			return Ok(())
1784		}
1785		let index = options.columns.len() - 1;
1786		Self::remove_column_files(options, index as u8)?;
1787		options.columns.pop();
1788		options.write_metadata(&options.path, &salt)?;
1789		Ok(())
1790	}
1791
	/// Truncate a column from the database, optionally changing its options.
	/// Db must be close when called.
	pub fn reset_column(
		options: &mut Options,
		index: u8,
		new_options: Option<ColumnOptions>,
	) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;
		Self::remove_column_files(options, index)?;

		// Metadata is only rewritten when the column options change.
		if let Some(new_options) = new_options {
			options.columns[index as usize] = new_options;
			options.write_metadata(&options.path, &salt)?;
		}

		Ok(())
	}
1809
1810	fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
1811		if index as usize >= options.columns.len() {
1812			return Err(Error::IncompatibleColumnConfig {
1813				id: index,
1814				reason: "Column not found".to_string(),
1815			})
1816		}
1817
1818		Column::drop_files(index, options.path.clone())?;
1819		Ok(())
1820	}
1821
	#[cfg(feature = "instrumentation")]
	/// Instrumentation hook: run one reindex step on the calling thread.
	pub fn process_reindex(&self) -> Result<()> {
		self.inner.process_reindex()?;
		Ok(())
	}
1827
	#[cfg(feature = "instrumentation")]
	/// Instrumentation hook: process one queued commit on the calling thread.
	pub fn process_commits(&self) -> Result<()> {
		self.inner.process_commits(&self.inner)?;
		Ok(())
	}
1833
	#[cfg(feature = "instrumentation")]
	/// Instrumentation hook: flush one log file regardless of size.
	pub fn flush_logs(&self) -> Result<()> {
		self.inner.flush_logs(0)?;
		Ok(())
	}
1839
	#[cfg(feature = "instrumentation")]
	/// Instrumentation hook: enact all flushed log records.
	pub fn enact_logs(&self) -> Result<()> {
		while self.inner.enact_logs(false)? {}
		Ok(())
	}
1845
	#[cfg(feature = "instrumentation")]
	/// Instrumentation hook: run one log cleanup step.
	pub fn clean_logs(&self) -> Result<()> {
		self.inner.clean_logs()?;
		Ok(())
	}
1851}
1852
impl Drop for Db {
	fn drop(&mut self) {
		// Stop workers, drain remaining log work and release the file lock.
		self.drop_inner()
	}
}
1858
1859impl Db {
1860	fn drop_inner(&mut self) {
1861		self.inner.shutdown();
1862		if let Some(t) = self.log_thread.take() {
1863			if let Err(e) = t.join() {
1864				log::warn!(target: "parity-db", "Log thread shutdown error: {:?}", e);
1865			}
1866		}
1867		if let Some(t) = self.flush_thread.take() {
1868			if let Err(e) = t.join() {
1869				log::warn!(target: "parity-db", "Flush thread shutdown error: {:?}", e);
1870			}
1871		}
1872		if let Some(t) = self.commit_thread.take() {
1873			if let Err(e) = t.join() {
1874				log::warn!(target: "parity-db", "Commit thread shutdown error: {:?}", e);
1875			}
1876		}
1877		if let Some(t) = self.cleanup_thread.take() {
1878			if let Err(e) = t.join() {
1879				log::warn!(target: "parity-db", "Cleanup thread shutdown error: {:?}", e);
1880			}
1881		}
1882		if let Err(e) = self.inner.kill_logs(&self.inner) {
1883			log::warn!(target: "parity-db", "Shutdown error: {:?}", e);
1884		}
1885		if let Err(e) = fs2::FileExt::unlock(&self.inner.lock_file) {
1886			log::debug!(target: "parity-db", "Error removing file lock: {:?}", e);
1887		}
1888	}
1889}
1890
1891// Use a trait here to allow client code to have better control over lock guard lifetime without
1892// lifetime proliferation within Db (which would be required if not using a dynamic object).
pub trait TreeReader {
	/// Read the tree's root node: its data and child references.
	fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>>;
	/// Read a node's data and child references by its address.
	fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>>;
	/// Read only a node's child references by its address.
	fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>>;
}
1898
#[derive(Debug)]
pub struct DbTreeReader {
	db: Arc<DbInner>,
	// Column holding the tree.
	col: ColId,
	// Key identifying the tree's root entry within `col`.
	key: Key,
}
1905
1906impl TreeReader for DbTreeReader {
1907	fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>> {
1908		/* let value = self.db.get(self.col, &self.key)?;
1909		if let Some(data) = value {
1910			return unpack_node_data(data).map(|x| Some(x))
1911		}
1912		Err(Error::InvalidValueData) */
1913
1914		match &self.db.columns[self.col as usize] {
1915			Column::Hash(column) => {
1916				let overlay = self.db.commit_overlay.read();
1917				// Check commit overlay first
1918				let value = if let Some(v) =
1919					overlay.get(self.col as usize).and_then(|o| o.get(&self.key))
1920				{
1921					Ok(v.map(|i| i.as_ref().to_vec()))
1922				} else {
1923					// Go into tables and log overlay.
1924					let log = self.db.log.overlays();
1925					Ok(column.get(&self.key, log)?.map(|(v, _rc)| v))
1926				}?;
1927
1928				if let Some(data) = value {
1929					return unpack_node_data(data).map(|x| Some(x))
1930				}
1931
1932				return Ok(None)
1933			},
1934			Column::Tree(..) =>
1935				return Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
1936		};
1937	}
1938
1939	fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>> {
1940		self.db.get_node(self.col, node_address, false)
1941	}
1942
1943	fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>> {
1944		self.db.get_node_children(self.col, node_address, false)
1945	}
1946}
1947
// Overlay maps pair each value with a `u64` tag — the owning commit's id,
// judging by the `clean_overlay(..., commit.id)` calls in `process_commits`
// — so entries can be removed once that commit is enacted.
pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<RcValue>), IdentityBuildHasher>;
pub type AddressCommitOverlay = HashMap<u64, (u64, RcValue)>;
pub type BTreeCommitOverlay = BTreeMap<RcKey, (u64, Option<RcValue>)>;
1951
// Per-column in-memory view of commits that are queued but not yet enacted.
// Reads consult this before the tables so recent commits are visible.
#[derive(Debug)]
pub struct CommitOverlay {
	// Hash-indexed key -> value changes.
	indexed: IndexedCommitOverlay,
	// Address-keyed (multitree node) values.
	address: AddressCommitOverlay,
	// Ordered btree key -> value changes.
	btree_indexed: BTreeCommitOverlay,
}
1958
1959impl CommitOverlay {
1960	fn new() -> Self {
1961		CommitOverlay {
1962			indexed: Default::default(),
1963			address: Default::default(),
1964			btree_indexed: Default::default(),
1965		}
1966	}
1967
1968	#[cfg(test)]
1969	fn is_empty(&self) -> bool {
1970		self.indexed.is_empty() && self.address.is_empty() && self.btree_indexed.is_empty()
1971	}
1972}
1973
1974impl CommitOverlay {
1975	fn get_ref(&self, key: &[u8]) -> Option<Option<&RcValue>> {
1976		self.indexed.get(key).map(|(_, v)| v.as_ref())
1977	}
1978
1979	fn get(&self, key: &[u8]) -> Option<Option<RcValue>> {
1980		self.get_ref(key).map(|v| v.cloned())
1981	}
1982
1983	fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
1984		self.get_ref(key).map(|res| res.as_ref().map(|b| b.as_ref().len() as u32))
1985	}
1986
1987	fn get_address(&self, address: u64) -> Option<RcValue> {
1988		self.address.get(&address).map(|(_, v)| v.clone())
1989	}
1990
1991	fn btree_get(&self, key: &[u8]) -> Option<Option<&RcValue>> {
1992		self.btree_indexed.get(key).map(|(_, v)| v.as_ref())
1993	}
1994
1995	pub fn btree_next(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
1996		use crate::btree::LastKey;
1997		match &last_key {
1998			LastKey::Start => self
1999				.btree_indexed
2000				.range::<[u8], _>(..)
2001				.next()
2002				.map(|(k, (_, v))| (k.clone(), v.clone())),
2003			LastKey::End => None,
2004			LastKey::At(key) => self
2005				.btree_indexed
2006				.range::<[u8], _>((Bound::Excluded(key.as_slice()), Bound::Unbounded))
2007				.next()
2008				.map(|(k, (_, v))| (k.clone(), v.clone())),
2009			LastKey::Seeked(key) => self
2010				.btree_indexed
2011				.range::<[u8], _>((Bound::Included(key.as_slice()), Bound::Unbounded))
2012				.next()
2013				.map(|(k, (_, v))| (k.clone(), v.clone())),
2014		}
2015	}
2016
2017	pub fn btree_prev(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
2018		use crate::btree::LastKey;
2019		match &last_key {
2020			LastKey::End => self
2021				.btree_indexed
2022				.range::<[u8], _>(..)
2023				.rev()
2024				.next()
2025				.map(|(k, (_, v))| (k.clone(), v.clone())),
2026			LastKey::Start => None,
2027			LastKey::At(key) => self
2028				.btree_indexed
2029				.range::<[u8], _>((Bound::Unbounded, Bound::Excluded(key.as_slice())))
2030				.rev()
2031				.next()
2032				.map(|(k, (_, v))| (k.clone(), v.clone())),
2033			LastKey::Seeked(key) => self
2034				.btree_indexed
2035				.range::<[u8], _>((Bound::Unbounded, Bound::Included(key.as_slice())))
2036				.rev()
2037				.next()
2038				.map(|(k, (_, v))| (k.clone(), v.clone())),
2039		}
2040	}
2041}
2042
/// Different operations allowed for a commit.
/// Behavior may differ depending on column configuration.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation<Key, Value> {
	/// Insert or update the value for a given key.
	Set(Key, Value),

	/// Dereference at a given key, resulting in
	/// either removal of a key value or decrement of its
	/// reference count counter.
	Dereference(Key),

	/// Increment the reference count counter of an existing value for a given key.
	/// If no value exists for the key, this operation is skipped.
	Reference(Key),

	/// Insert a new tree into a MultiTree column using root key and node structure.
	InsertTree(Key, NewNode),

	/// Increment the reference count of a tree (at root Key) from a MultiTree column.
	ReferenceTree(Key),

	/// Dereference an existing tree (at root Key) from a MultiTree column, resulting in either
	/// removal of the tree or decrement of its reference count.
	DereferenceTree(Key),
}
2069
2070impl<Key: Ord, Value: Eq> PartialOrd<Self> for Operation<Key, Value> {
2071	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2072		Some(self.cmp(other))
2073	}
2074}
2075
2076impl<Key: Ord, Value: Eq> Ord for Operation<Key, Value> {
2077	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2078		self.key().cmp(other.key())
2079	}
2080}
2081
2082impl<Key, Value> Operation<Key, Value> {
2083	pub fn key(&self) -> &Key {
2084		match self {
2085			Operation::Set(k, _) |
2086			Operation::Dereference(k) |
2087			Operation::Reference(k) |
2088			Operation::InsertTree(k, _) |
2089			Operation::ReferenceTree(k) |
2090			Operation::DereferenceTree(k) => k,
2091		}
2092	}
2093
2094	pub fn into_key(self) -> Key {
2095		match self {
2096			Operation::Set(k, _) |
2097			Operation::Dereference(k) |
2098			Operation::Reference(k) |
2099			Operation::InsertTree(k, _) |
2100			Operation::ReferenceTree(k) |
2101			Operation::DereferenceTree(k) => k,
2102		}
2103	}
2104}
2105
2106impl<K: AsRef<[u8]>, Value> Operation<K, Value> {
2107	pub fn to_key_vec(self) -> Operation<Vec<u8>, Value> {
2108		match self {
2109			Operation::Set(k, v) => Operation::Set(k.as_ref().to_vec(), v),
2110			Operation::Dereference(k) => Operation::Dereference(k.as_ref().to_vec()),
2111			Operation::Reference(k) => Operation::Reference(k.as_ref().to_vec()),
2112			Operation::InsertTree(k, n) => Operation::InsertTree(k.as_ref().to_vec(), n),
2113			Operation::ReferenceTree(k) => Operation::ReferenceTree(k.as_ref().to_vec()),
2114			Operation::DereferenceTree(k) => Operation::DereferenceTree(k.as_ref().to_vec()),
2115		}
2116	}
2117}
2118
/// A change to a multitree node, processed alongside the key operations of
/// an `IndexedChangeSet`.
#[derive(Debug, PartialEq, Eq)]
pub enum NodeChange {
	/// (address, value)
	NewValue(u64, RcValue),
	/// (address)
	IncrementReference(u64),
	/// Dereference and remove any of the children in the tree.
	/// Fields: (key bytes, hashed root key, child node addresses) —
	/// see how `IndexedChangeSet::write_plan` consumes them.
	DereferenceChildren(Vec<u8>, Key, Children),
}
2128
/// Complete set of changes submitted by one commit, grouped per column.
#[derive(Debug, Default)]
pub struct CommitChangeSet {
	// Changes for hash-indexed (and multitree) columns.
	pub indexed: HashMap<ColId, IndexedChangeSet>,
	// Changes for btree-indexed columns.
	pub btree_indexed: HashMap<ColId, BTreeChangeSet>,
	// NOTE(review): presumably flags commits that may need deferral —
	// confirm semantics at the call sites (not visible here).
	pub check_for_deferral: bool,
}
2135
/// Changes queued for a single hash-indexed column.
#[derive(Debug)]
pub struct IndexedChangeSet {
	// Target column id.
	pub col: ColId,
	// Key/value operations; keys are already hashed (see `push`).
	pub changes: Vec<Operation<Key, RcValue>>,
	// Multitree node changes applied with this commit.
	pub node_changes: Vec<NodeChange>,
	// NOTE(review): looks like the root keys of trees this commit touches —
	// not used in this impl, confirm at the call sites.
	pub used_trees: HashSet<Key>,
}
2143
2144impl IndexedChangeSet {
2145	pub fn new(col: ColId) -> Self {
2146		IndexedChangeSet {
2147			col,
2148			changes: Default::default(),
2149			node_changes: Default::default(),
2150			used_trees: Default::default(),
2151		}
2152	}
2153
2154	fn push<K: AsRef<[u8]>, V: Into<RcValue>>(
2155		&mut self,
2156		change: Operation<K, V>,
2157		options: &Options,
2158		db_version: u32,
2159	) -> Result<()> {
2160		let salt = options.salt.unwrap_or_default();
2161		let hash_key = |key: &[u8]| -> Key {
2162			hash_key(key, &salt, options.columns[self.col as usize].uniform, db_version)
2163		};
2164
2165		self.push_change_hashed(match change {
2166			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
2167			Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
2168			Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
2169			Operation::InsertTree(..) |
2170			Operation::ReferenceTree(..) |
2171			Operation::DereferenceTree(..) =>
2172				return Err(Error::InvalidInput(format!(
2173					"Invalid operation for column {}",
2174					self.col
2175				))),
2176		});
2177
2178		Ok(())
2179	}
2180
2181	fn push_change_hashed(&mut self, change: Operation<Key, RcValue>) {
2182		self.changes.push(change);
2183	}
2184
2185	fn push_node_change(&mut self, change: NodeChange) {
2186		self.node_changes.push(change);
2187	}
2188
2189	fn copy_to_overlay(
2190		&self,
2191		overlay: &mut CommitOverlay,
2192		record_id: u64,
2193		bytes: &mut usize,
2194		options: &Options,
2195	) -> Result<()> {
2196		let ref_counted = options.columns[self.col as usize].ref_counted;
2197		for change in self.changes.iter() {
2198			match &change {
2199				Operation::Set(k, v) => {
2200					*bytes += k.len();
2201					*bytes += v.as_ref().len();
2202					overlay.indexed.insert(*k, (record_id, Some(v.clone())));
2203				},
2204				Operation::Dereference(k) => {
2205					// Don't add removed ref-counted values to overlay.
2206					if !ref_counted {
2207						overlay.indexed.insert(*k, (record_id, None));
2208					}
2209				},
2210				Operation::Reference(..) => {
2211					// Don't add (we allow remove value in overlay when using rc: some
2212					// indexing on top of it is expected).
2213					if !ref_counted {
2214						return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)))
2215					}
2216				},
2217				Operation::InsertTree(..) |
2218				Operation::ReferenceTree(..) |
2219				Operation::DereferenceTree(..) =>
2220					return Err(Error::InvalidInput(format!(
2221						"Invalid operation for column {}",
2222						self.col
2223					))),
2224			}
2225		}
2226		for change in self.node_changes.iter() {
2227			if let NodeChange::NewValue(address, val) = change {
2228				*bytes += val.as_ref().len();
2229				overlay.address.insert(*address, (record_id, val.clone()));
2230			}
2231		}
2232		Ok(())
2233	}
2234
2235	fn write_plan(
2236		&self,
2237		db: &Arc<DbInner>,
2238		col: ColId,
2239		column: &Column,
2240		writer: &mut crate::log::LogWriter,
2241		ops: &mut u64,
2242		reindex: &mut bool,
2243	) -> Result<()> {
2244		let column = match column {
2245			Column::Hash(column) => column,
2246			Column::Tree(_) => {
2247				log::warn!(target: "parity-db", "Skipping unindex commit in indexed column");
2248				return Ok(())
2249			},
2250		};
2251		for change in self.changes.iter() {
2252			if let PlanOutcome::NeedReindex = column.write_plan(change, writer)? {
2253				// Reindex has triggered another reindex.
2254				*reindex = true;
2255			}
2256			*ops += 1;
2257		}
2258		for change in self.node_changes.iter() {
2259			match change {
2260				NodeChange::NewValue(address, val) => {
2261					column.write_address_value_plan(
2262						*address,
2263						val.clone(),
2264						false,
2265						val.as_ref().len() as u32,
2266						writer,
2267					)?;
2268				},
2269				NodeChange::IncrementReference(address) => {
2270					if let PlanOutcome::NeedReindex =
2271						column.write_address_inc_ref_plan(*address, writer)?
2272					{
2273						*reindex = true;
2274					}
2275				},
2276				NodeChange::DereferenceChildren(key, hash, children) => {
2277					if let Some((_root, rc)) = column.get(hash, writer)? {
2278						column.write_plan(&Operation::Dereference(*hash), writer)?;
2279						log::debug!(target: "parity-db", "Dereferencing root, rc={}", rc);
2280						if rc == 1 {
2281							let tree = db.get_tree(db, col, key, false).unwrap();
2282							if let Some(tree) = tree {
2283								let guard = tree.write();
2284								let mut num_removed = 0;
2285								self.write_dereference_children_plan(
2286									column,
2287									&guard,
2288									children,
2289									&mut num_removed,
2290									writer,
2291								)?;
2292								log::debug!(target: "parity-db", "Dereferenced tree {:?}, removed {}", &key[0..3], num_removed);
2293							}
2294						}
2295					}
2296					// TODO: Remove TreeReader from Db.
2297				},
2298			}
2299		}
2300		Ok(())
2301	}
2302
2303	fn write_dereference_children_plan(
2304		&self,
2305		column: &HashColumn,
2306		guard: &RwLockWriteGuard<'_, Box<dyn TreeReader + Send + Sync>>,
2307		children: &Vec<u64>,
2308		num_removed: &mut u64,
2309		writer: &mut crate::log::LogWriter,
2310	) -> Result<()> {
2311		for address in children {
2312			// Can't move this after write_address_dec_ref_plan as write_address_dec_ref_plan might
2313			// free the node meaning it could get reclaimed. Then get_node_children will return
2314			// incorrect data.
2315			let node = guard.get_node_children(*address)?;
2316			let (remains, _outcome) = column.write_address_dec_ref_plan(*address, writer)?;
2317			if !remains {
2318				// Was removed
2319				*num_removed += 1;
2320				if let Some(children) = node {
2321					self.write_dereference_children_plan(
2322						column,
2323						guard,
2324						&children,
2325						num_removed,
2326						writer,
2327					)?;
2328				} else {
2329					return Err(Error::InvalidConfiguration("Missing node data".to_string()))
2330				}
2331			}
2332		}
2333		Ok(())
2334	}
2335
2336	fn clean_overlay(&self, overlay: &mut CommitOverlay, record_id: u64) {
2337		use std::collections::hash_map::Entry;
2338		for change in self.changes.iter() {
2339			match change {
2340				Operation::Set(k, _) | Operation::Dereference(k) => {
2341					if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
2342						if e.get().0 == record_id {
2343							e.remove_entry();
2344						}
2345					}
2346				},
2347				Operation::Reference(..) |
2348				Operation::InsertTree(..) |
2349				Operation::ReferenceTree(..) |
2350				Operation::DereferenceTree(..) => (),
2351			}
2352		}
2353		for change in self.node_changes.iter() {
2354			if let NodeChange::NewValue(address, _val) = change {
2355				if let Entry::Occupied(e) = overlay.address.entry(*address) {
2356					if e.get().0 == record_id {
2357						e.remove_entry();
2358					}
2359				}
2360			}
2361		}
2362	}
2363}
2364
/// Verification operation utilities.
pub mod check {
	/// Database dump verbosity.
	pub enum CheckDisplay {
		/// Don't output any data.
		None,
		/// Output full data.
		Full,
		/// Limit value output to the specified size.
		Short(u64),
	}

	/// Options for producing a database dump.
	pub struct CheckOptions {
		/// Only process this column. If this is `None` all columns will be processed.
		pub column: Option<u8>,
		/// Start with this index.
		pub from: Option<u64>,
		/// End with this index.
		pub bound: Option<u64>,
		/// Verbosity.
		pub display: CheckDisplay,
		/// Ordered validation.
		pub fast: bool,
		/// Make sure free lists are correct.
		pub validate_free_refs: bool,
	}

	impl CheckOptions {
		/// Create a new instance. `display` is derived from the two content
		/// flags: nothing unless `display_content`, truncated output when a
		/// limit is given, full output otherwise.
		pub fn new(
			column: Option<u8>,
			from: Option<u64>,
			bound: Option<u64>,
			display_content: bool,
			truncate_value_display: Option<u64>,
			fast: bool,
			validate_free_refs: bool,
		) -> Self {
			let display = match (display_content, truncate_value_display) {
				(false, _) => CheckDisplay::None,
				(true, Some(limit)) => CheckDisplay::Short(limit),
				(true, None) => CheckDisplay::Full,
			};
			CheckOptions { column, from, bound, display, fast, validate_free_refs }
		}
	}
}
2416
/// How `Db::open_inner` treats the database directory.
#[derive(Eq, PartialEq, Clone, Copy)]
enum OpeningMode {
	// Create the database if it does not exist.
	Create,
	// Open an existing database for writing.
	Write,
	// Open an existing database read-only.
	ReadOnly,
}
2423
2424#[cfg(test)]
2425mod tests {
2426	use super::{Db, Options};
2427	use crate::{
2428		column::ColId,
2429		db::{DbInner, OpeningMode},
2430		ColumnOptions, Value,
2431	};
2432	use rand::Rng;
2433	use std::{
2434		collections::{BTreeMap, HashMap, HashSet},
2435		path::Path,
2436	};
2437	use tempfile::tempdir;
2438
	// This is used in tests to disable certain commit stages.
	#[derive(Eq, PartialEq, Debug, Clone, Copy)]
	enum EnableCommitPipelineStages {
		// No threads started, data stays in commit overlay.
		#[allow(dead_code)]
		CommitOverlay,
		// Log worker run, data processed up to the log overlay.
		#[allow(dead_code)]
		LogOverlay,
		// Running all stages synchronously, data reaches the db files.
		#[allow(dead_code)]
		DbFile,
		// Default run mode.
		Standard,
	}
2454
	impl EnableCommitPipelineStages {
		/// Build test `Options` for `path`: background threads only in
		/// `Standard` mode, forced flush only in `DbFile` mode.
		fn options(&self, path: &Path, num_columns: u8) -> Options {
			Options {
				path: path.into(),
				sync_wal: true,
				sync_data: true,
				stats: true,
				salt: None,
				columns: (0..num_columns).map(|_| Default::default()).collect(),
				compression_threshold: HashMap::new(),
				with_background_thread: *self == Self::Standard,
				always_flush: *self == Self::DbFile,
			}
		}

		/// Manually drive the commit pipeline up to the stage selected by
		/// `self`. No-op for `CommitOverlay` (data stays in overlay) and
		/// `Standard` (background threads do the work).
		fn run_stages(&self, db: &Db) {
			let db = &db.inner;
			if *self == EnableCommitPipelineStages::DbFile ||
				*self == EnableCommitPipelineStages::LogOverlay
			{
				// Drain the commit queue and any pending reindex into the log.
				while db.process_commits(db).unwrap() {}
				while db.process_reindex().unwrap() {}
			}
			if *self == EnableCommitPipelineStages::DbFile {
				// Flush, enact and clean the logs so data reaches the db files.
				let _ = db.log.flush_one(0).unwrap();
				while db.enact_logs(false).unwrap() {}
				let _ = db.clean_logs().unwrap();
			}
		}

		/// Check that the commit overlay of `col` is (eventually) empty,
		/// retrying a few times with a short sleep. Always true for stages
		/// that intentionally keep data in the overlay.
		fn check_empty_overlay(&self, db: &DbInner, col: ColId) -> bool {
			match self {
				EnableCommitPipelineStages::DbFile | EnableCommitPipelineStages::LogOverlay => {
					if let Some(overlay) = db.commit_overlay.read().get(col as usize) {
						if !overlay.is_empty() {
							let mut replayed = 5;
							while !overlay.is_empty() {
								if replayed > 0 {
									replayed -= 1;
									// the signal is triggered just before cleaning the overlay, so
									// we wait a bit.
									// Warning this is still rather flaky and should be ignored
									// or removed.
									std::thread::sleep(std::time::Duration::from_millis(100));
								} else {
									return false
								}
							}
						}
					}
				},
				_ => (),
			}
			true
		}
	}
2511
2512	#[test]
2513	fn test_db_open_should_fail() {
2514		let tmp = tempdir().unwrap();
2515		let options = Options::with_columns(tmp.path(), 5);
2516		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
2517	}
2518
2519	#[test]
2520	fn test_db_open_fail_then_recursively_create() {
2521		let tmp = tempdir().unwrap();
2522		let (db_path_first, db_path_last) = {
2523			let mut db_path_first = tmp.path().to_owned();
2524			db_path_first.push("nope");
2525
2526			let mut db_path_last = db_path_first.to_owned();
2527
2528			for p in ["does", "not", "yet", "exist"] {
2529				db_path_last.push(p);
2530			}
2531
2532			(db_path_first, db_path_last)
2533		};
2534
2535		assert!(
2536			!db_path_first.exists(),
2537			"That directory should not have existed at this point (dir: {db_path_first:?})"
2538		);
2539
2540		let options = Options::with_columns(&db_path_last, 5);
2541		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
2542
2543		assert!(!db_path_first.exists(), "That directory should remain non-existent. Did the `open(create: false)` nonetheless create a directory? (dir: {db_path_first:?})");
2544		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
2545
2546		assert!(
2547			db_path_first.is_dir(),
2548			"A directory should have been been created (dir: {db_path_first:?})"
2549		);
2550		assert!(
2551			db_path_last.is_dir(),
2552			"A directory should have been been created (dir: {db_path_last:?})"
2553		);
2554	}
2555
2556	#[test]
2557	fn test_db_open_or_create() {
2558		let tmp = tempdir().unwrap();
2559		let options = Options::with_columns(tmp.path(), 5);
2560		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
2561		assert!(Db::open(&options).is_ok(), "Existing database should be reopened");
2562	}
2563
2564	#[test]
2565	fn test_indexed_keyvalues() {
2566		test_indexed_keyvalues_inner(EnableCommitPipelineStages::CommitOverlay);
2567		test_indexed_keyvalues_inner(EnableCommitPipelineStages::LogOverlay);
2568		test_indexed_keyvalues_inner(EnableCommitPipelineStages::DbFile);
2569		test_indexed_keyvalues_inner(EnableCommitPipelineStages::Standard);
2570	}
	/// Basic set/overwrite/delete scenario on a hash-indexed column, driven
	/// up to the given pipeline stage after each commit.
	fn test_indexed_keyvalues_inner(db_test: EnableCommitPipelineStages) {
		let tmp = tempdir().unwrap();
		let options = db_test.options(tmp.path(), 5);
		let col_nb = 0;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());

		// Single insert.
		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		db_test.run_stages(&db);
		assert!(db_test.check_empty_overlay(&db.inner, col_nb));

		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));

		// Delete and two inserts in one commit.
		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);
		assert!(db_test.check_empty_overlay(&db.inner, col_nb));

		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

		// Overwrite one key, delete another.
		db.commit(vec![
			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
			(col_nb, key3.clone(), None),
		])
		.unwrap();
		db_test.run_stages(&db);
		assert!(db_test.check_empty_overlay(&db.inner, col_nb));

		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
	}
2614
	/// Write through the full pipeline, reopen, then verify reads that mix
	/// the commit overlay with the on-disk backend.
	#[test]
	fn test_indexed_overlay_against_backend() {
		let tmp = tempdir().unwrap();
		let db_test = EnableCommitPipelineStages::DbFile;
		let options = db_test.options(tmp.path(), 5);
		let col_nb = 0;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		db.commit(vec![
			(col_nb, key1.clone(), Some(b"value1".to_vec())),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);
		drop(db);

		// issue with some file reopening when no delay
		std::thread::sleep(std::time::Duration::from_millis(100));

		// Reopen with all data on disk; new writes now stay in the overlay.
		let db_test = EnableCommitPipelineStages::CommitOverlay;
		let options = db_test.options(tmp.path(), 5);
		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();
		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
		db.commit(vec![
			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
			(col_nb, key3.clone(), None),
		])
		.unwrap();
		db_test.run_stages(&db);

		// Overlay changes must shadow the backend values.
		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
	}
2657
	/// Add two columns (hash- and btree-indexed) to an existing database,
	/// then verify both old and new column data after reopening.
	#[test]
	fn test_add_column() {
		let tmp = tempdir().unwrap();
		let db_test = EnableCommitPipelineStages::DbFile;
		let mut options = db_test.options(tmp.path(), 1);
		// Pin an explicit salt so freshly built options for the reopen
		// carry the same one.
		options.salt = Some(options.salt.unwrap_or_default());

		let old_col_id = 0;
		let new_col_id = 1;
		let new_col_indexed_id = 2;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		db.commit(vec![
			(old_col_id, key1.clone(), Some(b"value1".to_vec())),
			(old_col_id, key2.clone(), Some(b"value2".to_vec())),
			(old_col_id, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		drop(db);

		// Append one hash-indexed and one btree-indexed column.
		Db::add_column(&mut options, ColumnOptions { btree_index: false, ..Default::default() })
			.unwrap();

		Db::add_column(&mut options, ColumnOptions { btree_index: true, ..Default::default() })
			.unwrap();

		let mut options = db_test.options(tmp.path(), 3);
		options.columns[new_col_indexed_id as usize].btree_index = true;

		let db_test = EnableCommitPipelineStages::DbFile;
		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		// Expected number of columns
		assert_eq!(db.num_columns(), 3);

		let new_key1 = b"abcdef".to_vec();
		let new_key2 = b"123456".to_vec();

		// Write to new columns.
		db.commit(vec![
			(new_col_id, new_key1.clone(), Some(new_key1.to_vec())),
			(new_col_id, new_key2.clone(), Some(new_key2.to_vec())),
			(new_col_indexed_id, new_key1.clone(), Some(new_key1.to_vec())),
			(new_col_indexed_id, new_key2.clone(), Some(new_key2.to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		drop(db);

		// Reopen DB and fetch all keys we inserted.
		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		assert_eq!(db.get(old_col_id, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
		assert_eq!(db.get(old_col_id, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(old_col_id, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

		// Fetch from new columns
		assert_eq!(db.get(new_col_id, new_key1.as_slice()).unwrap(), Some(new_key1.to_vec()));
		assert_eq!(db.get(new_col_id, new_key2.as_slice()).unwrap(), Some(new_key2.to_vec()));
		assert_eq!(
			db.get(new_col_indexed_id, new_key1.as_slice()).unwrap(),
			Some(new_key1.to_vec())
		);
		assert_eq!(
			db.get(new_col_indexed_id, new_key2.as_slice()).unwrap(),
			Some(new_key2.to_vec())
		);
	}
2734
2735	#[test]
2736	fn test_indexed_btree_1() {
2737		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, false);
2738		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, false);
2739		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, false);
2740		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, false);
2741		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, true);
2742		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, true);
2743		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, true);
2744		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, true);
2745	}
	/// Btree iteration scenario (next/prev/seek) at the given pipeline
	/// stage, with either short keys or long multi-part keys.
	fn test_indexed_btree_inner(db_test: EnableCommitPipelineStages, long_key: bool) {
		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let (key1, key2, key3, key4) = if !long_key {
			(b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec(), b"key4".to_vec())
		} else {
			// Long keys: key3 differs from key2 only in its last byte.
			let key2 = vec![2; 272];
			let mut key3 = key2.clone();
			key3[271] = 3;
			(vec![1; 953], key2, key3, vec![4; 79])
		};

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), None);

		// Iteration over an empty column.
		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(iter.next().unwrap(), None);
		assert_eq!(iter.prev().unwrap(), None);

		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		db_test.run_stages(&db);

		// Walk back and forth over the single entry.
		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);
		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		// Seek past every key, then walk backwards.
		iter.seek(&[0xff]).unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));

		iter.seek(key2.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		iter.seek(key3.as_slice()).unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		db.commit(vec![
			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
			(col_nb, key4.clone(), Some(b"value4".to_vec())),
			(col_nb, key3.clone(), None),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key3).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2b".to_vec()));
		assert_eq!(db.get(col_nb, &key4).unwrap(), Some(b"value4".to_vec()));
		// Seek to a key strictly between key2 and key4.
		let mut key22 = key2.clone();
		key22.push(2);
		iter.seek(key22.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key4, b"value4".to_vec())));
		assert_eq!(iter.next().unwrap(), None);
	}
2828
2829	#[test]
2830	fn test_indexed_btree_2() {
2831		test_indexed_btree_inner_2(EnableCommitPipelineStages::CommitOverlay);
2832		test_indexed_btree_inner_2(EnableCommitPipelineStages::LogOverlay);
2833	}
	/// Btree iteration across a reopen: initial data goes through the full
	/// pipeline, further changes are driven only up to `db_test`'s stage.
	fn test_indexed_btree_inner_2(db_test: EnableCommitPipelineStages) {
		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(iter.next().unwrap(), None);

		// Persist one entry all the way to the db files, then close.
		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		EnableCommitPipelineStages::DbFile.run_stages(&db);
		drop(db);

		// issue with some file reopening when no delay
		std::thread::sleep(std::time::Duration::from_millis(100));

		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();

		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		// Delete and two inserts, kept at the overlay stage under test.
		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
		iter.seek(key2.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		// Walk backwards from the end.
		iter.seek_to_last().unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key3, b"value3".to_vec())));
		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);
	}
2885
2886	#[test]
2887	fn test_indexed_btree_3() {
2888		test_indexed_btree_inner_3(EnableCommitPipelineStages::CommitOverlay);
2889		test_indexed_btree_inner_3(EnableCommitPipelineStages::LogOverlay);
2890		test_indexed_btree_inner_3(EnableCommitPipelineStages::DbFile);
2891		test_indexed_btree_inner_3(EnableCommitPipelineStages::Standard);
2892	}
2893
	// Randomized check of btree iterator behaviour (seek/next/prev and direction
	// changes) against a `BTreeSet` reference model holding the odd keys in 0..1024.
	fn test_indexed_btree_inner_3(db_test: EnableCommitPipelineStages) {
		use rand::SeedableRng;

		use std::collections::BTreeSet;

		// Deterministic rng so failures are reproducible.
		let mut rng = rand::rngs::SmallRng::seed_from_u64(0);

		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		// Insert 0..1024 (big-endian keys) and delete the even ones in the same commit.
		db.commit(
			(0u64..1024)
				.map(|i| (0, i.to_be_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
				.chain((0u64..1024).step_by(2).map(|i| (0, i.to_be_bytes().to_vec(), None))),
		)
		.unwrap();
		let expected = (0u64..1024).filter(|i| i % 2 == 1).collect::<BTreeSet<_>>();
		let mut iter = db.iter(0).unwrap();

		for _ in 0..100 {
			// Seek to a random position, then compare random-length forward and
			// backward runs with the model. `prev_run` tracks whether a run in the
			// opposite direction just completed, because changing direction must
			// not re-yield the element that was returned last.
			let at = rng.random_range(0u64..=1024);
			iter.seek(&at.to_be_bytes()).unwrap();

			let mut prev_run: bool = rng.random();
			let at = if prev_run {
				let take = rng.random_range(1..100);
				let got = std::iter::from_fn(|| iter.next().unwrap())
					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
					.take(take)
					.collect::<Vec<_>>();
				let expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
				assert_eq!(got, expected);
				// A short or empty run means the iterator got exhausted, so the
				// following backward run starts from the end, not mid-stream.
				if got.is_empty() {
					prev_run = false;
				}
				if got.len() < take {
					prev_run = false;
				}
				expected.last().copied().unwrap_or(at)
			} else {
				at
			};

			// Backward run: excludes `at` itself (`..at`) when a forward run just
			// ended there, includes it (`..=at`) otherwise.
			let at = {
				let take = rng.random_range(1..100);
				let got = std::iter::from_fn(|| iter.prev().unwrap())
					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
					.take(take)
					.collect::<Vec<_>>();
				let expected = if prev_run {
					expected.range(..at).rev().take(take).copied().collect::<Vec<_>>()
				} else {
					expected.range(..=at).rev().take(take).copied().collect::<Vec<_>>()
				};
				assert_eq!(got, expected);
				prev_run = !got.is_empty();
				if take > got.len() {
					prev_run = false;
				}
				expected.last().copied().unwrap_or(at)
			};

			// Forward again: when a backward run just ended at `at`, that element
			// is skipped on the direction change, hence the front/back trimming.
			let take = rng.random_range(1..100);
			let mut got = std::iter::from_fn(|| iter.next().unwrap())
				.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
				.take(take)
				.collect::<Vec<_>>();
			let mut expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
			if prev_run {
				expected = expected.split_off(1);
				if got.len() == take {
					got.pop();
				}
			}
			assert_eq!(got, expected);
		}

		// Finally walk backwards from the very end and compare with the model.
		let take = rng.random_range(20..100);
		iter.seek_to_last().unwrap();
		let got = std::iter::from_fn(|| iter.prev().unwrap())
			.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
			.take(take)
			.collect::<Vec<_>>();
		let expected = expected.iter().rev().take(take).copied().collect::<Vec<_>>();
		assert_eq!(got, expected);
	}
2984
2985	fn test_basic(change_set: &[(Vec<u8>, Option<Vec<u8>>)]) {
2986		test_basic_inner(change_set, false, false);
2987		test_basic_inner(change_set, false, true);
2988		test_basic_inner(change_set, true, false);
2989		test_basic_inner(change_set, true, true);
2990	}
2991
	// Applies `change_set` to a fresh database and verifies the final contents
	// against an in-memory model, for a column configured with the given
	// btree-index / ref-counting options.
	fn test_basic_inner(
		change_set: &[(Vec<u8>, Option<Vec<u8>>)],
		btree_index: bool,
		ref_counted: bool,
	) {
		let tmp = tempdir().unwrap();
		let col_nb = 1u8;
		// Always push commits all the way to the db files so reads and stats hit
		// the tables rather than the overlays.
		let db_test = EnableCommitPipelineStages::DbFile;
		let mut options = db_test.options(tmp.path(), 2);
		options.columns[col_nb as usize].btree_index = btree_index;
		options.columns[col_nb as usize].ref_counted = ref_counted;
		options.columns[col_nb as usize].preimage = ref_counted;
		// ref counted and commit overlay currently don't support removal
		assert!(!(ref_counted && db_test == EnableCommitPipelineStages::CommitOverlay));
		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		// Iteration is only available for btree columns; it must start empty.
		let iter = btree_index.then(|| db.iter(col_nb).unwrap());
		assert_eq!(iter.and_then(|mut i| i.next().unwrap()), None);

		db.commit(change_set.iter().map(|(k, v)| (col_nb, k.clone(), v.clone())))
			.unwrap();
		db_test.run_stages(&db);

		// Count the distinct keys still present after replaying the change set,
		// for cross-checking the column statistics below.
		let mut keys = HashSet::new();
		let mut expected_count: u64 = 0;
		for (k, v) in change_set.iter() {
			if v.is_some() {
				if keys.insert(k) {
					expected_count += 1;
				}
			} else if keys.remove(k) {
				expected_count -= 1;
			}
		}
		if ref_counted {
			// Reference-counted model: each key keeps its first value plus a
			// counter. Re-inserts bump the counter (the value is not replaced —
			// with `preimage` set, a key is presumed to always map to the same
			// value), removals decrement it, and the entry is dropped when the
			// counter would reach zero.
			let mut state: BTreeMap<Vec<u8>, Option<(Vec<u8>, usize)>> = Default::default();
			for (k, v) in change_set.iter() {
				let mut remove = false;
				let mut insert = false;
				match state.get_mut(k) {
					Some(Some((_, counter))) =>
						if v.is_some() {
							*counter += 1;
						} else if *counter == 1 {
							remove = true;
						} else {
							*counter -= 1;
						},
					Some(None) | None =>
						if v.is_some() {
							insert = true;
						},
				}
				if insert {
					state.insert(k.clone(), v.clone().map(|v| (v, 1)));
				}
				if remove {
					state.remove(k);
				}
			}
			for (key, value) in state {
				assert_eq!(db.get(col_nb, &key).unwrap(), value.map(|v| v.0));
			}
		} else {
			let stats = db.stats();
			// btree do not have stats implemented
			if let Some(stats) = stats.columns[col_nb as usize].as_ref() {
				assert_eq!(stats.total_values, expected_count);
			}

			// Last write wins: the expected value for each key is simply its
			// final entry in the change set.
			let state: BTreeMap<Vec<u8>, Option<Vec<u8>>> =
				change_set.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
			for (key, value) in state.iter() {
				assert_eq!(&db.get(col_nb, key).unwrap(), value);
			}
		}
	}
3069
3070	#[test]
3071	fn test_random() {
3072		fdlimit::raise_fd_limit().unwrap();
3073		for i in 0..100 {
3074			test_random_inner(60, 60, i);
3075		}
3076		for i in 0..50 {
3077			test_random_inner(20, 60, i);
3078		}
3079	}
	// Generates a pseudo-random change set of `size` entries with keys of varying
	// length and feeds it to `test_basic`. Entries near the end of the sequence
	// become deletions, some targeting previously inserted keys.
	fn test_random_inner(size: usize, key_size: usize, seed: u64) {
		use rand::{RngCore, SeedableRng};
		let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
		let mut data = Vec::<(Vec<u8>, Option<Vec<u8>>)>::new();
		for i in 0..size {
			let nb_delete: u32 = rng.next_u32(); // should be out of loop, yet it makes alternance insert/delete in some case.
			let nb_delete = (nb_delete as usize % size) / 2;
			let mut key = vec![0u8; key_size];
			rng.fill_bytes(&mut key[..]);
			// Tail entries are deletions: 3 in 4 delete a fresh random key, the
			// rest delete a key generated earlier in the sequence.
			let value = if i > size - nb_delete {
				let random_key = rng.next_u32();
				let random_key = (random_key % 4) > 0;
				if !random_key {
					key = data[i - size / 2].0.clone();
				}
				None
			} else {
				Some(key.clone())
			};
			// Truncate to a random length; note this also re-truncates keys copied
			// from earlier entries, so such deletions may target an absent key.
			let var_keysize = rng.next_u32();
			let var_keysize = var_keysize as usize % (key_size / 2);
			key.truncate(key_size - var_keysize);
			data.push((key, value));
		}
		test_basic(&data[..]);
	}
3106
3107	#[test]
3108	fn test_simple() {
3109		test_basic(&[
3110			(b"key1".to_vec(), Some(b"value1".to_vec())),
3111			(b"key1".to_vec(), Some(b"value1".to_vec())),
3112			(b"key1".to_vec(), None),
3113		]);
3114		test_basic(&[
3115			(b"key1".to_vec(), Some(b"value1".to_vec())),
3116			(b"key1".to_vec(), Some(b"value1".to_vec())),
3117			(b"key1".to_vec(), None),
3118			(b"key1".to_vec(), None),
3119		]);
3120		test_basic(&[
3121			(b"key1".to_vec(), Some(b"value1".to_vec())),
3122			(b"key1".to_vec(), Some(b"value2".to_vec())),
3123		]);
3124		test_basic(&[(b"key1".to_vec(), Some(b"value1".to_vec()))]);
3125		test_basic(&[
3126			(b"key1".to_vec(), Some(b"value1".to_vec())),
3127			(b"key2".to_vec(), Some(b"value2".to_vec())),
3128		]);
3129		test_basic(&[
3130			(b"key1".to_vec(), Some(b"value1".to_vec())),
3131			(b"key2".to_vec(), Some(b"value2".to_vec())),
3132			(b"key3".to_vec(), Some(b"value3".to_vec())),
3133		]);
3134		test_basic(&[
3135			(b"key1".to_vec(), Some(b"value1".to_vec())),
3136			(b"key3".to_vec(), Some(b"value3".to_vec())),
3137			(b"key2".to_vec(), Some(b"value2".to_vec())),
3138		]);
3139		test_basic(&[
3140			(b"key3".to_vec(), Some(b"value3".to_vec())),
3141			(b"key2".to_vec(), Some(b"value2".to_vec())),
3142			(b"key1".to_vec(), Some(b"value1".to_vec())),
3143		]);
3144		test_basic(&[
3145			(b"key1".to_vec(), Some(b"value1".to_vec())),
3146			(b"key2".to_vec(), Some(b"value2".to_vec())),
3147			(b"key3".to_vec(), Some(b"value3".to_vec())),
3148			(b"key4".to_vec(), Some(b"value4".to_vec())),
3149		]);
3150		test_basic(&[
3151			(b"key1".to_vec(), Some(b"value1".to_vec())),
3152			(b"key2".to_vec(), Some(b"value2".to_vec())),
3153			(b"key3".to_vec(), Some(b"value3".to_vec())),
3154			(b"key4".to_vec(), Some(b"value4".to_vec())),
3155			(b"key5".to_vec(), Some(b"value5".to_vec())),
3156		]);
3157		test_basic(&[
3158			(b"key5".to_vec(), Some(b"value5".to_vec())),
3159			(b"key3".to_vec(), Some(b"value3".to_vec())),
3160			(b"key4".to_vec(), Some(b"value4".to_vec())),
3161			(b"key2".to_vec(), Some(b"value2".to_vec())),
3162			(b"key1".to_vec(), Some(b"value1".to_vec())),
3163		]);
3164		test_basic(&[
3165			(b"key5".to_vec(), Some(b"value5".to_vec())),
3166			(b"key3".to_vec(), Some(b"value3".to_vec())),
3167			(b"key4".to_vec(), Some(b"value4".to_vec())),
3168			(b"key2".to_vec(), Some(b"value2".to_vec())),
3169			(b"key1".to_vec(), Some(b"value1".to_vec())),
3170			(b"key11".to_vec(), Some(b"value31".to_vec())),
3171			(b"key12".to_vec(), Some(b"value32".to_vec())),
3172		]);
3173		test_basic(&[
3174			(b"key5".to_vec(), Some(b"value5".to_vec())),
3175			(b"key3".to_vec(), Some(b"value3".to_vec())),
3176			(b"key4".to_vec(), Some(b"value4".to_vec())),
3177			(b"key2".to_vec(), Some(b"value2".to_vec())),
3178			(b"key1".to_vec(), Some(b"value1".to_vec())),
3179			(b"key51".to_vec(), Some(b"value31".to_vec())),
3180			(b"key52".to_vec(), Some(b"value32".to_vec())),
3181		]);
3182		test_basic(&[
3183			(b"key5".to_vec(), Some(b"value5".to_vec())),
3184			(b"key3".to_vec(), Some(b"value3".to_vec())),
3185			(b"key4".to_vec(), Some(b"value4".to_vec())),
3186			(b"key2".to_vec(), Some(b"value2".to_vec())),
3187			(b"key1".to_vec(), Some(b"value1".to_vec())),
3188			(b"key31".to_vec(), Some(b"value31".to_vec())),
3189			(b"key32".to_vec(), Some(b"value32".to_vec())),
3190		]);
3191		test_basic(&[
3192			(b"key1".to_vec(), Some(b"value5".to_vec())),
3193			(b"key2".to_vec(), Some(b"value3".to_vec())),
3194			(b"key3".to_vec(), Some(b"value4".to_vec())),
3195			(b"key4".to_vec(), Some(b"value7".to_vec())),
3196			(b"key5".to_vec(), Some(b"value2".to_vec())),
3197			(b"key6".to_vec(), Some(b"value1".to_vec())),
3198			(b"key3".to_vec(), None),
3199		]);
3200		test_basic(&[
3201			(b"key1".to_vec(), Some(b"value5".to_vec())),
3202			(b"key2".to_vec(), Some(b"value3".to_vec())),
3203			(b"key3".to_vec(), Some(b"value4".to_vec())),
3204			(b"key4".to_vec(), Some(b"value7".to_vec())),
3205			(b"key5".to_vec(), Some(b"value2".to_vec())),
3206			(b"key0".to_vec(), Some(b"value1".to_vec())),
3207			(b"key3".to_vec(), None),
3208		]);
3209		test_basic(&[
3210			(b"key1".to_vec(), Some(b"value5".to_vec())),
3211			(b"key2".to_vec(), Some(b"value3".to_vec())),
3212			(b"key3".to_vec(), Some(b"value4".to_vec())),
3213			(b"key4".to_vec(), Some(b"value7".to_vec())),
3214			(b"key5".to_vec(), Some(b"value2".to_vec())),
3215			(b"key3".to_vec(), None),
3216		]);
3217		test_basic(&[
3218			(b"key1".to_vec(), Some(b"value5".to_vec())),
3219			(b"key4".to_vec(), Some(b"value3".to_vec())),
3220			(b"key5".to_vec(), Some(b"value4".to_vec())),
3221			(b"key6".to_vec(), Some(b"value4".to_vec())),
3222			(b"key7".to_vec(), Some(b"value2".to_vec())),
3223			(b"key8".to_vec(), Some(b"value1".to_vec())),
3224			(b"key5".to_vec(), None),
3225		]);
3226		test_basic(&[
3227			(b"key1".to_vec(), Some(b"value5".to_vec())),
3228			(b"key4".to_vec(), Some(b"value3".to_vec())),
3229			(b"key5".to_vec(), Some(b"value4".to_vec())),
3230			(b"key7".to_vec(), Some(b"value2".to_vec())),
3231			(b"key8".to_vec(), Some(b"value1".to_vec())),
3232			(b"key3".to_vec(), None),
3233		]);
3234		test_basic(&[
3235			(b"key5".to_vec(), Some(b"value5".to_vec())),
3236			(b"key3".to_vec(), Some(b"value3".to_vec())),
3237			(b"key4".to_vec(), Some(b"value4".to_vec())),
3238			(b"key2".to_vec(), Some(b"value2".to_vec())),
3239			(b"key1".to_vec(), Some(b"value1".to_vec())),
3240			(b"key5".to_vec(), None),
3241			(b"key3".to_vec(), None),
3242		]);
3243		test_basic(&[
3244			(b"key5".to_vec(), Some(b"value5".to_vec())),
3245			(b"key3".to_vec(), Some(b"value3".to_vec())),
3246			(b"key4".to_vec(), Some(b"value4".to_vec())),
3247			(b"key2".to_vec(), Some(b"value2".to_vec())),
3248			(b"key1".to_vec(), Some(b"value1".to_vec())),
3249			(b"key5".to_vec(), None),
3250			(b"key3".to_vec(), None),
3251			(b"key2".to_vec(), None),
3252			(b"key4".to_vec(), None),
3253		]);
3254		test_basic(&[
3255			(b"key5".to_vec(), Some(b"value5".to_vec())),
3256			(b"key3".to_vec(), Some(b"value3".to_vec())),
3257			(b"key4".to_vec(), Some(b"value4".to_vec())),
3258			(b"key2".to_vec(), Some(b"value2".to_vec())),
3259			(b"key1".to_vec(), Some(b"value1".to_vec())),
3260			(b"key5".to_vec(), None),
3261			(b"key3".to_vec(), None),
3262			(b"key2".to_vec(), None),
3263			(b"key4".to_vec(), None),
3264			(b"key1".to_vec(), None),
3265		]);
3266		test_basic(&[
3267			([5u8; 250].to_vec(), Some(b"value5".to_vec())),
3268			([5u8; 200].to_vec(), Some(b"value3".to_vec())),
3269			([5u8; 100].to_vec(), Some(b"value4".to_vec())),
3270			([5u8; 150].to_vec(), Some(b"value2".to_vec())),
3271			([5u8; 101].to_vec(), Some(b"value1".to_vec())),
3272			([5u8; 250].to_vec(), None),
3273			([5u8; 101].to_vec(), None),
3274		]);
3275	}
3276
3277	#[test]
3278	fn test_btree_iter() {
3279		let col_nb = 0;
3280		let mut data_start = Vec::new();
3281		for i in 0u8..100 {
3282			let mut key = b"key0".to_vec();
3283			key[3] = i;
3284			let mut value = b"val0".to_vec();
3285			value[3] = i;
3286			data_start.push((col_nb, key, Some(value)));
3287		}
3288		let mut data_change = Vec::new();
3289		for i in 0u8..100 {
3290			let mut key = b"key0".to_vec();
3291			if i % 2 == 0 {
3292				key[2] = i;
3293				let mut value = b"val0".to_vec();
3294				value[2] = i;
3295				data_change.push((col_nb, key, Some(value)));
3296			} else if i % 3 == 0 {
3297				key[3] = i;
3298				data_change.push((col_nb, key, None));
3299			} else {
3300				key[3] = i;
3301				let mut value = b"val0".to_vec();
3302				value[2] = i;
3303				data_change.push((col_nb, key, Some(value)));
3304			}
3305		}
3306
3307		let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
3308			data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
3309		let mut end_state = start_state.clone();
3310		for (_c, k, v) in data_change.iter() {
3311			if let Some(v) = v {
3312				end_state.insert(k.clone(), v.clone());
3313			} else {
3314				end_state.remove(k);
3315			}
3316		}
3317
3318		for stage in [
3319			EnableCommitPipelineStages::CommitOverlay,
3320			EnableCommitPipelineStages::LogOverlay,
3321			EnableCommitPipelineStages::DbFile,
3322			EnableCommitPipelineStages::Standard,
3323		] {
3324			for i in 0..10 {
3325				test_btree_iter_inner(
3326					stage,
3327					&data_start,
3328					&data_change,
3329					&start_state,
3330					&end_state,
3331					i * 5,
3332				);
3333			}
3334			let data_start = vec![
3335				(0, b"key1".to_vec(), Some(b"val1".to_vec())),
3336				(0, b"key3".to_vec(), Some(b"val3".to_vec())),
3337			];
3338			let data_change = vec![(0, b"key2".to_vec(), Some(b"val2".to_vec()))];
3339			let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
3340				data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
3341			let mut end_state = start_state.clone();
3342			for (_c, k, v) in data_change.iter() {
3343				if let Some(v) = v {
3344					end_state.insert(k.clone(), v.clone());
3345				} else {
3346					end_state.remove(k);
3347				}
3348			}
3349			test_btree_iter_inner(stage, &data_start, &data_change, &start_state, &end_state, 1);
3350		}
3351	}
	// Drives a btree iterator through `commit_at` elements of the initial data,
	// applies a second commit while the iterator is mid-stream, and checks that
	// continued iteration matches the expected post-commit state. Also checks a
	// full reverse traversal with a fresh iterator.
	fn test_btree_iter_inner(
		db_test: EnableCommitPipelineStages,
		data_start: &[(u8, Vec<u8>, Option<Value>)],
		data_change: &[(u8, Vec<u8>, Option<Value>)],
		start_state: &BTreeMap<Vec<u8>, Vec<u8>>,
		end_state: &BTreeMap<Vec<u8>, Vec<u8>>,
		commit_at: usize,
	) {
		let tmp = tempdir().unwrap();
		let mut options = db_test.options(tmp.path(), 5);
		let col_nb = 0;
		options.columns[col_nb as usize].btree_index = true;
		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		db.commit(data_start.iter().cloned()).unwrap();
		db_test.run_stages(&db);

		// Walk `commit_at` elements of the initial state, remembering the last
		// key yielded so iteration can be validated across the commit below.
		let mut iter = db.iter(col_nb).unwrap();
		let mut iter_state = start_state.iter();
		let mut last_key = Value::new();
		for _ in 0..commit_at {
			let next = iter.next().unwrap();
			if let Some((k, _)) = next.as_ref() {
				last_key = k.clone();
			}
			assert_eq!(iter_state.next(), next.as_ref().map(|(k, v)| (k, v)));
		}

		// Apply the change set while the iterator is mid-stream.
		db.commit(data_change.iter().cloned()).unwrap();
		db_test.run_stages(&db);

		// Continue iterating: results must match the post-commit state from
		// `last_key` onwards, skipping `last_key` itself if still present since
		// the iterator has already yielded it.
		let mut iter_state = end_state.range(last_key.clone()..);
		for _ in commit_at..100 {
			let mut state_next = iter_state.next();
			if let Some((k, _v)) = state_next.as_ref() {
				if *k == &last_key {
					state_next = iter_state.next();
				}
			}
			let iter_next = iter.next().unwrap();
			assert_eq!(state_next, iter_next.as_ref().map(|(k, v)| (k, v)));
		}
		// A fresh iterator walked backwards from the end must produce the full
		// post-commit state in reverse order.
		let mut iter_state_rev = end_state.iter().rev();
		let mut iter = db.iter(col_nb).unwrap();
		iter.seek_to_last().unwrap();
		for _ in 0..100 {
			let next = iter.prev().unwrap();
			assert_eq!(iter_state_rev.next(), next.as_ref().map(|(k, v)| (k, v)));
		}
	}
3402
	// Injects an I/O failure while enacting a flushed log record and checks that
	// the record is replayed successfully when the database is reopened.
	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_recover_from_log_on_error() {
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.always_flush = true;
		options.with_background_thread = false;

		// We do 2 commits and we fail while enacting the second one
		{
			let db = Db::open_or_create(&options).unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			// Allow only a few more I/O operations so enacting the flushed log
			// fails part-way through.
			crate::set_number_of_allowed_io_operations(4);

			// Set the background error explicitly as background threads are disabled in tests.
			let err = db.enact_logs();
			assert!(err.is_err());
			db.inner.store_err(err);
			// Lift the I/O limit again before dropping and reopening the database.
			crate::set_number_of_allowed_io_operations(usize::MAX);
		}

		// Open the databases and check that both values are there.
		{
			let db = Db::open(&options).unwrap();
			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
			assert_eq!(db.get(0, &[1]).unwrap(), Some(vec![1]));
		}
	}
3437
	// Injects an I/O failure while writing a commit's log record, leaving a
	// partial record on disk, and checks that reopening (twice) still recovers
	// the previously committed value.
	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_partial_log_recovery() {
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.columns[0].btree_index = true;
		options.always_flush = true;
		options.with_background_thread = false;

		// We do 2 commits and we fail while writing the second one
		{
			let db = Db::open_or_create(&options).unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
			db.process_commits().unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
			// Limit I/O so writing the second commit's log record fails mid-way.
			crate::set_number_of_allowed_io_operations(4);
			assert!(db.process_commits().is_err());
			crate::set_number_of_allowed_io_operations(usize::MAX);
			db.flush_logs().unwrap();
		}

		// We open a first time, the first value is there
		{
			let db = Db::open(&options).unwrap();
			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
		}

		// We open a second time, the first value should be still there
		{
			let db = Db::open(&options).unwrap();
			assert!(db.get(0, &[0]).unwrap().is_some());
		}
	}
3471
	// Simulates a reindex that was interrupted mid-way (by restoring the old index
	// file after the reindex ran) and checks that reopening resumes it: all
	// entries stay reachable and the index ends up at the larger size.
	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_continue_reindex() {
		let _ = env_logger::try_init();
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.columns[0].preimage = true;
		options.columns[0].uniform = true;
		options.always_flush = true;
		options.with_background_thread = false;
		options.salt = Some(Default::default());

		{
			// Force a reindex by committing more than 64 values with the same 16 bit prefix
			let db = Db::open_or_create(&options).unwrap();
			let commit: Vec<_> = (0..65u32)
				.map(|index| {
					let mut key = [0u8; 32];
					key[2] = (index as u8) << 1;
					(0, key.to_vec(), Some(vec![index as u8]))
				})
				.collect();
			db.commit(commit).unwrap();

			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			// i16 now contains 64 values and i17 contains a single value that did not fit

			// Simulate interrupted reindex by processing it first and then restoring the old index
			// file. Make a copy of the index file first.
			std::fs::copy(tmp.path().join("index_00_16"), tmp.path().join("index_00_16.bak"))
				.unwrap();
			db.process_reindex().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			db.clean_logs().unwrap();
			std::fs::rename(tmp.path().join("index_00_16.bak"), tmp.path().join("index_00_16"))
				.unwrap();
		}

		// Reopen the database which should load the reindex.
		{
			let db = Db::open(&options).unwrap();
			db.process_reindex().unwrap();
			// Every committed entry must still be reachable after the resumed reindex.
			let mut entries = 0;
			db.iter_column_while(0, |_| {
				entries += 1;
				true
			})
			.unwrap();

			assert_eq!(entries, 65);
			// The index must have grown from 16 to 17 bits.
			assert_eq!(db.inner.columns[0].index_bits(), Some(17));
		}
	}
3528
	// Checks `Db::reset_column`: resetting drops all of a column's contents, the
	// column accepts new writes afterwards, and resetting with new options (here
	// enabling the btree index) changes the column's capabilities.
	#[test]
	fn test_remove_column() {
		let tmp = tempdir().unwrap();
		let db_test_file = EnableCommitPipelineStages::DbFile;
		let mut options_db_files = db_test_file.options(tmp.path(), 2);
		// Pin the salt so both option sets address the same on-disk index.
		options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
		let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
		options_std.salt = options_db_files.salt.clone();

		let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();

		// Populate column 1 and flush everything to the db files.
		let payload: Vec<(u8, _, _)> = (0u16..100)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();

		db_test_file.run_stages(&db);
		drop(db);

		// Sanity check: the data is readable through a standard reopen.
		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
		for (col, key, value) in payload.iter() {
			assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
		}
		drop(db);
		// Resetting the column must drop all of its contents.
		Db::reset_column(&mut options_db_files, 1, None).unwrap();

		let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
		for (col, key, _value) in payload.iter() {
			assert_eq!(db.get(*col, key).unwrap(), None);
		}

		// The reset column accepts new writes.
		let payload: Vec<(u8, _, _)> = (0u16..10)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();

		db_test_file.run_stages(&db);
		drop(db);

		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
		let payload: Vec<(u8, _, _)> = (10u16..100)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();
		// Column 1 is not btree-indexed, so ordered iteration is unavailable.
		assert!(db.iter(1).is_err());

		drop(db);

		// Reset the column again, this time re-creating it with a btree index.
		let mut col_option = options_std.columns[1].clone();
		col_option.btree_index = true;
		Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();

		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
		let payload: Vec<(u8, _, _)> = (0u16..10)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();
		// With the btree index enabled, iteration now succeeds.
		assert!(db.iter(1).is_ok());
	}
3592}