Skip to main content

parity_db/
db.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
//! The database object is split into `Db` and `DbInner`.
5//! `Db` creates shared `DbInner` instance and manages background
6//! worker threads that all use the inner object.
7//!
8//! There are 4 worker threads:
9//! log_worker: Processes commit queue and reindexing. For each commit
10//! in the queue, log worker creates a write-ahead record using `Log`.
//! Additionally, if there is an active reindexing, it creates log records
12//! for batches of relocated index entries.
13//! flush_worker: Flushes log records to disk by calling `fsync` on the
14//! log files.
15//! commit_worker: Reads flushed log records and applies operations to the
16//! index and value tables.
//! cleanup_worker: Flushes tables by calling `fsync`, and cleans up the log.
//! Each background worker is signalled with a condition variable once
19//! there is some work to be done.
20
21use crate::{
22	btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
23	column::{
24		hash_key, unpack_node_children, unpack_node_data, ColId, Column, HashColumn, IterState,
25		ReindexBatch, ValueIterState,
26	},
27	error::{try_io, Error, Result},
28	hash::IdentityBuildHasher,
29	index::{Address, PlanOutcome},
30	log::{Log, LogAction},
31	multitree::{Children, NewNode, NodeAddress},
32	options::{Options, CURRENT_VERSION},
33	parking_lot::{
34		Condvar, Mutex, MutexGuard, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard,
35	},
36	stats::StatSummary,
37	ColumnOptions, Key,
38};
39#[cfg(feature = "bytes")]
40use bytes::Bytes;
41use fs2::FileExt;
42use std::{
43	borrow::Borrow,
44	collections::{BTreeMap, HashMap, HashSet, VecDeque},
45	ops::Bound,
46	sync::{
47		atomic::{AtomicBool, AtomicU64, Ordering},
48		Arc, Weak,
49	},
50	thread,
51};
52
// Max size of the commit queue in bytes (keys + values). If the queue is
// full, `commit` will block.
// The queue lives in memory, so the size is tracked as `usize`.
const MAX_COMMIT_QUEUE_BYTES: usize = 16 * 1024 * 1024;
// Max size of the log overlay in bytes. If the overlay is full, processing
// of the commit queue is blocked.
const MAX_LOG_QUEUE_BYTES: i64 = 128 * 1024 * 1024;
// Minimum size of a log file before it is considered full.
const MIN_LOG_SIZE_BYTES: u64 = 64 * 1024 * 1024;
// Number of log files to keep after flush when sync mode is disabled. Gives the database some
// chance to recover in case of a crash.
const KEEP_LOGS: usize = 16;
// Hard limit on the number of log files in sync mode. The number of logs may grow while existing
// logs are waiting on fsync. Commits will be throttled if the total number of log files exceeds
// this number.
const MAX_LOG_FILES: usize = 4;
69
70/// Value is just a vector of bytes. Value sizes up to 4Gb are allowed.
71pub type Value = Vec<u8>;
72
73#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
74pub enum RcValue {
75	#[cfg(feature = "arc")]
76	Arc(Arc<Value>),
77	#[cfg(feature = "bytes")]
78	Bytes(Bytes),
79}
80
81impl AsRef<[u8]> for RcValue {
82	fn as_ref(&self) -> &[u8] {
83		match self {
84			#[cfg(feature = "arc")]
85			Self::Arc(arc) => arc.as_ref(),
86			#[cfg(feature = "bytes")]
87			Self::Bytes(bytes) => bytes.as_ref(),
88		}
89	}
90}
91
92impl Borrow<[u8]> for RcValue {
93	fn borrow(&self) -> &[u8] {
94		self.as_ref()
95	}
96}
97
98#[cfg(feature = "arc")]
99impl From<Value> for RcValue {
100	fn from(value: Value) -> Self {
101		Self::Arc(value.into())
102	}
103}
104
105#[cfg(not(feature = "arc"))]
106impl From<Value> for RcValue {
107	fn from(value: Value) -> Self {
108		Self::Bytes(value.into())
109	}
110}
111
112#[cfg(feature = "arc")]
113impl From<Arc<Value>> for RcValue {
114	fn from(value: Arc<Value>) -> Self {
115		Self::Arc(value)
116	}
117}
118
119#[cfg(feature = "bytes")]
120impl From<Bytes> for RcValue {
121	fn from(value: Bytes) -> Self {
122		Self::Bytes(value)
123	}
124}
125
/// Test-only conversion of a value into a fixed-size byte array.
///
/// The bytes are copied into a `Vec<u8>` first; the conversion then follows the standard
/// `TryFrom<Vec<u8>> for [u8; N]` rules and shares its error type.
#[cfg(test)]
impl<const N: usize> TryFrom<RcValue> for [u8; N] {
	type Error = <[u8; N] as TryFrom<Vec<u8>>>::Error;

	fn try_from(value: RcValue) -> std::result::Result<Self, Self::Error> {
		let bytes: Vec<u8> = value.as_ref().to_vec();
		<Self as TryFrom<Vec<u8>>>::try_from(bytes)
	}
}
134
/// Key bytes held behind an `Arc`, so clones share the same allocation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RcKey(Arc<Vec<u8>>);

impl AsRef<[u8]> for RcKey {
	/// Exposes the key bytes as a slice.
	fn as_ref(&self) -> &[u8] {
		self.0.as_slice()
	}
}

impl Borrow<[u8]> for RcKey {
	/// Same view as `AsRef<[u8]>`.
	fn borrow(&self) -> &[u8] {
		<Self as AsRef<[u8]>>::as_ref(self)
	}
}

impl From<Vec<u8>> for RcKey {
	/// Takes ownership of `key` and wraps it in an `Arc`.
	fn from(key: Vec<u8>) -> Self {
		Self(Arc::new(key))
	}
}
155
// Commit data passed to `commit`. One instance is queued per call and later
// turned into a log record by `process_commits`.
#[derive(Debug, Default)]
struct Commit {
	// Commit ID. This is not the same as the log record id, as some records
	// originate within the DB itself, e.g. reindex.
	id: u64,
	// Size of user data pending insertion (keys + values) or
	// removal (keys).
	bytes: usize,
	// Operations.
	changeset: CommitChangeSet,
}
168
// Pending commits. `commit_raw` blocks once when the queue already holds more than
// `MAX_COMMIT_QUEUE_BYTES` bytes.
#[derive(Debug, Default)]
struct CommitQueue {
	// Id handed to the most recently queued commit. Incremented for every newly
	// queued commit (and for deferred commits that need a fresh id).
	record_id: u64,
	// Total size of all commits in the queue.
	bytes: usize,
	// FIFO queue.
	commits: VecDeque<Commit>,
}
179
// Per-column bookkeeping for multitree columns.
#[derive(Debug)]
struct Trees {
	// Weak handles to the tree readers handed out by `get_tree`, keyed by the hashed root key.
	readers: HashMap<Key, Weak<RwLock<Box<dyn TreeReader + Send + Sync>>>, IdentityBuildHasher>,
	/// Number of queued dereferences for each tree
	to_dereference: HashMap<Key, usize>,
}
186
// Shared database state used by the public handle and the background workers.
#[derive(Debug)]
struct DbInner {
	// Opened columns, indexed by `ColId`.
	columns: Vec<Column>,
	// Options the database was opened with. `salt` is filled in from the metadata
	// when it was not provided.
	options: Options,
	shutdown: AtomicBool,
	// Write-ahead log.
	log: Log,
	// Commits waiting to be written to the log.
	commit_queue: Mutex<CommitQueue>,
	// Notified when the commit queue drops back under `MAX_COMMIT_QUEUE_BYTES`.
	commit_queue_full_cv: Condvar,
	// Signalled whenever a commit is queued.
	log_worker_wait: WaitCondvar<bool>,
	commit_worker_wait: Arc<WaitCondvar<bool>>,
	// Overlay of most recent values in the commit queue.
	commit_overlay: RwLock<Vec<CommitOverlay>>,
	// Tree reader bookkeeping for multitree columns, per column.
	trees: RwLock<HashMap<ColId, Trees>>,
	// Number of bytes written to the log that are still pending.
	// This may underflow occasionally, but is bound for 0 eventually.
	log_queue_wait: WaitCondvar<i64>,
	// Signalled after a log record has been written.
	flush_worker_wait: Arc<WaitCondvar<bool>>,
	cleanup_worker_wait: WaitCondvar<bool>,
	cleanup_queue_wait: WaitCondvar<bool>,
	iteration_lock: Mutex<()>,
	// Initialised from the log's replay record id when the database is opened.
	last_enacted: AtomicU64,
	// Record id at which the next reindex is scheduled (see `start_reindex`).
	next_reindex: AtomicU64,
	// When set, `commit_raw` rejects new commits with `Error::Background`.
	// NOTE(review): presumably set by the background workers on failure; confirm.
	bg_err: Mutex<Option<Arc<Error>>>,
	// Version read from the database metadata.
	db_version: u32,
	// Handle to the exclusively locked `lock` file inside the database directory.
	lock_file: std::fs::File,
}
212
// A condition variable paired with the mutex-protected state it is used with.
#[derive(Debug)]
struct WaitCondvar<S> {
	// Used to wake up threads waiting on `work`.
	cv: Condvar,
	// State protected by the mutex; for `WaitCondvar<bool>` it is the "work pending" flag.
	work: Mutex<S>,
}
218
219impl<S: Default> WaitCondvar<S> {
220	fn new() -> Self {
221		WaitCondvar { cv: Condvar::new(), work: Mutex::new(S::default()) }
222	}
223}
224
225impl WaitCondvar<bool> {
226	fn signal(&self) {
227		let mut work = self.work.lock();
228		*work = true;
229		self.cv.notify_one();
230	}
231
232	pub fn wait(&self) {
233		let mut work = self.work.lock();
234		while !*work {
235			self.cv.wait(&mut work)
236		}
237		*work = false;
238	}
239}
240
241impl DbInner {
242	fn open(options: &Options, opening_mode: OpeningMode) -> Result<DbInner> {
243		if opening_mode == OpeningMode::Create {
244			try_io!(std::fs::create_dir_all(&options.path));
245		} else if !options.path.is_dir() {
246			return Err(Error::DatabaseNotFound)
247		}
248
249		let mut lock_path: std::path::PathBuf = options.path.clone();
250		lock_path.push("lock");
251		let lock_file = try_io!(std::fs::OpenOptions::new()
252			.create(true)
253			.read(true)
254			.write(true)
255			.open(lock_path.as_path()));
256		lock_file.try_lock_exclusive().map_err(Error::Locked)?;
257
258		let metadata = options.load_and_validate_metadata(opening_mode == OpeningMode::Create)?;
259		let mut columns = Vec::with_capacity(metadata.columns.len());
260		let mut commit_overlay = Vec::with_capacity(metadata.columns.len());
261		let log = Log::open(options)?;
262		let last_enacted = log.replay_record_id().unwrap_or(2) - 1;
263		for c in 0..metadata.columns.len() {
264			let column = Column::open(c as ColId, options, &metadata)?;
265			commit_overlay.push(CommitOverlay::new());
266			columns.push(column);
267		}
268		log::debug!(target: "parity-db", "Opened db {:?}, metadata={:?}", options, metadata);
269		let mut options = options.clone();
270		if options.salt.is_none() {
271			options.salt = Some(metadata.salt);
272		}
273
274		Ok(DbInner {
275			columns,
276			options,
277			shutdown: AtomicBool::new(false),
278			log,
279			commit_queue: Mutex::new(Default::default()),
280			commit_queue_full_cv: Condvar::new(),
281			log_worker_wait: WaitCondvar::new(),
282			commit_worker_wait: Arc::new(WaitCondvar::new()),
283			commit_overlay: RwLock::new(commit_overlay),
284			trees: RwLock::new(Default::default()),
285			log_queue_wait: WaitCondvar::new(),
286			flush_worker_wait: Arc::new(WaitCondvar::new()),
287			cleanup_worker_wait: WaitCondvar::new(),
288			cleanup_queue_wait: WaitCondvar::new(),
289			iteration_lock: Mutex::new(()),
290			next_reindex: AtomicU64::new(1),
291			last_enacted: AtomicU64::new(last_enacted),
292			bg_err: Mutex::new(None),
293			db_version: metadata.version,
294			lock_file,
295		})
296	}
297
298	fn init_table_data(&mut self) -> Result<()> {
299		for column in &mut self.columns {
300			column.init_table_data()?;
301		}
302		Ok(())
303	}
304
305	fn get(&self, col: ColId, key: &[u8], external_call: bool) -> Result<Option<Value>> {
306		if self.options.columns[col as usize].multitree && external_call {
307			return Err(Error::InvalidConfiguration(
308				"get not supported for multitree columns.".to_string(),
309			))
310		}
311		match &self.columns[col as usize] {
312			Column::Hash(column) => {
313				let key = column.hash_key(key);
314				let overlay = self.commit_overlay.read();
315				// Check commit overlay first
316				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
317					return Ok(v.map(|i| i.as_ref().to_vec()))
318				}
319				std::mem::drop(overlay);
320				// Go into tables and log overlay.
321				let log = self.log.overlays();
322				Ok(column.get(&key, log)?.map(|(v, _rc)| v))
323			},
324			Column::Tree(column) => {
325				let overlay = self.commit_overlay.read();
326				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
327					return Ok(l.map(|i| i.as_ref().to_vec()))
328				}
329				std::mem::drop(overlay);
330				// We lock log, if btree structure changed while reading that would be an issue.
331				let log = self.log.overlays().read();
332				column.with_locked(|btree| BTreeTable::get(key, &*log, btree))
333			},
334		}
335	}
336
337	fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
338		if self.options.columns[col as usize].multitree {
339			return Err(Error::InvalidConfiguration(
340				"get_size not supported for multitree columns.".to_string(),
341			))
342		}
343		match &self.columns[col as usize] {
344			Column::Hash(column) => {
345				let key = column.hash_key(key);
346				let overlay = self.commit_overlay.read();
347				// Check commit overlay first
348				if let Some(l) = overlay.get(col as usize).and_then(|o| o.get_size(&key)) {
349					return Ok(l)
350				}
351				// Go into tables and log overlay.
352				let log = self.log.overlays();
353				column.get_size(&key, log)
354			},
355			Column::Tree(column) => {
356				let overlay = self.commit_overlay.read();
357				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
358					return Ok(l.map(|v| v.as_ref().len() as u32))
359				}
360				let log = self.log.overlays().read();
361				let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
362				Ok(l.map(|v| v.len() as u32))
363			},
364		}
365	}
366
367	fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
368		if !self.options.columns[col as usize].multitree {
369			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
370		}
371		if !self.options.columns[col as usize].append_only &&
372			!self.options.columns[col as usize].allow_direct_node_access
373		{
374			return Err(Error::InvalidConfiguration(
375				"get_root can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
376			))
377		}
378		let value = self.get(col, key, false)?;
379		if let Some(data) = value {
380			return Ok(Some(unpack_node_data(data)?))
381		}
382		Ok(None)
383	}
384
385	fn get_node(
386		&self,
387		col: ColId,
388		node_address: NodeAddress,
389		external_call: bool,
390	) -> Result<Option<(Vec<u8>, Children)>> {
391		if !self.options.columns[col as usize].multitree {
392			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
393		}
394		if !self.options.columns[col as usize].append_only &&
395			!self.options.columns[col as usize].allow_direct_node_access &&
396			external_call
397		{
398			return Err(Error::InvalidConfiguration(
399				"get_node can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
400			))
401		}
402		match &self.columns[col as usize] {
403			Column::Hash(column) => {
404				let overlay = self.commit_overlay.read();
405				// Check commit overlay first
406				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
407				{
408					return Ok(Some(unpack_node_data(v.as_ref().to_vec())?))
409				}
410				let log = self.log.overlays();
411				let value = column.get_value(Address::from_u64(node_address), log)?;
412				if let Some(data) = value {
413					return Ok(Some(unpack_node_data(data)?))
414				}
415				Ok(None)
416			},
417			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
418		}
419	}
420
421	fn get_node_children(
422		&self,
423		col: ColId,
424		node_address: NodeAddress,
425		external_call: bool,
426	) -> Result<Option<Children>> {
427		if !self.options.columns[col as usize].multitree {
428			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
429		}
430		if !self.options.columns[col as usize].append_only &&
431			!self.options.columns[col as usize].allow_direct_node_access &&
432			external_call
433		{
434			return Err(Error::InvalidConfiguration(
435				"get_node_children can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
436			))
437		}
438		match &self.columns[col as usize] {
439			Column::Hash(column) => {
440				let overlay = self.commit_overlay.read();
441				// Check commit overlay first
442				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
443				{
444					return Ok(Some(unpack_node_children(v.as_ref())?))
445				}
446				let log = self.log.overlays();
447				let value = column.get_value(Address::from_u64(node_address), log)?;
448				if let Some(data) = value {
449					return Ok(Some(unpack_node_children(&data)?))
450				}
451				Ok(None)
452			},
453			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
454		}
455	}
456
457	fn get_tree(
458		&self,
459		db: &Arc<DbInner>,
460		col: ColId,
461		key: &[u8],
462		check_existence: bool,
463	) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
464		if !self.options.columns[col as usize].multitree {
465			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
466		}
467		match &self.columns[col as usize] {
468			Column::Hash(column) => {
469				// Check if the tree actually exists. We can't return the data from this function as
470				// TreeReader is not locked. That is done by the client.
471				if check_existence {
472					let root = self.get(col, key, false).unwrap();
473					if root.is_none() {
474						return Ok(None)
475					}
476				}
477
478				let hash_key = column.hash_key(key);
479
480				let trees = self.trees.upgradable_read();
481
482				if let Some(column_trees) = trees.get(&col) {
483					if let Some(reader) = column_trees.readers.get(&hash_key) {
484						let reader = reader.upgrade();
485						if let Some(reader) = reader {
486							return Ok(Some(reader))
487						}
488					}
489				}
490
491				let mut trees = RwLockUpgradableReadGuard::upgrade(trees);
492
493				let column_trees = trees.entry(col).or_insert_with(|| Trees {
494					readers: Default::default(),
495					to_dereference: Default::default(),
496				});
497
498				let reader: Box<dyn TreeReader + Send + Sync> =
499					Box::new(DbTreeReader { db: db.clone(), col, key: hash_key });
500				let reader = Arc::new(RwLock::new(reader));
501
502				column_trees.readers.insert(hash_key, Arc::downgrade(&reader));
503
504				Ok(Some(reader))
505			},
506			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
507		}
508	}
509
510	fn btree_iter(&self, col: ColId) -> Result<BTreeIterator<'_>> {
511		match &self.columns[col as usize] {
512			Column::Hash(_column) =>
513				Err(Error::InvalidConfiguration("Not an indexed column.".to_string())),
514			Column::Tree(column) => {
515				let log = self.log.overlays();
516				BTreeIterator::new(column, col, log, &self.commit_overlay)
517			},
518		}
519	}
520
521	// Commit simply adds the data to the queue and to the overlay and
522	// exits as early as possible.
523	fn commit<I, K>(&self, tx: I) -> Result<()>
524	where
525		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
526		K: AsRef<[u8]>,
527	{
528		self.commit_changes(tx.into_iter().map(|(c, k, v)| {
529			(
530				c,
531				match v {
532					Some(v) => Operation::Set(k.as_ref().to_vec(), v),
533					None => Operation::Dereference(k.as_ref().to_vec()),
534				},
535			)
536		}))
537	}
538
	// Builds a `CommitChangeSet` out of the given operations and queues it with `commit_raw`.
	//
	// Operations are routed by column kind:
	// - btree-indexed columns collect them in a `BTreeChangeSet`;
	// - multitree columns only accept the tree operations (`InsertTree`, `ReferenceTree`,
	//   `DereferenceTree`) and fail with `Error::InvalidConfiguration` for anything else;
	// - all other columns collect them in an `IndexedChangeSet`.
	//
	// Returns the first error encountered; nothing is queued in that case.
	//
	// NOTE(review): `DereferenceTree` bumps `to_dereference` before the commit is queued. If a
	// later operation or `commit_raw` fails, the counter appears to stay incremented. Confirm
	// whether that is intended.
	fn commit_changes<I, V>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, V>)>,
		V: Into<RcValue>,
	{
		let mut commit: CommitChangeSet = Default::default();
		for (col, change) in tx.into_iter() {
			if self.options.columns[col as usize].btree_index {
				commit
					.btree_indexed
					.entry(col)
					.or_insert_with(|| BTreeChangeSet::new(col))
					.push(change)?
			} else if self.options.columns[col as usize].multitree {
				match &self.columns[col as usize] {
					Column::Hash(column) =>
						match change {
							// Plain key-value operations are not allowed on multitree columns.
							Operation::Set(..) |
							Operation::Reference(..) |
							Operation::Dereference(..) =>
								return Err(Error::InvalidConfiguration(
									"Invalid operation for multitree column".to_string(),
								)),
							Operation::InsertTree(..) => {
								let (root_data, node_values) = column.claim_tree_values(&change)?;

								// Record every tree that has queued dereferences and whose reader
								// is currently locked. `process_commits` defers a dereference
								// while a later queued commit lists its tree in `used_trees`.
								let trees = self.trees.read();
								if let Some(column_trees) = trees.get(&col) {
									for (hash, count) in &column_trees.to_dereference {
										assert!(*count > 0);

										// Check if TreeReader is active for this tree
										let mut tree_active = false;
										if let Some(reader) = column_trees.readers.get(hash) {
											let reader = reader.upgrade();
											if let Some(reader) = reader {
												if reader.is_locked() {
													tree_active = true;
												}
											}
										}
										if tree_active {
											commit
												.indexed
												.entry(col)
												.or_insert_with(|| IndexedChangeSet::new(col))
												.used_trees
												.insert(*hash);
										}
									}
								}
								drop(trees);

								// The root is stored as a regular `Set` under the tree key.
								let root_operation = Operation::Set(change.key(), root_data);
								commit
									.indexed
									.entry(col)
									.or_insert_with(|| IndexedChangeSet::new(col))
									.push(root_operation, &self.options, self.db_version)?;

								// The remaining nodes are queued as node changes.
								for node_change in node_values {
									commit
										.indexed
										.entry(col)
										.or_insert_with(|| IndexedChangeSet::new(col))
										.push_node_change(node_change);
								}
							},
							Operation::ReferenceTree(..) => {
								// Nothing is queued for append_only columns.
								if !self.options.columns[col as usize].append_only {
									let root_operation =
										Operation::<&_, V>::Reference(change.key());
									commit
										.indexed
										.entry(col)
										.or_insert_with(|| IndexedChangeSet::new(col))
										.push(root_operation, &self.options, self.db_version)?;
								}
							},
							Operation::DereferenceTree(key) => {
								if self.options.columns[col as usize].append_only {
									return Err(Error::InvalidConfiguration("Attempting to dereference a tree from an append_only column.".to_string()))
								}
								// The root has to exist; its children are needed for the
								// dereference.
								let value = self.get(col, &key, false)?;
								if let Some(data) = value {
									let root_data = unpack_node_data(data)?;
									let children = root_data.1;
									let salt = self.options.salt.unwrap_or_default();
									let hash = hash_key(
										&key,
										&salt,
										self.options.columns[col as usize].uniform,
										self.db_version,
									);

									let mut trees = self.trees.write();

									let column_trees = trees.entry(col).or_insert_with(|| Trees {
										readers: Default::default(),
										to_dereference: Default::default(),
									});
									// Bump the number of queued dereferences for this tree.
									let count =
										column_trees.to_dereference.get(&hash).unwrap_or(&0) + 1;
									column_trees.to_dereference.insert(hash, count);

									drop(trees);

									// Makes `process_commits` check whether this commit has to be
									// deferred.
									commit.check_for_deferral = true;

									let node_change =
										NodeChange::DereferenceChildren(key, hash, children);

									commit
										.indexed
										.entry(col)
										.or_insert_with(|| IndexedChangeSet::new(col))
										.push_node_change(node_change);
								} else {
									return Err(Error::InvalidConfiguration(
										"No entry for tree root".to_string(),
									))
								}
							},
						},
					Column::Tree(_) =>
						return Err(Error::InvalidConfiguration("Not a HashColumn".to_string())),
				}
			} else {
				commit.indexed.entry(col).or_insert_with(|| IndexedChangeSet::new(col)).push(
					change,
					&self.options,
					self.db_version,
				)?
			}
		}

		self.commit_raw(commit)
	}
677
678	fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
679		let mut queue = self.commit_queue.lock();
680
681		#[cfg(any(test, feature = "instrumentation"))]
682		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
683		#[cfg(not(any(test, feature = "instrumentation")))]
684		let might_wait_because_the_queue_is_full = true;
685		if might_wait_because_the_queue_is_full && queue.bytes > MAX_COMMIT_QUEUE_BYTES {
686			log::debug!(target: "parity-db", "Waiting, queue size={}", queue.bytes);
687			self.commit_queue_full_cv.wait(&mut queue);
688		}
689
690		{
691			let bg_err = self.bg_err.lock();
692			if let Some(err) = &*bg_err {
693				return Err(Error::Background(err.clone()))
694			}
695		}
696
697		let mut overlay = self.commit_overlay.write();
698
699		queue.record_id += 1;
700		let record_id = queue.record_id;
701
702		let mut bytes = 0;
703		for (c, indexed) in &commit.indexed {
704			indexed.copy_to_overlay(
705				&mut overlay[*c as usize],
706				record_id,
707				&mut bytes,
708				&self.options,
709			)?;
710		}
711
712		for (c, iterset) in &commit.btree_indexed {
713			iterset.copy_to_overlay(
714				&mut overlay[*c as usize].btree_indexed,
715				record_id,
716				&mut bytes,
717				&self.options,
718			)?;
719		}
720
721		let commit = Commit { id: record_id, changeset: commit, bytes };
722
723		log::debug!(
724			target: "parity-db",
725			"Queued commit {}, {} bytes",
726			commit.id,
727			bytes,
728		);
729		queue.commits.push_back(commit);
730		queue.bytes += bytes;
731		self.log_worker_wait.signal();
732		Ok(())
733	}
734
735	fn defer_commit(
736		&self,
737		mut queue: MutexGuard<CommitQueue>,
738		mut commit: CommitChangeSet,
739		old_bytes: usize,
740		old_id: u64,
741		new_id: Option<u64>,
742	) -> Result<()> {
743		let record_id = if let Some(id) = new_id {
744			id
745		} else {
746			queue.record_id += 1;
747			queue.record_id
748		};
749
750		let bytes = if record_id != old_id {
751			let mut overlay = self.commit_overlay.write();
752
753			let mut bytes = 0;
754
755			for (c, indexed) in &commit.indexed {
756				indexed.copy_to_overlay(
757					&mut overlay[*c as usize],
758					record_id,
759					&mut bytes,
760					&self.options,
761				)?;
762			}
763
764			for (c, iterset) in &commit.btree_indexed {
765				iterset.copy_to_overlay(
766					&mut overlay[*c as usize].btree_indexed,
767					record_id,
768					&mut bytes,
769					&self.options,
770				)?;
771			}
772
773			{
774				// Cleanup the commit overlay with old id.
775				for (c, key_values) in commit.indexed.iter() {
776					key_values.clean_overlay(&mut overlay[*c as usize], old_id);
777				}
778				for (c, iterset) in commit.btree_indexed.iter_mut() {
779					iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, old_id);
780				}
781			}
782
783			bytes
784		} else {
785			old_bytes
786		};
787
788		let commit = Commit { id: record_id, changeset: commit, bytes };
789
790		log::debug!(
791			target: "parity-db",
792			"Deferred commit, old id: {}, new id: {}",
793			old_id,
794			record_id,
795		);
796		queue.commits.push_back(commit);
797		queue.bytes += bytes;
798		Ok(())
799	}
800
	// Takes the next commit off the commit queue and writes it to the log.
	//
	// Returns `Ok(true)` when a commit was either written or deferred (pushed back onto the
	// queue), and `Ok(false)` when the queue was empty.
	//
	// A commit flagged with `check_for_deferral` is deferred when one of the trees it
	// dereferences has a locked reader, or is listed in `used_trees` of a commit still waiting
	// in the queue.
	fn process_commits(&self, db: &Arc<DbInner>) -> Result<bool> {
		#[cfg(any(test, feature = "instrumentation"))]
		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let might_wait_because_the_queue_is_full = true;
		if might_wait_because_the_queue_is_full {
			// Wait if the queue is full.
			let mut queue = self.log_queue_wait.work.lock();
			if !self.shutdown.load(Ordering::Relaxed) && *queue > MAX_LOG_QUEUE_BYTES {
				log::debug!(target: "parity-db", "Waiting, log_bytes={}", queue);
				self.log_queue_wait.cv.wait(&mut queue);
			}
		}
		// Pop the next commit; the queue lock is released at the end of this block.
		let commit = {
			let mut queue = self.commit_queue.lock();
			if let Some(commit) = queue.commits.pop_front() {
				queue.bytes -= commit.bytes;
				log::debug!(
					target: "parity-db",
					"Removed {}. Still queued commits {} bytes",
					commit.bytes,
					queue.bytes,
				);
				// Only notify when this removal took the queue from above the limit to at or
				// below it.
				if queue.bytes <= MAX_COMMIT_QUEUE_BYTES &&
					(queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
				{
					// Past the waiting threshold.
					log::debug!(
						target: "parity-db",
						"Waking up commit queue worker",
					);
					self.commit_queue_full_cv.notify_all();
				}
				Some(commit)
			} else {
				None
			}
		};

		if let Some(mut commit) = commit {
			if commit.changeset.check_for_deferral {
				let mut defer = false;
				'outer: for (col, key_values) in commit.changeset.indexed.iter() {
					for change in &key_values.node_changes {
						if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
							// Check if there are currently any locks on the tree. Will need to
							// defer if there are.
							let trees = self.trees.read();
							if let Some(column_trees) = trees.get(&col) {
								let mut tree_active = false;
								if let Some(reader) = column_trees.readers.get(hash) {
									let reader = reader.upgrade();
									if let Some(reader) = reader {
										if reader.is_locked() {
											tree_active = true;
										}
									}
								}
								if tree_active {
									defer = true;
									break 'outer
								}
							}
							drop(trees);

							// Also check if there are any later commits in the queue that use this
							// tree. Will need to defer if there are.
							let queue = self.commit_queue.lock();
							for commit in &queue.commits {
								for (_col, change_set) in &commit.changeset.indexed {
									for tree in &change_set.used_trees {
										if tree == hash {
											defer = true;
											break 'outer
										}
									}
								}
							}
						}
					}
				}
				if defer {
					// Re-queue the commit at the back; the queue guard is handed over to
					// `defer_commit`.
					let queue = self.commit_queue.lock();
					let new_id = if queue.commits.len() > 0 {
						// Generate a new id
						None
					} else {
						// Nothing else in the queue so can reuse same id
						Some(commit.id)
					};
					self.defer_commit(queue, commit.changeset, commit.bytes, commit.id, new_id)?;

					return Ok(true)
				} else {
					// The dereferences are going ahead: decrement the per-tree counters of
					// queued dereferences.
					for (col, key_values) in commit.changeset.indexed.iter() {
						for change in &key_values.node_changes {
							if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
								let mut trees = self.trees.write();
								if let Some(column_trees) = trees.get_mut(&col) {
									let count = column_trees.to_dereference.get(hash).unwrap_or(&0);
									assert!(*count > 0);
									if *count == 1 {
										column_trees.to_dereference.remove(hash);
									} else {
										column_trees.to_dereference.insert(*hash, count - 1);
									}
								}
							}
						}
					}
				}
			}

			// Write all changes of the commit into a single log record.
			let mut reindex = false;
			let mut writer = self.log.begin_record();
			log::debug!(
				target: "parity-db",
				"Processing commit {}, record {}, {} bytes",
				commit.id,
				writer.record_id(),
				commit.bytes,
			);
			let mut ops: u64 = 0;
			for (c, key_values) in commit.changeset.indexed.iter() {
				key_values.write_plan(
					db,
					*c,
					&self.columns[*c as usize],
					&mut writer,
					&mut ops,
					&mut reindex,
				)?;
			}

			for (c, btree) in commit.changeset.btree_indexed.iter_mut() {
				match &self.columns[*c as usize] {
					Column::Hash(_column) =>
						return Err(Error::InvalidConfiguration(
							"Not an indexed column.".to_string(),
						)),
					Column::Tree(column) => {
						btree.write_plan(column, &mut writer, &mut ops)?;
					},
				}
			}

			// Collect final changes to value tables
			for c in self.columns.iter() {
				c.complete_plan(&mut writer)?;
			}
			let record_id = writer.record_id();
			let l = writer.drain();

			// Finish the record, account for the logged bytes and wake up the flush worker.
			let bytes = {
				let bytes = self.log.end_record(l)?;
				let mut logged_bytes = self.log_queue_wait.work.lock();
				*logged_bytes += bytes as i64;
				self.flush_worker_wait.signal();
				bytes
			};

			{
				// Cleanup the commit overlay.
				let mut overlay = self.commit_overlay.write();
				for (c, key_values) in commit.changeset.indexed.iter() {
					key_values.clean_overlay(&mut overlay[*c as usize], commit.id);
				}
				for (c, iterset) in commit.changeset.btree_indexed.iter_mut() {
					iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, commit.id);
				}
			}

			// `reindex` is set by `write_plan` above.
			if reindex {
				self.start_reindex(record_id);
			}

			log::debug!(
				target: "parity-db",
				"Processed commit {} (record {}), {} ops, {} bytes written",
				commit.id,
				record_id,
				ops,
				bytes,
			);
			Ok(true)
		} else {
			Ok(false)
		}
	}
990
	// Schedules a reindex by storing `record_id` in `next_reindex`. `process_reindex` only
	// acts once `last_enacted` has reached that record id.
	fn start_reindex(&self, record_id: u64) {
		log::trace!(target: "parity-db", "Scheduled reindex at record {}", record_id);
		self.next_reindex.store(record_id, Ordering::SeqCst);
	}
995
996	fn process_reindex(&self) -> Result<bool> {
997		let next_reindex = self.next_reindex.load(Ordering::SeqCst);
998		if next_reindex == 0 || next_reindex > self.last_enacted.load(Ordering::SeqCst) {
999			return Ok(false)
1000		}
1001		// Process any pending reindexes
1002		for column in self.columns.iter() {
1003			let column = if let Column::Hash(c) = column { c } else { continue };
1004			let ReindexBatch {
1005				drop_index,
1006				batch,
1007				drop_ref_count,
1008				ref_count_batch,
1009				ref_count_batch_source,
1010			} = column.reindex(&self.log)?;
1011			if !batch.is_empty() || drop_index.is_some() {
1012				debug_assert!(
1013					ref_count_batch.is_empty() &&
1014						ref_count_batch_source.is_none() &&
1015						drop_ref_count.is_none()
1016				);
1017				let mut next_reindex = false;
1018				let mut writer = self.log.begin_record();
1019				log::debug!(
1020					target: "parity-db",
1021					"Creating reindex record {}",
1022					writer.record_id(),
1023				);
1024				for (key, address) in batch.into_iter() {
1025					if let PlanOutcome::NeedReindex =
1026						column.write_reindex_plan(&key, address, &mut writer)?
1027					{
1028						next_reindex = true
1029					}
1030				}
1031				if let Some(table) = drop_index {
1032					writer.drop_table(table);
1033				}
1034				let record_id = writer.record_id();
1035				let l = writer.drain();
1036
1037				let mut logged_bytes = self.log_queue_wait.work.lock();
1038				let bytes = self.log.end_record(l)?;
1039				log::debug!(
1040					target: "parity-db",
1041					"Created reindex record {}, {} bytes",
1042					record_id,
1043					bytes,
1044				);
1045				*logged_bytes += bytes as i64;
1046				if next_reindex {
1047					self.start_reindex(record_id);
1048				}
1049				self.flush_worker_wait.signal();
1050				return Ok(true)
1051			}
1052			if !ref_count_batch.is_empty() || drop_ref_count.is_some() {
1053				debug_assert!(batch.is_empty() && drop_index.is_none());
1054				debug_assert!(ref_count_batch_source.is_some());
1055				let ref_count_source = ref_count_batch_source.unwrap();
1056				let mut next_reindex = false;
1057				let mut writer = self.log.begin_record();
1058				log::debug!(
1059					target: "parity-db",
1060					"Creating ref count reindex record {}",
1061					writer.record_id(),
1062				);
1063				for (address, ref_count) in ref_count_batch.into_iter() {
1064					if let PlanOutcome::NeedReindex = column.write_ref_count_reindex_plan(
1065						address,
1066						ref_count,
1067						ref_count_source,
1068						&mut writer,
1069					)? {
1070						next_reindex = true
1071					}
1072				}
1073				if let Some(table) = drop_ref_count {
1074					writer.drop_ref_count_table(table);
1075				}
1076				let record_id = writer.record_id();
1077				let l = writer.drain();
1078
1079				let mut logged_bytes = self.log_queue_wait.work.lock();
1080				let bytes = self.log.end_record(l)?;
1081				log::debug!(
1082					target: "parity-db",
1083					"Created ref count reindex record {}, {} bytes",
1084					record_id,
1085					bytes,
1086				);
1087				*logged_bytes += bytes as i64;
1088				if next_reindex {
1089					self.start_reindex(record_id);
1090				}
1091				self.flush_worker_wait.signal();
1092				return Ok(true)
1093			}
1094		}
1095		self.next_reindex.store(0, Ordering::SeqCst);
1096		Ok(false)
1097	}
1098
1099	fn enact_logs(&self, validation_mode: bool) -> Result<bool> {
1100		let _iteration_lock = self.iteration_lock.lock();
1101		let cleared = {
1102			let reader = match self.log.read_next(validation_mode) {
1103				Ok(reader) => reader,
1104				Err(Error::Corruption(_)) if validation_mode => {
1105					log::debug!(target: "parity-db", "Bad log header");
1106					self.log.clear_replay_logs();
1107					return Ok(false)
1108				},
1109				Err(e) => return Err(e),
1110			};
1111			if let Some(mut reader) = reader {
1112				log::debug!(
1113					target: "parity-db",
1114					"Enacting log record {}",
1115					reader.record_id(),
1116				);
1117				if validation_mode {
1118					if reader.record_id() != self.last_enacted.load(Ordering::Relaxed) + 1 {
1119						log::warn!(
1120							target: "parity-db",
1121							"Log sequence error. Expected record {}, got {}",
1122							self.last_enacted.load(Ordering::Relaxed) + 1,
1123							reader.record_id(),
1124						);
1125						drop(reader);
1126						self.log.clear_replay_logs();
1127						return Ok(false)
1128					}
1129					// Validate all records before applying anything
1130					loop {
1131						let next = match reader.next() {
1132							Ok(next) => next,
1133							Err(e) => {
1134								log::debug!(target: "parity-db", "Error reading log: {:?}", e);
1135								return Ok(false)
1136							},
1137						};
1138						match next {
1139							LogAction::BeginRecord => {
1140								log::debug!(target: "parity-db", "Unexpected log header");
1141								drop(reader);
1142								self.log.clear_replay_logs();
1143								return Ok(false)
1144							},
1145							LogAction::EndRecord => break,
1146							LogAction::InsertIndex(insertion) => {
1147								let col = insertion.table.col() as usize;
1148								if let Err(e) = self.columns.get(col).map_or_else(
1149									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
1150									|col| {
1151										col.validate_plan(
1152											LogAction::InsertIndex(insertion),
1153											&mut reader,
1154										)
1155									},
1156								) {
1157									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
1158									drop(reader);
1159									self.log.clear_replay_logs();
1160									return Ok(false)
1161								}
1162							},
1163							LogAction::InsertValue(insertion) => {
1164								let col = insertion.table.col() as usize;
1165								if let Err(e) = self.columns.get(col).map_or_else(
1166									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
1167									|col| {
1168										col.validate_plan(
1169											LogAction::InsertValue(insertion),
1170											&mut reader,
1171										)
1172									},
1173								) {
1174									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
1175									drop(reader);
1176									self.log.clear_replay_logs();
1177									return Ok(false)
1178								}
1179							},
1180							LogAction::InsertRefCount(insertion) => {
1181								let col = insertion.table.col() as usize;
1182								if let Err(e) = self.columns.get(col).map_or_else(
1183									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
1184									|col| {
1185										col.validate_plan(
1186											LogAction::InsertRefCount(insertion),
1187											&mut reader,
1188										)
1189									},
1190								) {
1191									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
1192									drop(reader);
1193									self.log.clear_replay_logs();
1194									return Ok(false)
1195								}
1196							},
1197							LogAction::DropTable(_) | LogAction::DropRefCountTable(_) => continue,
1198						}
1199					}
1200					reader.reset()?;
1201					reader.next()?;
1202				}
1203				loop {
1204					match reader.next()? {
1205						LogAction::BeginRecord =>
1206							return Err(Error::Corruption("Bad log record".into())),
1207						LogAction::EndRecord => break,
1208						LogAction::InsertIndex(insertion) => {
1209							self.columns[insertion.table.col() as usize]
1210								.enact_plan(LogAction::InsertIndex(insertion), &mut reader)?;
1211						},
1212						LogAction::InsertValue(insertion) => {
1213							self.columns[insertion.table.col() as usize]
1214								.enact_plan(LogAction::InsertValue(insertion), &mut reader)?;
1215						},
1216						LogAction::InsertRefCount(insertion) => {
1217							self.columns[insertion.table.col() as usize]
1218								.enact_plan(LogAction::InsertRefCount(insertion), &mut reader)?;
1219						},
1220						LogAction::DropTable(id) => {
1221							log::debug!(
1222								target: "parity-db",
1223								"Dropping index {}",
1224								id,
1225							);
1226							match &self.columns[id.col() as usize] {
1227								Column::Hash(col) => {
1228									col.drop_index(id)?;
1229									// Check if there's another reindex on the next iteration
1230									self.start_reindex(reader.record_id());
1231								},
1232								Column::Tree(_) => (),
1233							}
1234						},
1235						LogAction::DropRefCountTable(id) => {
1236							log::debug!(
1237								target: "parity-db",
1238								"Dropping ref count {}",
1239								id,
1240							);
1241							match &self.columns[id.col() as usize] {
1242								Column::Hash(col) => {
1243									col.drop_ref_count(id)?;
1244									// Check if there's another reindex on the next iteration
1245									self.start_reindex(reader.record_id());
1246								},
1247								Column::Tree(_) => (),
1248							}
1249						},
1250					}
1251				}
1252				log::debug!(
1253					target: "parity-db",
1254					"Enacted log record {}, {} bytes",
1255					reader.record_id(),
1256					reader.read_bytes(),
1257				);
1258				let record_id = reader.record_id();
1259				let bytes = reader.read_bytes();
1260				let cleared = reader.drain();
1261				self.last_enacted.store(record_id, Ordering::SeqCst);
1262				Some((record_id, cleared, bytes))
1263			} else {
1264				log::debug!(target: "parity-db", "End of log");
1265				None
1266			}
1267		};
1268
1269		if let Some((record_id, cleared, bytes)) = cleared {
1270			self.log.end_read(cleared, record_id);
1271			{
1272				if !validation_mode {
1273					let mut queue = self.log_queue_wait.work.lock();
1274					if *queue < bytes as i64 {
1275						log::warn!(
1276							target: "parity-db",
1277							"Detected log underflow record {}, {} bytes, {} queued, reindex = {}",
1278							record_id,
1279							bytes,
1280							*queue,
1281							self.next_reindex.load(Ordering::SeqCst),
1282						);
1283					}
1284					*queue -= bytes as i64;
1285					if *queue <= MAX_LOG_QUEUE_BYTES &&
1286						(*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
1287					{
1288						self.log_queue_wait.cv.notify_one();
1289					}
1290					log::debug!(target: "parity-db", "Log queue size: {} bytes", *queue);
1291				}
1292
1293				let max_logs = if self.options.sync_data { MAX_LOG_FILES } else { KEEP_LOGS };
1294				let dirty_logs = self.log.num_dirty_logs();
1295				if !validation_mode {
1296					while self.log.num_dirty_logs() > max_logs {
1297						log::debug!(target: "parity-db", "Waiting for log cleanup. Queued: {}", dirty_logs);
1298						self.cleanup_queue_wait.wait();
1299					}
1300				}
1301			}
1302			Ok(true)
1303		} else {
1304			Ok(false)
1305		}
1306	}
1307
1308	fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
1309		let has_flushed = self.log.flush_one(min_log_size)?;
1310		if has_flushed {
1311			self.commit_worker_wait.signal();
1312		}
1313		Ok(has_flushed)
1314	}
1315
1316	fn clean_logs(&self) -> Result<bool> {
1317		let keep_logs = if self.options.sync_data { 0 } else { KEEP_LOGS };
1318		let num_cleanup = self.log.num_dirty_logs();
1319		let result = if num_cleanup > keep_logs {
1320			if self.options.sync_data {
1321				for c in self.columns.iter() {
1322					c.flush()?;
1323				}
1324			}
1325			self.log.clean_logs(num_cleanup - keep_logs)?
1326		} else {
1327			false
1328		};
1329		self.cleanup_queue_wait.signal();
1330		Ok(result)
1331	}
1332
1333	fn clean_all_logs(&self) -> Result<()> {
1334		for c in self.columns.iter() {
1335			c.flush()?;
1336		}
1337		let num_cleanup = self.log.num_dirty_logs();
1338		self.log.clean_logs(num_cleanup)?;
1339		Ok(())
1340	}
1341
1342	fn replay_all_logs(&self) -> Result<()> {
1343		while let Some(id) = self.log.replay_next()? {
1344			log::debug!(target: "parity-db", "Replaying database log {}", id);
1345			while self.enact_logs(true)? {}
1346		}
1347
1348		// Re-read any cached metadata
1349		for c in self.columns.iter() {
1350			c.refresh_metadata()?;
1351		}
1352		log::debug!(target: "parity-db", "Replay is complete.");
1353		Ok(())
1354	}
1355
	/// Raises the `shutdown` flag and wakes up everything that may be waiting on one of the
	/// worker conditions, so that the worker loops get to observe the flag.
	fn shutdown(&self) {
		// Set the flag before waking anyone up.
		self.shutdown.store(true, Ordering::SeqCst);
		self.log_queue_wait.cv.notify_one();
		self.flush_worker_wait.signal();
		self.log_worker_wait.signal();
		self.commit_worker_wait.signal();
		self.cleanup_worker_wait.signal();
	}
1364
1365	fn kill_logs(&self, db: &Arc<DbInner>) -> Result<()> {
1366		{
1367			if let Some(err) = self.bg_err.lock().as_ref() {
1368				// On error the log reader may be left in inconsistent state. So it is important
1369				// to no attempt any further log enactment.
1370				log::debug!(target: "parity-db", "Shutdown with error state {}", err);
1371				self.log.clean_logs(self.log.num_dirty_logs())?;
1372				return Ok(())
1373			}
1374		}
1375		log::debug!(target: "parity-db", "Processing leftover commits");
1376		// Finish logged records and proceed to log and enact queued commits.
1377		while self.enact_logs(false)? {}
1378		self.flush_logs(0)?;
1379		while self.process_commits(db)? {}
1380		while self.enact_logs(false)? {}
1381		self.flush_logs(0)?;
1382		while self.enact_logs(false)? {}
1383		self.clean_all_logs()?;
1384		self.log.kill_logs()?;
1385		if self.options.stats {
1386			let mut path = self.options.path.clone();
1387			path.push("stats.txt");
1388			match std::fs::File::create(path) {
1389				Ok(file) => {
1390					let mut writer = std::io::BufWriter::new(file);
1391					if let Err(e) = self.write_stats_text(&mut writer, None) {
1392						log::warn!(target: "parity-db", "Error writing stats file: {:?}", e)
1393					}
1394				},
1395				Err(e) => log::warn!(target: "parity-db", "Error creating stats file: {:?}", e),
1396			}
1397		}
1398		Ok(())
1399	}
1400
1401	fn write_stats_text(&self, writer: &mut impl std::io::Write, column: Option<u8>) -> Result<()> {
1402		if let Some(col) = column {
1403			self.columns[col as usize].write_stats_text(writer)
1404		} else {
1405			for c in self.columns.iter() {
1406				c.write_stats_text(writer)?;
1407			}
1408			Ok(())
1409		}
1410	}
1411
1412	fn clear_stats(&self, column: Option<u8>) -> Result<()> {
1413		if let Some(col) = column {
1414			self.columns[col as usize].clear_stats()
1415		} else {
1416			for c in self.columns.iter() {
1417				c.clear_stats()?;
1418			}
1419			Ok(())
1420		}
1421	}
1422
	/// Collects the statistics of every column, in column order, into a `StatSummary`.
	fn stats(&self) -> StatSummary {
		StatSummary { columns: self.columns.iter().map(|c| c.stats()).collect() }
	}
1426
1427	fn store_err(&self, result: Result<()>) {
1428		if let Err(e) = result {
1429			log::warn!(target: "parity-db", "Background worker error: {}", e);
1430			let mut err = self.bg_err.lock();
1431			if err.is_none() {
1432				*err = Some(Arc::new(e));
1433				self.shutdown();
1434			}
1435			self.commit_queue_full_cv.notify_all();
1436		}
1437	}
1438
1439	fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
1440		let _lock = self.iteration_lock.lock();
1441		match &self.columns[c as usize] {
1442			Column::Hash(column) => column.iter_values(&self.log, f),
1443			Column::Tree(_) => unimplemented!(),
1444		}
1445	}
1446
1447	fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
1448		let _lock = self.iteration_lock.lock();
1449		match &self.columns[c as usize] {
1450			Column::Hash(column) => column.iter_index(&self.log, f),
1451			Column::Tree(_) => unimplemented!(),
1452		}
1453	}
1454}
1455
/// Database instance.
///
/// Holds the shared `DbInner` together with the join handles of the background worker
/// threads. A handle is `None` when the workers were not started (see `open_inner`) or once
/// it has been joined on drop.
pub struct Db {
	// State shared between this handle and all worker threads.
	inner: Arc<DbInner>,
	// Thread running `Db::commit_worker`.
	commit_thread: Option<thread::JoinHandle<()>>,
	// Thread running `Db::flush_worker`.
	flush_thread: Option<thread::JoinHandle<()>>,
	// Thread running `Db::log_worker`.
	log_thread: Option<thread::JoinHandle<()>>,
	// Thread running `Db::cleanup_worker`.
	cleanup_thread: Option<thread::JoinHandle<()>>,
}
1464
1465impl Db {
	/// Test helper: opens a database at `path` with options built by
	/// `Options::with_columns(path, num_columns)`, using `OpeningMode::Create`.
	#[cfg(test)]
	pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
		let options = Options::with_columns(path, num_columns);
		Self::open_inner(&options, OpeningMode::Create)
	}
1471
	/// Open the database with given options. An error will be returned if the database does not
	/// exist.
	///
	/// The database is opened with `OpeningMode::Write`.
	pub fn open(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Write)
	}
1477
	/// Open the database using given options. If the database does not exist it will be created
	/// empty.
	///
	/// The database is opened with `OpeningMode::Create`.
	pub fn open_or_create(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Create)
	}
1483
	/// Open an existing database in read-only mode.
	///
	/// No background worker threads are started in this mode.
	pub fn open_read_only(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::ReadOnly)
	}
1488
1489	fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
1490		assert!(options.is_valid());
1491		let mut db = DbInner::open(options, opening_mode)?;
1492		// This needs to be call before log thread: so first reindexing
1493		// will run in correct state.
1494		if let Err(e) = db.replay_all_logs() {
1495			log::debug!(target: "parity-db", "Error during log replay.");
1496			return Err(e)
1497		} else {
1498			db.log.clear_replay_logs();
1499			db.clean_all_logs()?;
1500			db.log.kill_logs()?;
1501		}
1502		db.init_table_data()?;
1503		let db = Arc::new(db);
1504		#[cfg(any(test, feature = "instrumentation"))]
1505		let start_threads = opening_mode != OpeningMode::ReadOnly && options.with_background_thread;
1506		#[cfg(not(any(test, feature = "instrumentation")))]
1507		let start_threads = opening_mode != OpeningMode::ReadOnly;
1508		let commit_thread = if start_threads {
1509			let commit_worker_db = db.clone();
1510			Some(thread::spawn(move || {
1511				commit_worker_db.store_err(Self::commit_worker(commit_worker_db.clone()))
1512			}))
1513		} else {
1514			None
1515		};
1516		let flush_thread = if start_threads {
1517			let flush_worker_db = db.clone();
1518			#[cfg(any(test, feature = "instrumentation"))]
1519			let min_log_size = if options.always_flush { 0 } else { MIN_LOG_SIZE_BYTES };
1520			#[cfg(not(any(test, feature = "instrumentation")))]
1521			let min_log_size = MIN_LOG_SIZE_BYTES;
1522			Some(thread::spawn(move || {
1523				flush_worker_db.store_err(Self::flush_worker(flush_worker_db.clone(), min_log_size))
1524			}))
1525		} else {
1526			None
1527		};
1528		let log_thread = if start_threads {
1529			let log_worker_db = db.clone();
1530			Some(thread::spawn(move || {
1531				log_worker_db.store_err(Self::log_worker(log_worker_db.clone()))
1532			}))
1533		} else {
1534			None
1535		};
1536		let cleanup_thread = if start_threads {
1537			let cleanup_worker_db = db.clone();
1538			Some(thread::spawn(move || {
1539				cleanup_worker_db.store_err(Self::cleanup_worker(cleanup_worker_db.clone()))
1540			}))
1541		} else {
1542			None
1543		};
1544		Ok(Db { inner: db, commit_thread, flush_thread, log_thread, cleanup_thread })
1545	}
1546
	/// Get a value in a specified column by key. Returns `None` if the key does not exist.
	///
	/// Delegates to `DbInner::get`.
	// NOTE(review): the meaning of the trailing `true` flag is defined by `DbInner::get` and is
	// not visible here.
	pub fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
		self.inner.get(col, key, true)
	}
1551
	/// Get value size by key. Returns `None` if the key does not exist.
	///
	/// Delegates to `DbInner::get_size`.
	pub fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
		self.inner.get_size(col, key)
	}
1556
	/// Iterate over all ordered key-value pairs. Only supported for columns configured with
	/// `btree_indexed`.
	///
	/// Delegates to `DbInner::btree_iter`; the returned iterator borrows this database.
	pub fn iter(&self, col: ColId) -> Result<BTreeIterator<'_>> {
		self.inner.btree_iter(col)
	}
1562
	/// Get a shared, lock-wrapped `TreeReader` for the tree stored under `key` in column `col`.
	///
	/// Delegates to `DbInner::get_tree`.
	// NOTE(review): presumably `None` means that no tree root exists for `key`; confirm
	// against `DbInner::get_tree`.
	pub fn get_tree(
		&self,
		col: ColId,
		key: &[u8],
	) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
		self.inner.get_tree(&self.inner, col, key, true)
	}
1570
	/// Get the root node stored under `key` in column `col` as a pair of node data and
	/// `Children`.
	///
	/// Delegates to `DbInner::get_root`.
	pub fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
		self.inner.get_root(col, key)
	}
1574
	/// Get the node at `node_address` in column `col` as a pair of node data and `Children`.
	///
	/// Delegates to `DbInner::get_node`.
	pub fn get_node(
		&self,
		col: ColId,
		node_address: NodeAddress,
	) -> Result<Option<(Vec<u8>, Children)>> {
		self.inner.get_node(col, node_address, true)
	}
1582
	/// Get only the `Children` of the node at `node_address` in column `col`.
	///
	/// Delegates to `DbInner::get_node_children`.
	pub fn get_node_children(
		&self,
		col: ColId,
		node_address: NodeAddress,
	) -> Result<Option<Children>> {
		self.inner.get_node_children(col, node_address, true)
	}
1590
	/// Commit a set of changes to the database.
	///
	/// Every item of `tx` is a `(column, key, value)` triple.
	// NOTE(review): presumably a `None` value removes the key; confirm against
	// `DbInner::commit`.
	pub fn commit<I, K>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
		K: AsRef<[u8]>,
	{
		self.inner.commit(tx)
	}
1599
	/// Commit a set of changes to the database.
	///
	/// Every item of `tx` pairs a column with the `Operation` to apply to it; keys and values
	/// are passed as `Vec<u8>`.
	pub fn commit_changes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
	{
		self.inner.commit_changes(tx)
	}
1607
	/// Commit a set of changes to the database.
	///
	/// This method passes values as `Arc<Vec<u8>>` potentially eliminating an extra copy.
	///
	/// Only available with the `arc` feature. Deprecated in favour of `commit_changes_bytes`.
	#[cfg(feature = "arc")]
	#[deprecated(
		note = "This method will be removed in future versions. Use `commit_changes_bytes` instead"
	)]
	pub fn commit_changes_shared<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Arc<Vec<u8>>>)>,
	{
		self.inner.commit_changes(tx)
	}
1621
	/// Commit a set of changes to the database.
	///
	/// This method passes values as `Bytes` potentially eliminating an extra copy.
	///
	/// Only available with the `bytes` feature.
	#[cfg(feature = "bytes")]
	pub fn commit_changes_bytes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Bytes>)>,
	{
		self.inner.commit_changes(tx)
	}
1632
	/// Crate-internal: commits an already assembled `CommitChangeSet` through
	/// `DbInner::commit_raw`.
	pub(crate) fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
		self.inner.commit_raw(commit)
	}
1636
	/// Returns the number of columns in the database.
	pub fn num_columns(&self) -> u8 {
		// The column count is narrowed to `u8` with a plain `as` cast.
		self.inner.columns.len() as u8
	}
1641
	/// Iterate a column and call a function for each value. This is only supported for columns with
	/// `btree_index` set to `false`. Iteration order is unspecified.
	/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
	///
	/// Delegates to `DbInner::iter_column_while`.
	// NOTE(review): presumably iteration stops as soon as `f` returns `false`; confirm in the
	// column's `iter_values`.
	pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		self.inner.iter_column_while(c, f)
	}
1648
	/// Iterate a column and call a function for each value. This is only supported for columns with
	/// `btree_index` set to `false`. Iteration order is unspecified. Note that the
	/// `key` field in the state is the hash of the original key.
	/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
	///
	/// Delegates to `DbInner::iter_column_index_while`.
	// NOTE(review): presumably iteration stops as soon as `f` returns `false`; confirm in the
	// column's `iter_index`.
	pub(crate) fn iter_column_index_while(
		&self,
		c: ColId,
		f: impl FnMut(IterState) -> bool,
	) -> Result<()> {
		self.inner.iter_column_index_while(c, f)
	}
1660
	/// Body of the commit worker thread: enacts log records until shutdown.
	///
	/// After shutdown has been requested the loop keeps going for as long as `enact_logs`
	/// reports more work. The first error ends the worker and is returned.
	fn commit_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				// Nothing left from the previous pass: wake the cleanup worker, then sleep
				// unless log files are already waiting to be read.
				db.cleanup_worker_wait.signal();
				if !db.log.has_log_files_to_read() {
					db.commit_worker_wait.wait();
				}
			}

			more_work = db.enact_logs(false)?;
		}
		log::debug!(target: "parity-db", "Commit worker shutdown");
		Ok(())
	}
1676
	/// Body of the log worker thread: turns queued commits and pending reindex work into log
	/// records.
	///
	/// After shutdown has been requested the loop continues only while commits remain;
	/// outstanding reindex work does not keep it alive. The first error ends the worker and is
	/// returned.
	fn log_worker(db: Arc<DbInner>) -> Result<()> {
		// Start with pending reindex.
		let mut more_reindex = db.process_reindex()?;
		let mut more_commits = false;
		// Process all commits but allow reindex to be interrupted.
		while !db.shutdown.load(Ordering::SeqCst) || more_commits {
			if !more_commits && !more_reindex {
				// Idle: sleep until signalled.
				db.log_worker_wait.wait();
			}

			more_commits = db.process_commits(&db)?;
			more_reindex = db.process_reindex()?;
		}
		log::debug!(target: "parity-db", "Log worker shutdown");
		Ok(())
	}
1693
	/// Body of the flush worker thread: calls `flush_logs(min_log_size)` until shutdown.
	///
	/// Unlike the other workers, the loop condition only looks at the shutdown flag, so the
	/// worker stops once shutdown is requested even if the last pass reported more work.
	fn flush_worker(db: Arc<DbInner>, min_log_size: u64) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) {
			if !more_work {
				// Idle: sleep until signalled.
				db.flush_worker_wait.wait();
			}
			more_work = db.flush_logs(min_log_size)?;
		}
		log::debug!(target: "parity-db", "Flush worker shutdown");
		Ok(())
	}
1705
	/// Body of the cleanup worker thread: calls `clean_logs` until shutdown.
	///
	/// `more_work` starts out as `true`, so one cleanup pass always runs before the first
	/// wait. After shutdown has been requested the loop keeps going while `clean_logs` reports
	/// more work.
	fn cleanup_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = true;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				// Idle: sleep until signalled.
				db.cleanup_worker_wait.wait();
			}
			more_work = db.clean_logs()?;
		}
		log::debug!(target: "parity-db", "Cleanup worker shutdown");
		Ok(())
	}
1717
	/// Dump full database stats to the text output.
	///
	/// With `Some(column)` only that column is written; with `None` all columns are written in
	/// order.
	pub fn write_stats_text(
		&self,
		writer: &mut impl std::io::Write,
		column: Option<u8>,
	) -> Result<()> {
		self.inner.write_stats_text(writer, column)
	}
1726
	/// Reset internal database statistics for the database or specified column.
	///
	/// `None` resets the statistics of every column.
	pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
		self.inner.clear_stats(column)
	}
1731
1732	/// Print database contents in text form to stderr.
1733	pub fn dump(&self, check_param: check::CheckOptions) -> Result<()> {
1734		if let Some(col) = check_param.column {
1735			self.inner.columns[col as usize].dump(&self.inner.log, &check_param, col)?;
1736		} else {
1737			for (ix, c) in self.inner.columns.iter().enumerate() {
1738				c.dump(&self.inner.log, &check_param, ix as ColId)?;
1739			}
1740		}
1741		Ok(())
1742	}
1743
	/// Get database statistics.
	///
	/// The summary contains one entry per column.
	pub fn stats(&self) -> StatSummary {
		self.inner.stats()
	}
1748
1749	pub fn get_num_column_value_entries(&self, col: ColId) -> Result<u64> {
1750		let column = &self.inner.columns[col as usize];
1751		match column {
1752			Column::Hash(column) => return column.get_num_value_entries(),
1753			Column::Tree(..) =>
1754				return Err(Error::InvalidConfiguration(
1755					"get_num_column_value_entries not implemented for tree columns.".to_string(),
1756				)),
1757		}
1758	}
1759
1760	// We open the DB before to check metadata validity and make sure there are no pending WAL
1761	// logs.
1762	fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
1763		let db = Db::open(options)?;
1764		let salt = db.inner.options.salt;
1765		drop(db);
1766		Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
1767	}
1768
	/// Add a new column with options specified by `new_column_options`.
	///
	/// The database is opened and closed once as a precheck. Then the new column is appended to
	/// `options.columns` and the metadata is rewritten with `CURRENT_VERSION`.
	pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;

		options.columns.push(new_column_options);
		options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

		Ok(())
	}
1778
1779	/// Remove last column from the database.
1780	/// Db must be close when called.
1781	pub fn drop_last_column(options: &mut Options) -> Result<()> {
1782		let salt = Self::precheck_column_operation(options)?;
1783		let nb_column = options.columns.len();
1784		if nb_column == 0 {
1785			return Ok(())
1786		}
1787		let index = options.columns.len() - 1;
1788		Self::remove_column_files(options, index as u8)?;
1789		options.columns.pop();
1790		options.write_metadata(&options.path, &salt)?;
1791		Ok(())
1792	}
1793
	/// Truncate a column from the database, optionally changing its options.
	/// Db must be closed when called.
	///
	/// The files of column `index` are removed. The metadata is only rewritten when
	/// `new_options` is given. An unknown `index` yields `Error::IncompatibleColumnConfig`.
	pub fn reset_column(
		options: &mut Options,
		index: u8,
		new_options: Option<ColumnOptions>,
	) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;
		// Also verifies that `index` refers to an existing column.
		Self::remove_column_files(options, index)?;

		if let Some(new_options) = new_options {
			options.columns[index as usize] = new_options;
			options.write_metadata(&options.path, &salt)?;
		}

		Ok(())
	}
1811
1812	fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
1813		if index as usize >= options.columns.len() {
1814			return Err(Error::IncompatibleColumnConfig {
1815				id: index,
1816				reason: "Column not found".to_string(),
1817			})
1818		}
1819
1820		Column::drop_files(index, options.path.clone())?;
1821		Ok(())
1822	}
1823
	/// Instrumentation only: runs a single `DbInner::process_reindex` step and discards its
	/// "more work" flag.
	#[cfg(feature = "instrumentation")]
	pub fn process_reindex(&self) -> Result<()> {
		self.inner.process_reindex()?;
		Ok(())
	}
1829
	/// Instrumentation only: runs a single `DbInner::process_commits` step and discards its
	/// "more work" flag.
	#[cfg(feature = "instrumentation")]
	pub fn process_commits(&self) -> Result<()> {
		self.inner.process_commits(&self.inner)?;
		Ok(())
	}
1835
	/// Instrumentation only: calls `DbInner::flush_logs` with a minimum log size of 0 and
	/// discards its result flag.
	#[cfg(feature = "instrumentation")]
	pub fn flush_logs(&self) -> Result<()> {
		self.inner.flush_logs(0)?;
		Ok(())
	}
1841
	/// Instrumentation only: enacts log records (without validation) until
	/// `DbInner::enact_logs` reports that nothing is left.
	#[cfg(feature = "instrumentation")]
	pub fn enact_logs(&self) -> Result<()> {
		while self.inner.enact_logs(false)? {}
		Ok(())
	}
1847
	/// Instrumentation only: runs a single `DbInner::clean_logs` pass and discards its result
	/// flag.
	#[cfg(feature = "instrumentation")]
	pub fn clean_logs(&self) -> Result<()> {
		self.inner.clean_logs()?;
		Ok(())
	}
1853}
1854
/// Dropping the handle shuts the database down; all the work happens in `Db::drop_inner`.
impl Drop for Db {
	fn drop(&mut self) {
		self.drop_inner()
	}
}
1860
1861impl Db {
1862	fn drop_inner(&mut self) {
1863		self.inner.shutdown();
1864		if let Some(t) = self.log_thread.take() {
1865			if let Err(e) = t.join() {
1866				log::warn!(target: "parity-db", "Log thread shutdown error: {:?}", e);
1867			}
1868		}
1869		if let Some(t) = self.flush_thread.take() {
1870			if let Err(e) = t.join() {
1871				log::warn!(target: "parity-db", "Flush thread shutdown error: {:?}", e);
1872			}
1873		}
1874		if let Some(t) = self.commit_thread.take() {
1875			if let Err(e) = t.join() {
1876				log::warn!(target: "parity-db", "Commit thread shutdown error: {:?}", e);
1877			}
1878		}
1879		if let Some(t) = self.cleanup_thread.take() {
1880			if let Err(e) = t.join() {
1881				log::warn!(target: "parity-db", "Cleanup thread shutdown error: {:?}", e);
1882			}
1883		}
1884		if let Err(e) = self.inner.kill_logs(&self.inner) {
1885			log::warn!(target: "parity-db", "Shutdown error: {:?}", e);
1886		}
1887		if let Err(e) = fs2::FileExt::unlock(&self.inner.lock_file) {
1888			log::debug!(target: "parity-db", "Error removing file lock: {:?}", e);
1889		}
1890	}
1891}
1892
/// Read access to tree nodes: the root, and nodes addressed by `NodeAddress`.
// Use a trait here to allow client code to have better control over lock guard lifetime without
// lifetime proliferation within Db (which would be required if not using a dynamic object).
pub trait TreeReader {
	/// Returns the root node as a pair of node data and `Children`, or `None` when there is
	/// no root value.
	fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>>;
	/// Returns the node at `node_address` as a pair of node data and `Children`, if any.
	fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>>;
	/// Returns only the `Children` of the node at `node_address`, if any.
	fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>>;
}
1900
/// `TreeReader` implementation that reads from a shared `DbInner`.
#[derive(Debug)]
pub struct DbTreeReader {
	// Database the nodes are read from.
	db: Arc<DbInner>,
	// Column the tree lives in.
	col: ColId,
	// Key under which the root node is looked up by `get_root`.
	key: Key,
}
1907
1908impl TreeReader for DbTreeReader {
1909	fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>> {
1910		/* let value = self.db.get(self.col, &self.key)?;
1911		if let Some(data) = value {
1912			return unpack_node_data(data).map(|x| Some(x))
1913		}
1914		Err(Error::InvalidValueData) */
1915
1916		match &self.db.columns[self.col as usize] {
1917			Column::Hash(column) => {
1918				let overlay = self.db.commit_overlay.read();
1919				// Check commit overlay first
1920				let value = if let Some(v) =
1921					overlay.get(self.col as usize).and_then(|o| o.get(&self.key))
1922				{
1923					Ok(v.map(|i| i.as_ref().to_vec()))
1924				} else {
1925					// Go into tables and log overlay.
1926					let log = self.db.log.overlays();
1927					Ok(column.get(&self.key, log)?.map(|(v, _rc)| v))
1928				}?;
1929
1930				if let Some(data) = value {
1931					return unpack_node_data(data).map(|x| Some(x))
1932				}
1933
1934				return Ok(None)
1935			},
1936			Column::Tree(..) =>
1937				return Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
1938		};
1939	}
1940
1941	fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>> {
1942		self.db.get_node(self.col, node_address, false)
1943	}
1944
1945	fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>> {
1946		self.db.get_node_children(self.col, node_address, false)
1947	}
1948}
1949
// NOTE(review): in all three maps the leading `u64` of the entry is presumably the id of the
// commit that wrote it (see the `clean_overlay(.., commit.id)` calls); confirm.

/// Overlay entries for hash-indexed columns, keyed by `Key`. The value part is optional.
pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<RcValue>), IdentityBuildHasher>;
/// Overlay entries keyed by a `u64` address; these always carry a value.
pub type AddressCommitOverlay = HashMap<u64, (u64, RcValue)>;
/// Overlay entries for btree-indexed columns, ordered by `RcKey`. The value part is optional.
pub type BTreeCommitOverlay = BTreeMap<RcKey, (u64, Option<RcValue>)>;
1953
/// In-memory overlay of committed changes, kept as three independent maps.
#[derive(Debug)]
pub struct CommitOverlay {
	/// Entries for hash-indexed keys.
	indexed: IndexedCommitOverlay,
	/// Entries for values addressed directly by a `u64` address.
	address: AddressCommitOverlay,
	/// Entries for btree-indexed keys, ordered by key.
	btree_indexed: BTreeCommitOverlay,
}
1960
1961impl CommitOverlay {
1962	fn new() -> Self {
1963		CommitOverlay {
1964			indexed: Default::default(),
1965			address: Default::default(),
1966			btree_indexed: Default::default(),
1967		}
1968	}
1969
1970	#[cfg(test)]
1971	fn is_empty(&self) -> bool {
1972		self.indexed.is_empty() && self.address.is_empty() && self.btree_indexed.is_empty()
1973	}
1974}
1975
1976impl CommitOverlay {
1977	fn get_ref(&self, key: &[u8]) -> Option<Option<&RcValue>> {
1978		self.indexed.get(key).map(|(_, v)| v.as_ref())
1979	}
1980
1981	fn get(&self, key: &[u8]) -> Option<Option<RcValue>> {
1982		self.get_ref(key).map(|v| v.cloned())
1983	}
1984
1985	fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
1986		self.get_ref(key).map(|res| res.as_ref().map(|b| b.as_ref().len() as u32))
1987	}
1988
1989	fn get_address(&self, address: u64) -> Option<RcValue> {
1990		self.address.get(&address).map(|(_, v)| v.clone())
1991	}
1992
1993	fn btree_get(&self, key: &[u8]) -> Option<Option<&RcValue>> {
1994		self.btree_indexed.get(key).map(|(_, v)| v.as_ref())
1995	}
1996
1997	pub fn btree_next(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
1998		use crate::btree::LastKey;
1999		match &last_key {
2000			LastKey::Start => self
2001				.btree_indexed
2002				.range::<[u8], _>(..)
2003				.next()
2004				.map(|(k, (_, v))| (k.clone(), v.clone())),
2005			LastKey::End => None,
2006			LastKey::At(key) => self
2007				.btree_indexed
2008				.range::<[u8], _>((Bound::Excluded(key.as_slice()), Bound::Unbounded))
2009				.next()
2010				.map(|(k, (_, v))| (k.clone(), v.clone())),
2011			LastKey::Seeked(key) => self
2012				.btree_indexed
2013				.range::<[u8], _>((Bound::Included(key.as_slice()), Bound::Unbounded))
2014				.next()
2015				.map(|(k, (_, v))| (k.clone(), v.clone())),
2016		}
2017	}
2018
2019	pub fn btree_prev(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
2020		use crate::btree::LastKey;
2021		match &last_key {
2022			LastKey::End => self
2023				.btree_indexed
2024				.range::<[u8], _>(..)
2025				.rev()
2026				.next()
2027				.map(|(k, (_, v))| (k.clone(), v.clone())),
2028			LastKey::Start => None,
2029			LastKey::At(key) => self
2030				.btree_indexed
2031				.range::<[u8], _>((Bound::Unbounded, Bound::Excluded(key.as_slice())))
2032				.rev()
2033				.next()
2034				.map(|(k, (_, v))| (k.clone(), v.clone())),
2035			LastKey::Seeked(key) => self
2036				.btree_indexed
2037				.range::<[u8], _>((Bound::Unbounded, Bound::Included(key.as_slice())))
2038				.rev()
2039				.next()
2040				.map(|(k, (_, v))| (k.clone(), v.clone())),
2041		}
2042	}
2043}
2044
/// Different operations allowed for a commit.
/// Behavior may differ depending on column configuration.
///
/// The derived `PartialEq`/`Eq` compare the variant and all of its fields.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation<Key, Value> {
	/// Insert or update the value for a given key.
	Set(Key, Value),

	/// Dereference at a given key, resulting in
	/// either removal of a key value or decrement of its
	/// reference count counter.
	Dereference(Key),

	/// Increment the reference count counter of an existing value for a given key.
	/// If no value exists for the key, this operation is skipped.
	Reference(Key),

	/// Insert a new tree into a MultiTree column using root key and node structure.
	InsertTree(Key, NewNode),

	/// Increment the reference count of a tree (at root Key) from a MultiTree column.
	ReferenceTree(Key),

	/// Dereference an existing tree (at root Key) from a MultiTree column, resulting in either
	/// removal of the tree or decrement of its reference count.
	DereferenceTree(Key),
}
2071
2072impl<Key: Ord, Value: Eq> PartialOrd<Self> for Operation<Key, Value> {
2073	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2074		Some(self.cmp(other))
2075	}
2076}
2077
2078impl<Key: Ord, Value: Eq> Ord for Operation<Key, Value> {
2079	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2080		self.key().cmp(other.key())
2081	}
2082}
2083
2084impl<Key, Value> Operation<Key, Value> {
2085	pub fn key(&self) -> &Key {
2086		match self {
2087			Operation::Set(k, _) |
2088			Operation::Dereference(k) |
2089			Operation::Reference(k) |
2090			Operation::InsertTree(k, _) |
2091			Operation::ReferenceTree(k) |
2092			Operation::DereferenceTree(k) => k,
2093		}
2094	}
2095
2096	pub fn into_key(self) -> Key {
2097		match self {
2098			Operation::Set(k, _) |
2099			Operation::Dereference(k) |
2100			Operation::Reference(k) |
2101			Operation::InsertTree(k, _) |
2102			Operation::ReferenceTree(k) |
2103			Operation::DereferenceTree(k) => k,
2104		}
2105	}
2106}
2107
2108impl<K: AsRef<[u8]>, Value> Operation<K, Value> {
2109	pub fn to_key_vec(self) -> Operation<Vec<u8>, Value> {
2110		match self {
2111			Operation::Set(k, v) => Operation::Set(k.as_ref().to_vec(), v),
2112			Operation::Dereference(k) => Operation::Dereference(k.as_ref().to_vec()),
2113			Operation::Reference(k) => Operation::Reference(k.as_ref().to_vec()),
2114			Operation::InsertTree(k, n) => Operation::InsertTree(k.as_ref().to_vec(), n),
2115			Operation::ReferenceTree(k) => Operation::ReferenceTree(k.as_ref().to_vec()),
2116			Operation::DereferenceTree(k) => Operation::DereferenceTree(k.as_ref().to_vec()),
2117		}
2118	}
2119}
2120
/// A change that targets a node by address rather than by key.
#[derive(Debug, PartialEq, Eq)]
pub enum NodeChange {
	/// (address, value)
	NewValue(u64, RcValue),
	/// (address)
	IncrementReference(u64),
	/// Dereference and remove any of the children in the tree.
	///
	/// `IndexedChangeSet::write_plan` binds the fields as `(key, hash, children)`:
	/// `key` is passed to `get_tree`, `hash` is looked up in the column and
	/// dereferenced, and `children` are the addresses walked when the root's
	/// reference count was 1.
	DereferenceChildren(Vec<u8>, Key, Children),
}
2130
/// The changes of one commit, grouped by column.
#[derive(Debug, Default)]
pub struct CommitChangeSet {
	/// Change sets keyed by column id, one `IndexedChangeSet` per column.
	pub indexed: HashMap<ColId, IndexedChangeSet>,
	/// Change sets keyed by column id, one `BTreeChangeSet` per column.
	pub btree_indexed: HashMap<ColId, BTreeChangeSet>,
	// NOTE(review): the consumer of this flag is not in this part of the file;
	// confirm its exact meaning at the point where commits are processed.
	pub check_for_deferral: bool,
}
2137
/// Changes of one commit that target a single column.
#[derive(Debug)]
pub struct IndexedChangeSet {
	/// Column the changes apply to.
	pub col: ColId,
	/// Key-based operations, in the order they were pushed.
	pub changes: Vec<Operation<Key, RcValue>>,
	/// Address-based node changes, in the order they were pushed.
	pub node_changes: Vec<NodeChange>,
	// NOTE(review): not read or written by the methods of this type visible
	// here; presumably the root keys of trees touched by the commit — confirm.
	pub used_trees: HashSet<Key>,
}
2145
2146impl IndexedChangeSet {
2147	pub fn new(col: ColId) -> Self {
2148		IndexedChangeSet {
2149			col,
2150			changes: Default::default(),
2151			node_changes: Default::default(),
2152			used_trees: Default::default(),
2153		}
2154	}
2155
2156	fn push<K: AsRef<[u8]>, V: Into<RcValue>>(
2157		&mut self,
2158		change: Operation<K, V>,
2159		options: &Options,
2160		db_version: u32,
2161	) -> Result<()> {
2162		let salt = options.salt.unwrap_or_default();
2163		let hash_key = |key: &[u8]| -> Key {
2164			hash_key(key, &salt, options.columns[self.col as usize].uniform, db_version)
2165		};
2166
2167		self.push_change_hashed(match change {
2168			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
2169			Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
2170			Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
2171			Operation::InsertTree(..) |
2172			Operation::ReferenceTree(..) |
2173			Operation::DereferenceTree(..) =>
2174				return Err(Error::InvalidInput(format!(
2175					"Invalid operation for column {}",
2176					self.col
2177				))),
2178		});
2179
2180		Ok(())
2181	}
2182
	/// Appends an operation whose key is already in hashed `Key` form.
	fn push_change_hashed(&mut self, change: Operation<Key, RcValue>) {
		self.changes.push(change);
	}
2186
	/// Appends an address-based node change.
	fn push_node_change(&mut self, change: NodeChange) {
		self.node_changes.push(change);
	}
2190
2191	fn copy_to_overlay(
2192		&self,
2193		overlay: &mut CommitOverlay,
2194		record_id: u64,
2195		bytes: &mut usize,
2196		options: &Options,
2197	) -> Result<()> {
2198		let ref_counted = options.columns[self.col as usize].ref_counted;
2199		for change in self.changes.iter() {
2200			match &change {
2201				Operation::Set(k, v) => {
2202					*bytes += k.len();
2203					*bytes += v.as_ref().len();
2204					overlay.indexed.insert(*k, (record_id, Some(v.clone())));
2205				},
2206				Operation::Dereference(k) => {
2207					// Don't add removed ref-counted values to overlay.
2208					if !ref_counted {
2209						overlay.indexed.insert(*k, (record_id, None));
2210					}
2211				},
2212				Operation::Reference(..) => {
2213					// Don't add (we allow remove value in overlay when using rc: some
2214					// indexing on top of it is expected).
2215					if !ref_counted {
2216						return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)))
2217					}
2218				},
2219				Operation::InsertTree(..) |
2220				Operation::ReferenceTree(..) |
2221				Operation::DereferenceTree(..) =>
2222					return Err(Error::InvalidInput(format!(
2223						"Invalid operation for column {}",
2224						self.col
2225					))),
2226			}
2227		}
2228		for change in self.node_changes.iter() {
2229			if let NodeChange::NewValue(address, val) = change {
2230				*bytes += val.as_ref().len();
2231				overlay.address.insert(*address, (record_id, val.clone()));
2232			}
2233		}
2234		Ok(())
2235	}
2236
2237	fn write_plan(
2238		&self,
2239		db: &Arc<DbInner>,
2240		col: ColId,
2241		column: &Column,
2242		writer: &mut crate::log::LogWriter,
2243		ops: &mut u64,
2244		reindex: &mut bool,
2245	) -> Result<()> {
2246		let column = match column {
2247			Column::Hash(column) => column,
2248			Column::Tree(_) => {
2249				log::warn!(target: "parity-db", "Skipping unindex commit in indexed column");
2250				return Ok(())
2251			},
2252		};
2253		for change in self.changes.iter() {
2254			if let PlanOutcome::NeedReindex = column.write_plan(change, writer)? {
2255				// Reindex has triggered another reindex.
2256				*reindex = true;
2257			}
2258			*ops += 1;
2259		}
2260		for change in self.node_changes.iter() {
2261			match change {
2262				NodeChange::NewValue(address, val) => {
2263					column.write_address_value_plan(
2264						*address,
2265						val.clone(),
2266						false,
2267						val.as_ref().len() as u32,
2268						writer,
2269					)?;
2270				},
2271				NodeChange::IncrementReference(address) => {
2272					if let PlanOutcome::NeedReindex =
2273						column.write_address_inc_ref_plan(*address, writer)?
2274					{
2275						*reindex = true;
2276					}
2277				},
2278				NodeChange::DereferenceChildren(key, hash, children) => {
2279					if let Some((_root, rc)) = column.get(hash, writer)? {
2280						column.write_plan(&Operation::Dereference(*hash), writer)?;
2281						log::debug!(target: "parity-db", "Dereferencing root, rc={}", rc);
2282						if rc == 1 {
2283							let tree = db.get_tree(db, col, key, false).unwrap();
2284							if let Some(tree) = tree {
2285								let guard = tree.write();
2286								let mut num_removed = 0;
2287								self.write_dereference_children_plan(
2288									column,
2289									&guard,
2290									children,
2291									&mut num_removed,
2292									writer,
2293								)?;
2294								log::debug!(target: "parity-db", "Dereferenced tree {:?}, removed {}", &key[0..3], num_removed);
2295							}
2296						}
2297					}
2298					// TODO: Remove TreeReader from Db.
2299				},
2300			}
2301		}
2302		Ok(())
2303	}
2304
2305	fn write_dereference_children_plan(
2306		&self,
2307		column: &HashColumn,
2308		guard: &RwLockWriteGuard<'_, Box<dyn TreeReader + Send + Sync>>,
2309		children: &Vec<u64>,
2310		num_removed: &mut u64,
2311		writer: &mut crate::log::LogWriter,
2312	) -> Result<()> {
2313		for address in children {
2314			// Can't move this after write_address_dec_ref_plan as write_address_dec_ref_plan might
2315			// free the node meaning it could get reclaimed. Then get_node_children will return
2316			// incorrect data.
2317			let node = guard.get_node_children(*address)?;
2318			let (remains, _outcome) = column.write_address_dec_ref_plan(*address, writer)?;
2319			if !remains {
2320				// Was removed
2321				*num_removed += 1;
2322				if let Some(children) = node {
2323					self.write_dereference_children_plan(
2324						column,
2325						guard,
2326						&children,
2327						num_removed,
2328						writer,
2329					)?;
2330				} else {
2331					return Err(Error::InvalidConfiguration("Missing node data".to_string()))
2332				}
2333			}
2334		}
2335		Ok(())
2336	}
2337
2338	fn clean_overlay(&self, overlay: &mut CommitOverlay, record_id: u64) {
2339		use std::collections::hash_map::Entry;
2340		for change in self.changes.iter() {
2341			match change {
2342				Operation::Set(k, _) | Operation::Dereference(k) => {
2343					if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
2344						if e.get().0 == record_id {
2345							e.remove_entry();
2346						}
2347					}
2348				},
2349				Operation::Reference(..) |
2350				Operation::InsertTree(..) |
2351				Operation::ReferenceTree(..) |
2352				Operation::DereferenceTree(..) => (),
2353			}
2354		}
2355		for change in self.node_changes.iter() {
2356			if let NodeChange::NewValue(address, _val) = change {
2357				if let Entry::Occupied(e) = overlay.address.entry(*address) {
2358					if e.get().0 == record_id {
2359						e.remove_entry();
2360					}
2361				}
2362			}
2363		}
2364	}
2365}
2366
/// Verification operation utilities.
pub mod check {
	/// Database dump verbosity.
	pub enum CheckDisplay {
		/// Don't output any data.
		None,
		/// Output full data.
		Full,
		/// Limit value output to the specified size.
		Short(u64),
	}

	/// Options for producing a database dump.
	pub struct CheckOptions {
		/// Only process this column. If this is `None` all columns will be processed.
		pub column: Option<u8>,
		/// Start with this index.
		pub from: Option<u64>,
		/// End with this index.
		pub bound: Option<u64>,
		/// Verbosity.
		pub display: CheckDisplay,
		/// Ordered validation.
		pub fast: bool,
		/// Make sure free lists are correct.
		pub validate_free_refs: bool,
	}

	impl CheckOptions {
		/// Create a new instance.
		///
		/// The verbosity is derived from `display_content` and
		/// `truncate_value_display`: nothing is displayed when
		/// `display_content` is `false`; otherwise values are shown in full, or
		/// limited to the given size when `truncate_value_display` is set.
		pub fn new(
			column: Option<u8>,
			from: Option<u64>,
			bound: Option<u64>,
			display_content: bool,
			truncate_value_display: Option<u64>,
			fast: bool,
			validate_free_refs: bool,
		) -> Self {
			let display = match (display_content, truncate_value_display) {
				(false, _) => CheckDisplay::None,
				(true, None) => CheckDisplay::Full,
				(true, Some(limit)) => CheckDisplay::Short(limit),
			};
			Self { column, from, bound, display, fast, validate_free_refs }
		}
	}
}
2418
/// Mode passed to `Db::open_inner` when opening a database.
// NOTE(review): the variant descriptions below are inferred from the names and
// from how the tests in this file use them; confirm against `open_inner`.
#[derive(Eq, PartialEq, Clone, Copy)]
enum OpeningMode {
	// Presumably creates the database when it does not exist yet.
	Create,
	// Presumably opens an existing database for reading and writing.
	Write,
	// Presumably opens an existing database without allowing writes.
	ReadOnly,
}
2425
2426#[cfg(test)]
2427mod tests {
2428	use super::{Db, Options};
2429	use crate::{
2430		column::ColId,
2431		db::{DbInner, OpeningMode},
2432		ColumnOptions, Value,
2433	};
2434	use rand::Rng;
2435	use std::{
2436		collections::{BTreeMap, HashMap, HashSet},
2437		path::Path,
2438	};
2439	use tempfile::tempdir;
2440
	// This is used in tests to disable certain commit stages.
	#[derive(Eq, PartialEq, Debug, Clone, Copy)]
	enum EnableCommitPipelineStages {
		// No threads started, data stays in commit overlay.
		#[allow(dead_code)]
		CommitOverlay,
		// Log worker run, data processed up to the log overlay.
		#[allow(dead_code)]
		LogOverlay,
		// Running all stages.
		#[allow(dead_code)]
		DbFile,
		// Default run mode.
		Standard,
	}
2456
2457	impl EnableCommitPipelineStages {
2458		fn options(&self, path: &Path, num_columns: u8) -> Options {
2459			Options {
2460				path: path.into(),
2461				sync_wal: true,
2462				sync_data: true,
2463				stats: true,
2464				salt: None,
2465				columns: (0..num_columns).map(|_| Default::default()).collect(),
2466				compression_threshold: HashMap::new(),
2467				with_background_thread: *self == Self::Standard,
2468				always_flush: *self == Self::DbFile,
2469			}
2470		}
2471
2472		fn run_stages(&self, db: &Db) {
2473			let db = &db.inner;
2474			if *self == EnableCommitPipelineStages::DbFile ||
2475				*self == EnableCommitPipelineStages::LogOverlay
2476			{
2477				while db.process_commits(db).unwrap() {}
2478				while db.process_reindex().unwrap() {}
2479			}
2480			if *self == EnableCommitPipelineStages::DbFile {
2481				let _ = db.log.flush_one(0).unwrap();
2482				while db.enact_logs(false).unwrap() {}
2483				let _ = db.clean_logs().unwrap();
2484			}
2485		}
2486
2487		fn check_empty_overlay(&self, db: &DbInner, col: ColId) -> bool {
2488			match self {
2489				EnableCommitPipelineStages::DbFile | EnableCommitPipelineStages::LogOverlay => {
2490					if let Some(overlay) = db.commit_overlay.read().get(col as usize) {
2491						if !overlay.is_empty() {
2492							let mut replayed = 5;
2493							while !overlay.is_empty() {
2494								if replayed > 0 {
2495									replayed -= 1;
2496									// the signal is triggered just before cleaning the overlay, so
2497									// we wait a bit.
2498									// Warning this is still rather flaky and should be ignored
2499									// or removed.
2500									std::thread::sleep(std::time::Duration::from_millis(100));
2501								} else {
2502									return false
2503								}
2504							}
2505						}
2506					}
2507				},
2508				_ => (),
2509			}
2510			true
2511		}
2512	}
2513
2514	#[test]
2515	fn test_db_open_should_fail() {
2516		let tmp = tempdir().unwrap();
2517		let options = Options::with_columns(tmp.path(), 5);
2518		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
2519	}
2520
2521	#[test]
2522	fn test_db_open_fail_then_recursively_create() {
2523		let tmp = tempdir().unwrap();
2524		let (db_path_first, db_path_last) = {
2525			let mut db_path_first = tmp.path().to_owned();
2526			db_path_first.push("nope");
2527
2528			let mut db_path_last = db_path_first.to_owned();
2529
2530			for p in ["does", "not", "yet", "exist"] {
2531				db_path_last.push(p);
2532			}
2533
2534			(db_path_first, db_path_last)
2535		};
2536
2537		assert!(
2538			!db_path_first.exists(),
2539			"That directory should not have existed at this point (dir: {db_path_first:?})"
2540		);
2541
2542		let options = Options::with_columns(&db_path_last, 5);
2543		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
2544
2545		assert!(!db_path_first.exists(), "That directory should remain non-existent. Did the `open(create: false)` nonetheless create a directory? (dir: {db_path_first:?})");
2546		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
2547
2548		assert!(
2549			db_path_first.is_dir(),
2550			"A directory should have been been created (dir: {db_path_first:?})"
2551		);
2552		assert!(
2553			db_path_last.is_dir(),
2554			"A directory should have been been created (dir: {db_path_last:?})"
2555		);
2556	}
2557
2558	#[test]
2559	fn test_db_open_or_create() {
2560		let tmp = tempdir().unwrap();
2561		let options = Options::with_columns(tmp.path(), 5);
2562		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
2563		assert!(Db::open(&options).is_ok(), "Existing database should be reopened");
2564	}
2565
2566	#[test]
2567	fn test_indexed_keyvalues() {
2568		test_indexed_keyvalues_inner(EnableCommitPipelineStages::CommitOverlay);
2569		test_indexed_keyvalues_inner(EnableCommitPipelineStages::LogOverlay);
2570		test_indexed_keyvalues_inner(EnableCommitPipelineStages::DbFile);
2571		test_indexed_keyvalues_inner(EnableCommitPipelineStages::Standard);
2572	}
2573	fn test_indexed_keyvalues_inner(db_test: EnableCommitPipelineStages) {
2574		let tmp = tempdir().unwrap();
2575		let options = db_test.options(tmp.path(), 5);
2576		let col_nb = 0;
2577
2578		let key1 = b"key1".to_vec();
2579		let key2 = b"key2".to_vec();
2580		let key3 = b"key3".to_vec();
2581
2582		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2583		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
2584
2585		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
2586		db_test.run_stages(&db);
2587		assert!(db_test.check_empty_overlay(&db.inner, col_nb));
2588
2589		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
2590
2591		db.commit(vec![
2592			(col_nb, key1.clone(), None),
2593			(col_nb, key2.clone(), Some(b"value2".to_vec())),
2594			(col_nb, key3.clone(), Some(b"value3".to_vec())),
2595		])
2596		.unwrap();
2597		db_test.run_stages(&db);
2598		assert!(db_test.check_empty_overlay(&db.inner, col_nb));
2599
2600		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
2601		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
2602		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
2603
2604		db.commit(vec![
2605			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
2606			(col_nb, key3.clone(), None),
2607		])
2608		.unwrap();
2609		db_test.run_stages(&db);
2610		assert!(db_test.check_empty_overlay(&db.inner, col_nb));
2611
2612		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
2613		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
2614		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
2615	}
2616
2617	#[test]
2618	fn test_indexed_overlay_against_backend() {
2619		let tmp = tempdir().unwrap();
2620		let db_test = EnableCommitPipelineStages::DbFile;
2621		let options = db_test.options(tmp.path(), 5);
2622		let col_nb = 0;
2623
2624		let key1 = b"key1".to_vec();
2625		let key2 = b"key2".to_vec();
2626		let key3 = b"key3".to_vec();
2627
2628		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2629
2630		db.commit(vec![
2631			(col_nb, key1.clone(), Some(b"value1".to_vec())),
2632			(col_nb, key2.clone(), Some(b"value2".to_vec())),
2633			(col_nb, key3.clone(), Some(b"value3".to_vec())),
2634		])
2635		.unwrap();
2636		db_test.run_stages(&db);
2637		drop(db);
2638
2639		// issue with some file reopening when no delay
2640		std::thread::sleep(std::time::Duration::from_millis(100));
2641
2642		let db_test = EnableCommitPipelineStages::CommitOverlay;
2643		let options = db_test.options(tmp.path(), 5);
2644		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();
2645		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
2646		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
2647		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
2648		db.commit(vec![
2649			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
2650			(col_nb, key3.clone(), None),
2651		])
2652		.unwrap();
2653		db_test.run_stages(&db);
2654
2655		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
2656		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
2657		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
2658	}
2659
2660	#[test]
2661	fn test_add_column() {
2662		let tmp = tempdir().unwrap();
2663		let db_test = EnableCommitPipelineStages::DbFile;
2664		let mut options = db_test.options(tmp.path(), 1);
2665		options.salt = Some(options.salt.unwrap_or_default());
2666
2667		let old_col_id = 0;
2668		let new_col_id = 1;
2669		let new_col_indexed_id = 2;
2670
2671		let key1 = b"key1".to_vec();
2672		let key2 = b"key2".to_vec();
2673		let key3 = b"key3".to_vec();
2674
2675		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2676
2677		db.commit(vec![
2678			(old_col_id, key1.clone(), Some(b"value1".to_vec())),
2679			(old_col_id, key2.clone(), Some(b"value2".to_vec())),
2680			(old_col_id, key3.clone(), Some(b"value3".to_vec())),
2681		])
2682		.unwrap();
2683		db_test.run_stages(&db);
2684
2685		drop(db);
2686
2687		Db::add_column(&mut options, ColumnOptions { btree_index: false, ..Default::default() })
2688			.unwrap();
2689
2690		Db::add_column(&mut options, ColumnOptions { btree_index: true, ..Default::default() })
2691			.unwrap();
2692
2693		let mut options = db_test.options(tmp.path(), 3);
2694		options.columns[new_col_indexed_id as usize].btree_index = true;
2695
2696		let db_test = EnableCommitPipelineStages::DbFile;
2697		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2698
2699		// Expected number of columns
2700		assert_eq!(db.num_columns(), 3);
2701
2702		let new_key1 = b"abcdef".to_vec();
2703		let new_key2 = b"123456".to_vec();
2704
2705		// Write to new columns.
2706		db.commit(vec![
2707			(new_col_id, new_key1.clone(), Some(new_key1.to_vec())),
2708			(new_col_id, new_key2.clone(), Some(new_key2.to_vec())),
2709			(new_col_indexed_id, new_key1.clone(), Some(new_key1.to_vec())),
2710			(new_col_indexed_id, new_key2.clone(), Some(new_key2.to_vec())),
2711		])
2712		.unwrap();
2713		db_test.run_stages(&db);
2714
2715		drop(db);
2716
2717		// Reopen DB and fetch all keys we inserted.
2718		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2719
2720		assert_eq!(db.get(old_col_id, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
2721		assert_eq!(db.get(old_col_id, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
2722		assert_eq!(db.get(old_col_id, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
2723
2724		// Fetch from new columns
2725		assert_eq!(db.get(new_col_id, new_key1.as_slice()).unwrap(), Some(new_key1.to_vec()));
2726		assert_eq!(db.get(new_col_id, new_key2.as_slice()).unwrap(), Some(new_key2.to_vec()));
2727		assert_eq!(
2728			db.get(new_col_indexed_id, new_key1.as_slice()).unwrap(),
2729			Some(new_key1.to_vec())
2730		);
2731		assert_eq!(
2732			db.get(new_col_indexed_id, new_key2.as_slice()).unwrap(),
2733			Some(new_key2.to_vec())
2734		);
2735	}
2736
2737	#[test]
2738	fn test_indexed_btree_1() {
2739		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, false);
2740		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, false);
2741		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, false);
2742		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, false);
2743		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, true);
2744		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, true);
2745		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, true);
2746		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, true);
2747	}
	/// Exercises `get` and a single long-lived iterator on a btree column
	/// across three commits, for the given pipeline stage.
	///
	/// With `long_key` set, the four keys are 953, 272, 272 and 79 bytes long
	/// (the two 272-byte keys differ only in their last byte) instead of the
	/// four-byte `key1`..`key4`.
	fn test_indexed_btree_inner(db_test: EnableCommitPipelineStages, long_key: bool) {
		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let (key1, key2, key3, key4) = if !long_key {
			(b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec(), b"key4".to_vec())
		} else {
			let key2 = vec![2; 272];
			let mut key3 = key2.clone();
			key3[271] = 3;
			(vec![1; 953], key2, key3, vec![4; 79])
		};

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), None);

		// The iterator is created on the empty database and reused for the
		// whole test, so it has to observe every later commit.
		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(iter.next().unwrap(), None);
		assert_eq!(iter.prev().unwrap(), None);

		// First commit: a single key.
		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
		// Walk forward past the end, back past the start, and forward again.
		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);
		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		// After returning the first entry, `prev` has nothing before it.
		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		// Seeking to a key greater than every stored key, then going backwards.
		iter.seek(&[0xff]).unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		// Second commit: remove key1, add key2 and key3.
		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));

		// Forward from key2: the sought key itself is returned first.
		iter.seek(key2.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		// Backward from key3: again the sought key itself comes first.
		iter.seek(key3.as_slice()).unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		// Third commit: overwrite key2, add key4, remove key3.
		db.commit(vec![
			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
			(col_nb, key4.clone(), Some(b"value4".to_vec())),
			(col_nb, key3.clone(), None),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key3).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2b".to_vec()));
		assert_eq!(db.get(col_nb, &key4).unwrap(), Some(b"value4".to_vec()));
		// `key22` is key2 with one extra byte, so it sorts after key2 but is
		// not stored; the next entry after it must be key4.
		let mut key22 = key2.clone();
		key22.push(2);
		iter.seek(key22.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key4, b"value4".to_vec())));
		assert_eq!(iter.next().unwrap(), None);
	}
2830
2831	#[test]
2832	fn test_indexed_btree_2() {
2833		test_indexed_btree_inner_2(EnableCommitPipelineStages::CommitOverlay);
2834		test_indexed_btree_inner_2(EnableCommitPipelineStages::LogOverlay);
2835	}
	/// Persists one key of a btree column to the database files, reopens the
	/// database, and then checks `get` and iteration when later changes are
	/// only processed as far as `db_test` allows.
	fn test_indexed_btree_inner_2(db_test: EnableCommitPipelineStages) {
		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(iter.next().unwrap(), None);

		// The first commit is always run through all stages (`DbFile`),
		// regardless of `db_test`.
		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		EnableCommitPipelineStages::DbFile.run_stages(&db);
		drop(db);

		// issue with some file reopening when no delay
		std::thread::sleep(std::time::Duration::from_millis(100));

		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();

		// A fresh iterator on the reopened database sees the persisted key.
		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		// Second commit: remove key1, add key2 and key3; processed only as far
		// as `db_test` allows.
		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
		// Forward iteration must skip the removed key1.
		iter.seek(key2.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		// Backward iteration from the end must stop before the removed key1.
		iter.seek_to_last().unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key3, b"value3".to_vec())));
		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);
	}
2887
2888	#[test]
2889	fn test_indexed_btree_3() {
2890		test_indexed_btree_inner_3(EnableCommitPipelineStages::CommitOverlay);
2891		test_indexed_btree_inner_3(EnableCommitPipelineStages::LogOverlay);
2892		test_indexed_btree_inner_3(EnableCommitPipelineStages::DbFile);
2893		test_indexed_btree_inner_3(EnableCommitPipelineStages::Standard);
2894	}
2895
2896	fn test_indexed_btree_inner_3(db_test: EnableCommitPipelineStages) {
2897		use rand::SeedableRng;
2898
2899		use std::collections::BTreeSet;
2900
2901		let mut rng = rand::rngs::SmallRng::seed_from_u64(0);
2902
2903		let tmp = tempdir().unwrap();
2904		let col_nb = 0u8;
2905		let mut options = db_test.options(tmp.path(), 5);
2906		options.columns[col_nb as usize].btree_index = true;
2907
2908		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
2909
2910		db.commit(
2911			(0u64..1024)
2912				.map(|i| (0, i.to_be_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
2913				.chain((0u64..1024).step_by(2).map(|i| (0, i.to_be_bytes().to_vec(), None))),
2914		)
2915		.unwrap();
2916		let expected = (0u64..1024).filter(|i| i % 2 == 1).collect::<BTreeSet<_>>();
2917		let mut iter = db.iter(0).unwrap();
2918
2919		for _ in 0..100 {
2920			let at = rng.random_range(0u64..=1024);
2921			iter.seek(&at.to_be_bytes()).unwrap();
2922
2923			let mut prev_run: bool = rng.random();
2924			let at = if prev_run {
2925				let take = rng.random_range(1..100);
2926				let got = std::iter::from_fn(|| iter.next().unwrap())
2927					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2928					.take(take)
2929					.collect::<Vec<_>>();
2930				let expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
2931				assert_eq!(got, expected);
2932				if got.is_empty() {
2933					prev_run = false;
2934				}
2935				if got.len() < take {
2936					prev_run = false;
2937				}
2938				expected.last().copied().unwrap_or(at)
2939			} else {
2940				at
2941			};
2942
2943			let at = {
2944				let take = rng.random_range(1..100);
2945				let got = std::iter::from_fn(|| iter.prev().unwrap())
2946					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2947					.take(take)
2948					.collect::<Vec<_>>();
2949				let expected = if prev_run {
2950					expected.range(..at).rev().take(take).copied().collect::<Vec<_>>()
2951				} else {
2952					expected.range(..=at).rev().take(take).copied().collect::<Vec<_>>()
2953				};
2954				assert_eq!(got, expected);
2955				prev_run = !got.is_empty();
2956				if take > got.len() {
2957					prev_run = false;
2958				}
2959				expected.last().copied().unwrap_or(at)
2960			};
2961
2962			let take = rng.random_range(1..100);
2963			let mut got = std::iter::from_fn(|| iter.next().unwrap())
2964				.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2965				.take(take)
2966				.collect::<Vec<_>>();
2967			let mut expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
2968			if prev_run {
2969				expected = expected.split_off(1);
2970				if got.len() == take {
2971					got.pop();
2972				}
2973			}
2974			assert_eq!(got, expected);
2975		}
2976
2977		let take = rng.random_range(20..100);
2978		iter.seek_to_last().unwrap();
2979		let got = std::iter::from_fn(|| iter.prev().unwrap())
2980			.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
2981			.take(take)
2982			.collect::<Vec<_>>();
2983		let expected = expected.iter().rev().take(take).copied().collect::<Vec<_>>();
2984		assert_eq!(got, expected);
2985	}
2986
2987	fn test_basic(change_set: &[(Vec<u8>, Option<Vec<u8>>)]) {
2988		test_basic_inner(change_set, false, false);
2989		test_basic_inner(change_set, false, true);
2990		test_basic_inner(change_set, true, false);
2991		test_basic_inner(change_set, true, true);
2992	}
2993
2994	fn test_basic_inner(
2995		change_set: &[(Vec<u8>, Option<Vec<u8>>)],
2996		btree_index: bool,
2997		ref_counted: bool,
2998	) {
2999		let tmp = tempdir().unwrap();
3000		let col_nb = 1u8;
3001		let db_test = EnableCommitPipelineStages::DbFile;
3002		let mut options = db_test.options(tmp.path(), 2);
3003		options.columns[col_nb as usize].btree_index = btree_index;
3004		options.columns[col_nb as usize].ref_counted = ref_counted;
3005		options.columns[col_nb as usize].preimage = ref_counted;
3006		// ref counted and commit overlay currently don't support removal
3007		assert!(!(ref_counted && db_test == EnableCommitPipelineStages::CommitOverlay));
3008		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
3009
3010		let iter = btree_index.then(|| db.iter(col_nb).unwrap());
3011		assert_eq!(iter.and_then(|mut i| i.next().unwrap()), None);
3012
3013		db.commit(change_set.iter().map(|(k, v)| (col_nb, k.clone(), v.clone())))
3014			.unwrap();
3015		db_test.run_stages(&db);
3016
3017		let mut keys = HashSet::new();
3018		let mut expected_count: u64 = 0;
3019		for (k, v) in change_set.iter() {
3020			if v.is_some() {
3021				if keys.insert(k) {
3022					expected_count += 1;
3023				}
3024			} else if keys.remove(k) {
3025				expected_count -= 1;
3026			}
3027		}
3028		if ref_counted {
3029			let mut state: BTreeMap<Vec<u8>, Option<(Vec<u8>, usize)>> = Default::default();
3030			for (k, v) in change_set.iter() {
3031				let mut remove = false;
3032				let mut insert = false;
3033				match state.get_mut(k) {
3034					Some(Some((_, counter))) =>
3035						if v.is_some() {
3036							*counter += 1;
3037						} else if *counter == 1 {
3038							remove = true;
3039						} else {
3040							*counter -= 1;
3041						},
3042					Some(None) | None =>
3043						if v.is_some() {
3044							insert = true;
3045						},
3046				}
3047				if insert {
3048					state.insert(k.clone(), v.clone().map(|v| (v, 1)));
3049				}
3050				if remove {
3051					state.remove(k);
3052				}
3053			}
3054			for (key, value) in state {
3055				assert_eq!(db.get(col_nb, &key).unwrap(), value.map(|v| v.0));
3056			}
3057		} else {
3058			let stats = db.stats();
3059			// btree do not have stats implemented
3060			if let Some(stats) = stats.columns[col_nb as usize].as_ref() {
3061				assert_eq!(stats.total_values, expected_count);
3062			}
3063
3064			let state: BTreeMap<Vec<u8>, Option<Vec<u8>>> =
3065				change_set.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
3066			for (key, value) in state.iter() {
3067				assert_eq!(&db.get(col_nb, key).unwrap(), value);
3068			}
3069		}
3070	}
3071
3072	#[test]
3073	fn test_random() {
3074		fdlimit::raise_fd_limit().unwrap();
3075		for i in 0..100 {
3076			test_random_inner(60, 60, i);
3077		}
3078		for i in 0..50 {
3079			test_random_inner(20, 60, i);
3080		}
3081	}
3082	fn test_random_inner(size: usize, key_size: usize, seed: u64) {
3083		use rand::{RngCore, SeedableRng};
3084		let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
3085		let mut data = Vec::<(Vec<u8>, Option<Vec<u8>>)>::new();
3086		for i in 0..size {
3087			let nb_delete: u32 = rng.next_u32(); // should be out of loop, yet it makes alternance insert/delete in some case.
3088			let nb_delete = (nb_delete as usize % size) / 2;
3089			let mut key = vec![0u8; key_size];
3090			rng.fill_bytes(&mut key[..]);
3091			let value = if i > size - nb_delete {
3092				let random_key = rng.next_u32();
3093				let random_key = (random_key % 4) > 0;
3094				if !random_key {
3095					key = data[i - size / 2].0.clone();
3096				}
3097				None
3098			} else {
3099				Some(key.clone())
3100			};
3101			let var_keysize = rng.next_u32();
3102			let var_keysize = var_keysize as usize % (key_size / 2);
3103			key.truncate(key_size - var_keysize);
3104			data.push((key, value));
3105		}
3106		test_basic(&data[..]);
3107	}
3108
3109	#[test]
3110	fn test_simple() {
3111		test_basic(&[
3112			(b"key1".to_vec(), Some(b"value1".to_vec())),
3113			(b"key1".to_vec(), Some(b"value1".to_vec())),
3114			(b"key1".to_vec(), None),
3115		]);
3116		test_basic(&[
3117			(b"key1".to_vec(), Some(b"value1".to_vec())),
3118			(b"key1".to_vec(), Some(b"value1".to_vec())),
3119			(b"key1".to_vec(), None),
3120			(b"key1".to_vec(), None),
3121		]);
3122		test_basic(&[
3123			(b"key1".to_vec(), Some(b"value1".to_vec())),
3124			(b"key1".to_vec(), Some(b"value2".to_vec())),
3125		]);
3126		test_basic(&[(b"key1".to_vec(), Some(b"value1".to_vec()))]);
3127		test_basic(&[
3128			(b"key1".to_vec(), Some(b"value1".to_vec())),
3129			(b"key2".to_vec(), Some(b"value2".to_vec())),
3130		]);
3131		test_basic(&[
3132			(b"key1".to_vec(), Some(b"value1".to_vec())),
3133			(b"key2".to_vec(), Some(b"value2".to_vec())),
3134			(b"key3".to_vec(), Some(b"value3".to_vec())),
3135		]);
3136		test_basic(&[
3137			(b"key1".to_vec(), Some(b"value1".to_vec())),
3138			(b"key3".to_vec(), Some(b"value3".to_vec())),
3139			(b"key2".to_vec(), Some(b"value2".to_vec())),
3140		]);
3141		test_basic(&[
3142			(b"key3".to_vec(), Some(b"value3".to_vec())),
3143			(b"key2".to_vec(), Some(b"value2".to_vec())),
3144			(b"key1".to_vec(), Some(b"value1".to_vec())),
3145		]);
3146		test_basic(&[
3147			(b"key1".to_vec(), Some(b"value1".to_vec())),
3148			(b"key2".to_vec(), Some(b"value2".to_vec())),
3149			(b"key3".to_vec(), Some(b"value3".to_vec())),
3150			(b"key4".to_vec(), Some(b"value4".to_vec())),
3151		]);
3152		test_basic(&[
3153			(b"key1".to_vec(), Some(b"value1".to_vec())),
3154			(b"key2".to_vec(), Some(b"value2".to_vec())),
3155			(b"key3".to_vec(), Some(b"value3".to_vec())),
3156			(b"key4".to_vec(), Some(b"value4".to_vec())),
3157			(b"key5".to_vec(), Some(b"value5".to_vec())),
3158		]);
3159		test_basic(&[
3160			(b"key5".to_vec(), Some(b"value5".to_vec())),
3161			(b"key3".to_vec(), Some(b"value3".to_vec())),
3162			(b"key4".to_vec(), Some(b"value4".to_vec())),
3163			(b"key2".to_vec(), Some(b"value2".to_vec())),
3164			(b"key1".to_vec(), Some(b"value1".to_vec())),
3165		]);
3166		test_basic(&[
3167			(b"key5".to_vec(), Some(b"value5".to_vec())),
3168			(b"key3".to_vec(), Some(b"value3".to_vec())),
3169			(b"key4".to_vec(), Some(b"value4".to_vec())),
3170			(b"key2".to_vec(), Some(b"value2".to_vec())),
3171			(b"key1".to_vec(), Some(b"value1".to_vec())),
3172			(b"key11".to_vec(), Some(b"value31".to_vec())),
3173			(b"key12".to_vec(), Some(b"value32".to_vec())),
3174		]);
3175		test_basic(&[
3176			(b"key5".to_vec(), Some(b"value5".to_vec())),
3177			(b"key3".to_vec(), Some(b"value3".to_vec())),
3178			(b"key4".to_vec(), Some(b"value4".to_vec())),
3179			(b"key2".to_vec(), Some(b"value2".to_vec())),
3180			(b"key1".to_vec(), Some(b"value1".to_vec())),
3181			(b"key51".to_vec(), Some(b"value31".to_vec())),
3182			(b"key52".to_vec(), Some(b"value32".to_vec())),
3183		]);
3184		test_basic(&[
3185			(b"key5".to_vec(), Some(b"value5".to_vec())),
3186			(b"key3".to_vec(), Some(b"value3".to_vec())),
3187			(b"key4".to_vec(), Some(b"value4".to_vec())),
3188			(b"key2".to_vec(), Some(b"value2".to_vec())),
3189			(b"key1".to_vec(), Some(b"value1".to_vec())),
3190			(b"key31".to_vec(), Some(b"value31".to_vec())),
3191			(b"key32".to_vec(), Some(b"value32".to_vec())),
3192		]);
3193		test_basic(&[
3194			(b"key1".to_vec(), Some(b"value5".to_vec())),
3195			(b"key2".to_vec(), Some(b"value3".to_vec())),
3196			(b"key3".to_vec(), Some(b"value4".to_vec())),
3197			(b"key4".to_vec(), Some(b"value7".to_vec())),
3198			(b"key5".to_vec(), Some(b"value2".to_vec())),
3199			(b"key6".to_vec(), Some(b"value1".to_vec())),
3200			(b"key3".to_vec(), None),
3201		]);
3202		test_basic(&[
3203			(b"key1".to_vec(), Some(b"value5".to_vec())),
3204			(b"key2".to_vec(), Some(b"value3".to_vec())),
3205			(b"key3".to_vec(), Some(b"value4".to_vec())),
3206			(b"key4".to_vec(), Some(b"value7".to_vec())),
3207			(b"key5".to_vec(), Some(b"value2".to_vec())),
3208			(b"key0".to_vec(), Some(b"value1".to_vec())),
3209			(b"key3".to_vec(), None),
3210		]);
3211		test_basic(&[
3212			(b"key1".to_vec(), Some(b"value5".to_vec())),
3213			(b"key2".to_vec(), Some(b"value3".to_vec())),
3214			(b"key3".to_vec(), Some(b"value4".to_vec())),
3215			(b"key4".to_vec(), Some(b"value7".to_vec())),
3216			(b"key5".to_vec(), Some(b"value2".to_vec())),
3217			(b"key3".to_vec(), None),
3218		]);
3219		test_basic(&[
3220			(b"key1".to_vec(), Some(b"value5".to_vec())),
3221			(b"key4".to_vec(), Some(b"value3".to_vec())),
3222			(b"key5".to_vec(), Some(b"value4".to_vec())),
3223			(b"key6".to_vec(), Some(b"value4".to_vec())),
3224			(b"key7".to_vec(), Some(b"value2".to_vec())),
3225			(b"key8".to_vec(), Some(b"value1".to_vec())),
3226			(b"key5".to_vec(), None),
3227		]);
3228		test_basic(&[
3229			(b"key1".to_vec(), Some(b"value5".to_vec())),
3230			(b"key4".to_vec(), Some(b"value3".to_vec())),
3231			(b"key5".to_vec(), Some(b"value4".to_vec())),
3232			(b"key7".to_vec(), Some(b"value2".to_vec())),
3233			(b"key8".to_vec(), Some(b"value1".to_vec())),
3234			(b"key3".to_vec(), None),
3235		]);
3236		test_basic(&[
3237			(b"key5".to_vec(), Some(b"value5".to_vec())),
3238			(b"key3".to_vec(), Some(b"value3".to_vec())),
3239			(b"key4".to_vec(), Some(b"value4".to_vec())),
3240			(b"key2".to_vec(), Some(b"value2".to_vec())),
3241			(b"key1".to_vec(), Some(b"value1".to_vec())),
3242			(b"key5".to_vec(), None),
3243			(b"key3".to_vec(), None),
3244		]);
3245		test_basic(&[
3246			(b"key5".to_vec(), Some(b"value5".to_vec())),
3247			(b"key3".to_vec(), Some(b"value3".to_vec())),
3248			(b"key4".to_vec(), Some(b"value4".to_vec())),
3249			(b"key2".to_vec(), Some(b"value2".to_vec())),
3250			(b"key1".to_vec(), Some(b"value1".to_vec())),
3251			(b"key5".to_vec(), None),
3252			(b"key3".to_vec(), None),
3253			(b"key2".to_vec(), None),
3254			(b"key4".to_vec(), None),
3255		]);
3256		test_basic(&[
3257			(b"key5".to_vec(), Some(b"value5".to_vec())),
3258			(b"key3".to_vec(), Some(b"value3".to_vec())),
3259			(b"key4".to_vec(), Some(b"value4".to_vec())),
3260			(b"key2".to_vec(), Some(b"value2".to_vec())),
3261			(b"key1".to_vec(), Some(b"value1".to_vec())),
3262			(b"key5".to_vec(), None),
3263			(b"key3".to_vec(), None),
3264			(b"key2".to_vec(), None),
3265			(b"key4".to_vec(), None),
3266			(b"key1".to_vec(), None),
3267		]);
3268		test_basic(&[
3269			([5u8; 250].to_vec(), Some(b"value5".to_vec())),
3270			([5u8; 200].to_vec(), Some(b"value3".to_vec())),
3271			([5u8; 100].to_vec(), Some(b"value4".to_vec())),
3272			([5u8; 150].to_vec(), Some(b"value2".to_vec())),
3273			([5u8; 101].to_vec(), Some(b"value1".to_vec())),
3274			([5u8; 250].to_vec(), None),
3275			([5u8; 101].to_vec(), None),
3276		]);
3277	}
3278
3279	#[test]
3280	fn test_btree_iter() {
3281		let col_nb = 0;
3282		let mut data_start = Vec::new();
3283		for i in 0u8..100 {
3284			let mut key = b"key0".to_vec();
3285			key[3] = i;
3286			let mut value = b"val0".to_vec();
3287			value[3] = i;
3288			data_start.push((col_nb, key, Some(value)));
3289		}
3290		let mut data_change = Vec::new();
3291		for i in 0u8..100 {
3292			let mut key = b"key0".to_vec();
3293			if i % 2 == 0 {
3294				key[2] = i;
3295				let mut value = b"val0".to_vec();
3296				value[2] = i;
3297				data_change.push((col_nb, key, Some(value)));
3298			} else if i % 3 == 0 {
3299				key[3] = i;
3300				data_change.push((col_nb, key, None));
3301			} else {
3302				key[3] = i;
3303				let mut value = b"val0".to_vec();
3304				value[2] = i;
3305				data_change.push((col_nb, key, Some(value)));
3306			}
3307		}
3308
3309		let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
3310			data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
3311		let mut end_state = start_state.clone();
3312		for (_c, k, v) in data_change.iter() {
3313			if let Some(v) = v {
3314				end_state.insert(k.clone(), v.clone());
3315			} else {
3316				end_state.remove(k);
3317			}
3318		}
3319
3320		for stage in [
3321			EnableCommitPipelineStages::CommitOverlay,
3322			EnableCommitPipelineStages::LogOverlay,
3323			EnableCommitPipelineStages::DbFile,
3324			EnableCommitPipelineStages::Standard,
3325		] {
3326			for i in 0..10 {
3327				test_btree_iter_inner(
3328					stage,
3329					&data_start,
3330					&data_change,
3331					&start_state,
3332					&end_state,
3333					i * 5,
3334				);
3335			}
3336			let data_start = vec![
3337				(0, b"key1".to_vec(), Some(b"val1".to_vec())),
3338				(0, b"key3".to_vec(), Some(b"val3".to_vec())),
3339			];
3340			let data_change = vec![(0, b"key2".to_vec(), Some(b"val2".to_vec()))];
3341			let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
3342				data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
3343			let mut end_state = start_state.clone();
3344			for (_c, k, v) in data_change.iter() {
3345				if let Some(v) = v {
3346					end_state.insert(k.clone(), v.clone());
3347				} else {
3348					end_state.remove(k);
3349				}
3350			}
3351			test_btree_iter_inner(stage, &data_start, &data_change, &start_state, &end_state, 1);
3352		}
3353	}
3354	fn test_btree_iter_inner(
3355		db_test: EnableCommitPipelineStages,
3356		data_start: &[(u8, Vec<u8>, Option<Value>)],
3357		data_change: &[(u8, Vec<u8>, Option<Value>)],
3358		start_state: &BTreeMap<Vec<u8>, Vec<u8>>,
3359		end_state: &BTreeMap<Vec<u8>, Vec<u8>>,
3360		commit_at: usize,
3361	) {
3362		let tmp = tempdir().unwrap();
3363		let mut options = db_test.options(tmp.path(), 5);
3364		let col_nb = 0;
3365		options.columns[col_nb as usize].btree_index = true;
3366		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
3367
3368		db.commit(data_start.iter().cloned()).unwrap();
3369		db_test.run_stages(&db);
3370
3371		let mut iter = db.iter(col_nb).unwrap();
3372		let mut iter_state = start_state.iter();
3373		let mut last_key = Value::new();
3374		for _ in 0..commit_at {
3375			let next = iter.next().unwrap();
3376			if let Some((k, _)) = next.as_ref() {
3377				last_key = k.clone();
3378			}
3379			assert_eq!(iter_state.next(), next.as_ref().map(|(k, v)| (k, v)));
3380		}
3381
3382		db.commit(data_change.iter().cloned()).unwrap();
3383		db_test.run_stages(&db);
3384
3385		let mut iter_state = end_state.range(last_key.clone()..);
3386		for _ in commit_at..100 {
3387			let mut state_next = iter_state.next();
3388			if let Some((k, _v)) = state_next.as_ref() {
3389				if *k == &last_key {
3390					state_next = iter_state.next();
3391				}
3392			}
3393			let iter_next = iter.next().unwrap();
3394			assert_eq!(state_next, iter_next.as_ref().map(|(k, v)| (k, v)));
3395		}
3396		let mut iter_state_rev = end_state.iter().rev();
3397		let mut iter = db.iter(col_nb).unwrap();
3398		iter.seek_to_last().unwrap();
3399		for _ in 0..100 {
3400			let next = iter.prev().unwrap();
3401			assert_eq!(iter_state_rev.next(), next.as_ref().map(|(k, v)| (k, v)));
3402		}
3403	}
3404
3405	#[cfg(feature = "instrumentation")]
3406	#[test]
3407	fn test_recover_from_log_on_error() {
3408		let tmp = tempdir().unwrap();
3409		let mut options = Options::with_columns(tmp.path(), 1);
3410		options.always_flush = true;
3411		options.with_background_thread = false;
3412
3413		// We do 2 commits and we fail while enacting the second one
3414		{
3415			let db = Db::open_or_create(&options).unwrap();
3416			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
3417			db.process_commits().unwrap();
3418			db.flush_logs().unwrap();
3419			db.enact_logs().unwrap();
3420			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
3421			db.process_commits().unwrap();
3422			db.flush_logs().unwrap();
3423			crate::set_number_of_allowed_io_operations(4);
3424
3425			// Set the background error explicitly as background threads are disabled in tests.
3426			let err = db.enact_logs();
3427			assert!(err.is_err());
3428			db.inner.store_err(err);
3429			crate::set_number_of_allowed_io_operations(usize::MAX);
3430		}
3431
3432		// Open the databases and check that both values are there.
3433		{
3434			let db = Db::open(&options).unwrap();
3435			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
3436			assert_eq!(db.get(0, &[1]).unwrap(), Some(vec![1]));
3437		}
3438	}
3439
3440	#[cfg(feature = "instrumentation")]
3441	#[test]
3442	fn test_partial_log_recovery() {
3443		let tmp = tempdir().unwrap();
3444		let mut options = Options::with_columns(tmp.path(), 1);
3445		options.columns[0].btree_index = true;
3446		options.always_flush = true;
3447		options.with_background_thread = false;
3448
3449		// We do 2 commits and we fail while writing the second one
3450		{
3451			let db = Db::open_or_create(&options).unwrap();
3452			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
3453			db.process_commits().unwrap();
3454			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
3455			crate::set_number_of_allowed_io_operations(4);
3456			assert!(db.process_commits().is_err());
3457			crate::set_number_of_allowed_io_operations(usize::MAX);
3458			db.flush_logs().unwrap();
3459		}
3460
3461		// We open a first time, the first value is there
3462		{
3463			let db = Db::open(&options).unwrap();
3464			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
3465		}
3466
3467		// We open a second time, the first value should be still there
3468		{
3469			let db = Db::open(&options).unwrap();
3470			assert!(db.get(0, &[0]).unwrap().is_some());
3471		}
3472	}
3473
3474	#[cfg(feature = "instrumentation")]
3475	#[test]
3476	fn test_continue_reindex() {
3477		let _ = env_logger::try_init();
3478		let tmp = tempdir().unwrap();
3479		let mut options = Options::with_columns(tmp.path(), 1);
3480		options.columns[0].preimage = true;
3481		options.columns[0].uniform = true;
3482		options.always_flush = true;
3483		options.with_background_thread = false;
3484		options.salt = Some(Default::default());
3485
3486		{
3487			// Force a reindex by committing more than 64 values with the same 16 bit prefix
3488			let db = Db::open_or_create(&options).unwrap();
3489			let commit: Vec<_> = (0..65u32)
3490				.map(|index| {
3491					let mut key = [0u8; 32];
3492					key[2] = (index as u8) << 1;
3493					(0, key.to_vec(), Some(vec![index as u8]))
3494				})
3495				.collect();
3496			db.commit(commit).unwrap();
3497
3498			db.process_commits().unwrap();
3499			db.flush_logs().unwrap();
3500			db.enact_logs().unwrap();
3501			// i16 now contains 64 values and i17 contains a single value that did not fit
3502
3503			// Simulate interrupted reindex by processing it first and then restoring the old index
3504			// file. Make a copy of the index file first.
3505			std::fs::copy(tmp.path().join("index_00_16"), tmp.path().join("index_00_16.bak"))
3506				.unwrap();
3507			db.process_reindex().unwrap();
3508			db.flush_logs().unwrap();
3509			db.enact_logs().unwrap();
3510			db.clean_logs().unwrap();
3511			std::fs::rename(tmp.path().join("index_00_16.bak"), tmp.path().join("index_00_16"))
3512				.unwrap();
3513		}
3514
3515		// Reopen the database which should load the reindex.
3516		{
3517			let db = Db::open(&options).unwrap();
3518			db.process_reindex().unwrap();
3519			let mut entries = 0;
3520			db.iter_column_while(0, |_| {
3521				entries += 1;
3522				true
3523			})
3524			.unwrap();
3525
3526			assert_eq!(entries, 65);
3527			assert_eq!(db.inner.columns[0].index_bits(), Some(17));
3528		}
3529	}
3530
3531	#[test]
3532	fn test_remove_column() {
3533		let tmp = tempdir().unwrap();
3534		let db_test_file = EnableCommitPipelineStages::DbFile;
3535		let mut options_db_files = db_test_file.options(tmp.path(), 2);
3536		options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
3537		let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
3538		options_std.salt = options_db_files.salt.clone();
3539
3540		let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();
3541
3542		let payload: Vec<(u8, _, _)> = (0u16..100)
3543			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3544			.collect();
3545
3546		db.commit(payload.clone()).unwrap();
3547
3548		db_test_file.run_stages(&db);
3549		drop(db);
3550
3551		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
3552		for (col, key, value) in payload.iter() {
3553			assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
3554		}
3555		drop(db);
3556		Db::reset_column(&mut options_db_files, 1, None).unwrap();
3557
3558		let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
3559		for (col, key, _value) in payload.iter() {
3560			assert_eq!(db.get(*col, key).unwrap(), None);
3561		}
3562
3563		let payload: Vec<(u8, _, _)> = (0u16..10)
3564			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3565			.collect();
3566
3567		db.commit(payload.clone()).unwrap();
3568
3569		db_test_file.run_stages(&db);
3570		drop(db);
3571
3572		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
3573		let payload: Vec<(u8, _, _)> = (10u16..100)
3574			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3575			.collect();
3576
3577		db.commit(payload.clone()).unwrap();
3578		assert!(db.iter(1).is_err());
3579
3580		drop(db);
3581
3582		let mut col_option = options_std.columns[1].clone();
3583		col_option.btree_index = true;
3584		Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();
3585
3586		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
3587		let payload: Vec<(u8, _, _)> = (0u16..10)
3588			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3589			.collect();
3590
3591		db.commit(payload.clone()).unwrap();
3592		assert!(db.iter(1).is_ok());
3593	}
3594}