1use crate::{
22 btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
23 column::{
24 hash_key, unpack_node_children, unpack_node_data, ColId, Column, HashColumn, IterState,
25 ReindexBatch, ValueIterState,
26 },
27 error::{try_io, Error, Result},
28 hash::IdentityBuildHasher,
29 index::{Address, PlanOutcome},
30 log::{Log, LogAction},
31 multitree::{Children, NewNode, NodeAddress},
32 options::{Options, CURRENT_VERSION},
33 parking_lot::{
34 Condvar, Mutex, MutexGuard, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard,
35 },
36 stats::StatSummary,
37 ColumnOptions, Key,
38};
39#[cfg(feature = "bytes")]
40use bytes::Bytes;
41use fs2::FileExt;
42use std::{
43 borrow::Borrow,
44 collections::{BTreeMap, HashMap, HashSet, VecDeque},
45 ops::Bound,
46 sync::{
47 atomic::{AtomicBool, AtomicU64, Ordering},
48 Arc, Weak,
49 },
50 thread,
51};
52
/// Size in bytes of queued commits above which `commit_raw` waits for the queue to drain
/// (16 MiB).
const MAX_COMMIT_QUEUE_BYTES: usize = 16 << 20;
/// Logged-but-not-yet-enacted bytes (as counted in `log_queue_wait`) above which
/// `process_commits` waits before writing more (128 MiB).
const MAX_LOG_QUEUE_BYTES: i64 = 128 << 20;
/// 64 MiB. Not referenced in this part of the file; NOTE(review): presumably a minimum log
/// size threshold — confirm at the use site.
const MIN_LOG_SIZE_BYTES: u64 = 64 << 20;
/// Not referenced in this part of the file; NOTE(review): presumably how many log files are
/// retained — confirm at the use site.
const KEEP_LOGS: usize = 16;
/// Not referenced in this part of the file; NOTE(review): presumably a cap on log files —
/// confirm at the use site.
const MAX_LOG_FILES: usize = 4;

/// Owned value bytes as stored in and returned by the database.
pub type Value = Vec<u8>;
72
73#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
74pub enum RcValue {
75 #[cfg(feature = "arc")]
76 Arc(Arc<Value>),
77 #[cfg(feature = "bytes")]
78 Bytes(Bytes),
79}
80
81impl AsRef<[u8]> for RcValue {
82 fn as_ref(&self) -> &[u8] {
83 match self {
84 #[cfg(feature = "arc")]
85 Self::Arc(arc) => arc.as_ref(),
86 #[cfg(feature = "bytes")]
87 Self::Bytes(bytes) => bytes.as_ref(),
88 }
89 }
90}
91
92impl Borrow<[u8]> for RcValue {
93 fn borrow(&self) -> &[u8] {
94 self.as_ref()
95 }
96}
97
98#[cfg(feature = "arc")]
99impl From<Value> for RcValue {
100 fn from(value: Value) -> Self {
101 Self::Arc(value.into())
102 }
103}
104
105#[cfg(not(feature = "arc"))]
106impl From<Value> for RcValue {
107 fn from(value: Value) -> Self {
108 Self::Bytes(value.into())
109 }
110}
111
112#[cfg(feature = "arc")]
113impl From<Arc<Value>> for RcValue {
114 fn from(value: Arc<Value>) -> Self {
115 Self::Arc(value)
116 }
117}
118
119#[cfg(feature = "bytes")]
120impl From<Bytes> for RcValue {
121 fn from(value: Bytes) -> Self {
122 Self::Bytes(value)
123 }
124}
125
126#[cfg(test)]
127impl<const N: usize> TryFrom<RcValue> for [u8; N] {
128 type Error = <[u8; N] as TryFrom<Vec<u8>>>::Error;
129
130 fn try_from(value: RcValue) -> std::result::Result<Self, Self::Error> {
131 value.as_ref().to_vec().try_into()
132 }
133}
134
/// Reference-counted key bytes; cloning only bumps the `Arc` counter.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RcKey(Arc<Vec<u8>>);

impl AsRef<[u8]> for RcKey {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

impl Borrow<[u8]> for RcKey {
    fn borrow(&self) -> &[u8] {
        <Self as AsRef<[u8]>>::as_ref(self)
    }
}

impl From<Vec<u8>> for RcKey {
    fn from(bytes: Vec<u8>) -> Self {
        RcKey(Arc::new(bytes))
    }
}
155
156#[derive(Debug, Default)]
158struct Commit {
159 id: u64,
162 bytes: usize,
165 changeset: CommitChangeSet,
167}
168
169#[derive(Debug, Default)]
171struct CommitQueue {
172 record_id: u64,
174 bytes: usize,
176 commits: VecDeque<Commit>,
178}
179
180#[derive(Debug)]
181struct Trees {
182 readers: HashMap<Key, Weak<RwLock<Box<dyn TreeReader + Send + Sync>>>, IdentityBuildHasher>,
183 to_dereference: HashMap<Key, usize>,
185}
186
187#[derive(Debug)]
188struct DbInner {
189 columns: Vec<Column>,
190 options: Options,
191 shutdown: AtomicBool,
192 log: Log,
193 commit_queue: Mutex<CommitQueue>,
194 commit_queue_full_cv: Condvar,
195 log_worker_wait: WaitCondvar<bool>,
196 commit_worker_wait: Arc<WaitCondvar<bool>>,
197 commit_overlay: RwLock<Vec<CommitOverlay>>,
199 trees: RwLock<HashMap<ColId, Trees>>,
200 log_queue_wait: WaitCondvar<i64>,
202 flush_worker_wait: Arc<WaitCondvar<bool>>,
203 cleanup_worker_wait: WaitCondvar<bool>,
204 cleanup_queue_wait: WaitCondvar<bool>,
205 iteration_lock: Mutex<()>,
206 last_enacted: AtomicU64,
207 next_reindex: AtomicU64,
208 bg_err: Mutex<Option<Arc<Error>>>,
209 db_version: u32,
210 lock_file: std::fs::File,
211}
212
213#[derive(Debug)]
214struct WaitCondvar<S> {
215 cv: Condvar,
216 work: Mutex<S>,
217}
218
219impl<S: Default> WaitCondvar<S> {
220 fn new() -> Self {
221 WaitCondvar { cv: Condvar::new(), work: Mutex::new(S::default()) }
222 }
223}
224
225impl WaitCondvar<bool> {
226 fn signal(&self) {
227 let mut work = self.work.lock();
228 *work = true;
229 self.cv.notify_one();
230 }
231
232 pub fn wait(&self) {
233 let mut work = self.work.lock();
234 while !*work {
235 self.cv.wait(&mut work)
236 }
237 *work = false;
238 }
239}
240
241impl DbInner {
242 fn open(options: &Options, opening_mode: OpeningMode) -> Result<DbInner> {
243 if opening_mode == OpeningMode::Create {
244 try_io!(std::fs::create_dir_all(&options.path));
245 } else if !options.path.is_dir() {
246 return Err(Error::DatabaseNotFound)
247 }
248
249 let mut lock_path: std::path::PathBuf = options.path.clone();
250 lock_path.push("lock");
251 let lock_file = try_io!(std::fs::OpenOptions::new()
252 .create(true)
253 .read(true)
254 .write(true)
255 .open(lock_path.as_path()));
256 lock_file.try_lock_exclusive().map_err(Error::Locked)?;
257
258 let metadata = options.load_and_validate_metadata(opening_mode == OpeningMode::Create)?;
259 let mut columns = Vec::with_capacity(metadata.columns.len());
260 let mut commit_overlay = Vec::with_capacity(metadata.columns.len());
261 let log = Log::open(options)?;
262 let last_enacted = log.replay_record_id().unwrap_or(2) - 1;
263 for c in 0..metadata.columns.len() {
264 let column = Column::open(c as ColId, options, &metadata)?;
265 commit_overlay.push(CommitOverlay::new());
266 columns.push(column);
267 }
268 log::debug!(target: "parity-db", "Opened db {:?}, metadata={:?}", options, metadata);
269 let mut options = options.clone();
270 if options.salt.is_none() {
271 options.salt = Some(metadata.salt);
272 }
273
274 Ok(DbInner {
275 columns,
276 options,
277 shutdown: AtomicBool::new(false),
278 log,
279 commit_queue: Mutex::new(Default::default()),
280 commit_queue_full_cv: Condvar::new(),
281 log_worker_wait: WaitCondvar::new(),
282 commit_worker_wait: Arc::new(WaitCondvar::new()),
283 commit_overlay: RwLock::new(commit_overlay),
284 trees: RwLock::new(Default::default()),
285 log_queue_wait: WaitCondvar::new(),
286 flush_worker_wait: Arc::new(WaitCondvar::new()),
287 cleanup_worker_wait: WaitCondvar::new(),
288 cleanup_queue_wait: WaitCondvar::new(),
289 iteration_lock: Mutex::new(()),
290 next_reindex: AtomicU64::new(1),
291 last_enacted: AtomicU64::new(last_enacted),
292 bg_err: Mutex::new(None),
293 db_version: metadata.version,
294 lock_file,
295 })
296 }
297
298 fn init_table_data(&mut self) -> Result<()> {
299 for column in &mut self.columns {
300 column.init_table_data()?;
301 }
302 Ok(())
303 }
304
305 fn get(&self, col: ColId, key: &[u8], external_call: bool) -> Result<Option<Value>> {
306 if self.options.columns[col as usize].multitree && external_call {
307 return Err(Error::InvalidConfiguration(
308 "get not supported for multitree columns.".to_string(),
309 ))
310 }
311 match &self.columns[col as usize] {
312 Column::Hash(column) => {
313 let key = column.hash_key(key);
314 let overlay = self.commit_overlay.read();
315 if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
317 return Ok(v.map(|i| i.as_ref().to_vec()))
318 }
319 std::mem::drop(overlay);
320 let log = self.log.overlays();
322 Ok(column.get(&key, log)?.map(|(v, _rc)| v))
323 },
324 Column::Tree(column) => {
325 let overlay = self.commit_overlay.read();
326 if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
327 return Ok(l.map(|i| i.as_ref().to_vec()))
328 }
329 std::mem::drop(overlay);
330 let log = self.log.overlays().read();
332 column.with_locked(|btree| BTreeTable::get(key, &*log, btree))
333 },
334 }
335 }
336
337 fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
338 if self.options.columns[col as usize].multitree {
339 return Err(Error::InvalidConfiguration(
340 "get_size not supported for multitree columns.".to_string(),
341 ))
342 }
343 match &self.columns[col as usize] {
344 Column::Hash(column) => {
345 let key = column.hash_key(key);
346 let overlay = self.commit_overlay.read();
347 if let Some(l) = overlay.get(col as usize).and_then(|o| o.get_size(&key)) {
349 return Ok(l)
350 }
351 let log = self.log.overlays();
353 column.get_size(&key, log)
354 },
355 Column::Tree(column) => {
356 let overlay = self.commit_overlay.read();
357 if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
358 return Ok(l.map(|v| v.as_ref().len() as u32))
359 }
360 let log = self.log.overlays().read();
361 let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
362 Ok(l.map(|v| v.len() as u32))
363 },
364 }
365 }
366
367 fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
368 if !self.options.columns[col as usize].multitree {
369 return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
370 }
371 if !self.options.columns[col as usize].append_only &&
372 !self.options.columns[col as usize].allow_direct_node_access
373 {
374 return Err(Error::InvalidConfiguration(
375 "get_root can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
376 ))
377 }
378 let value = self.get(col, key, false)?;
379 if let Some(data) = value {
380 return Ok(Some(unpack_node_data(data)?))
381 }
382 Ok(None)
383 }
384
385 fn get_node(
386 &self,
387 col: ColId,
388 node_address: NodeAddress,
389 external_call: bool,
390 ) -> Result<Option<(Vec<u8>, Children)>> {
391 if !self.options.columns[col as usize].multitree {
392 return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
393 }
394 if !self.options.columns[col as usize].append_only &&
395 !self.options.columns[col as usize].allow_direct_node_access &&
396 external_call
397 {
398 return Err(Error::InvalidConfiguration(
399 "get_node can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
400 ))
401 }
402 match &self.columns[col as usize] {
403 Column::Hash(column) => {
404 let overlay = self.commit_overlay.read();
405 if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
407 {
408 return Ok(Some(unpack_node_data(v.as_ref().to_vec())?))
409 }
410 let log = self.log.overlays();
411 let value = column.get_value(Address::from_u64(node_address), log)?;
412 if let Some(data) = value {
413 return Ok(Some(unpack_node_data(data)?))
414 }
415 Ok(None)
416 },
417 Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
418 }
419 }
420
421 fn get_node_children(
422 &self,
423 col: ColId,
424 node_address: NodeAddress,
425 external_call: bool,
426 ) -> Result<Option<Children>> {
427 if !self.options.columns[col as usize].multitree {
428 return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
429 }
430 if !self.options.columns[col as usize].append_only &&
431 !self.options.columns[col as usize].allow_direct_node_access &&
432 external_call
433 {
434 return Err(Error::InvalidConfiguration(
435 "get_node_children can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
436 ))
437 }
438 match &self.columns[col as usize] {
439 Column::Hash(column) => {
440 let overlay = self.commit_overlay.read();
441 if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
443 {
444 return Ok(Some(unpack_node_children(v.as_ref())?))
445 }
446 let log = self.log.overlays();
447 let value = column.get_value(Address::from_u64(node_address), log)?;
448 if let Some(data) = value {
449 return Ok(Some(unpack_node_children(&data)?))
450 }
451 Ok(None)
452 },
453 Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
454 }
455 }
456
457 fn get_tree(
458 &self,
459 db: &Arc<DbInner>,
460 col: ColId,
461 key: &[u8],
462 check_existence: bool,
463 ) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
464 if !self.options.columns[col as usize].multitree {
465 return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
466 }
467 match &self.columns[col as usize] {
468 Column::Hash(column) => {
469 if check_existence {
472 let root = self.get(col, key, false).unwrap();
473 if root.is_none() {
474 return Ok(None)
475 }
476 }
477
478 let hash_key = column.hash_key(key);
479
480 let trees = self.trees.upgradable_read();
481
482 if let Some(column_trees) = trees.get(&col) {
483 if let Some(reader) = column_trees.readers.get(&hash_key) {
484 let reader = reader.upgrade();
485 if let Some(reader) = reader {
486 return Ok(Some(reader))
487 }
488 }
489 }
490
491 let mut trees = RwLockUpgradableReadGuard::upgrade(trees);
492
493 let column_trees = trees.entry(col).or_insert_with(|| Trees {
494 readers: Default::default(),
495 to_dereference: Default::default(),
496 });
497
498 let reader: Box<dyn TreeReader + Send + Sync> =
499 Box::new(DbTreeReader { db: db.clone(), col, key: hash_key });
500 let reader = Arc::new(RwLock::new(reader));
501
502 column_trees.readers.insert(hash_key, Arc::downgrade(&reader));
503
504 Ok(Some(reader))
505 },
506 Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
507 }
508 }
509
510 fn btree_iter(&self, col: ColId) -> Result<BTreeIterator<'_>> {
511 match &self.columns[col as usize] {
512 Column::Hash(_column) =>
513 Err(Error::InvalidConfiguration("Not an indexed column.".to_string())),
514 Column::Tree(column) => {
515 let log = self.log.overlays();
516 BTreeIterator::new(column, col, log, &self.commit_overlay)
517 },
518 }
519 }
520
521 fn commit<I, K>(&self, tx: I) -> Result<()>
524 where
525 I: IntoIterator<Item = (ColId, K, Option<Value>)>,
526 K: AsRef<[u8]>,
527 {
528 self.commit_changes(tx.into_iter().map(|(c, k, v)| {
529 (
530 c,
531 match v {
532 Some(v) => Operation::Set(k.as_ref().to_vec(), v),
533 None => Operation::Dereference(k.as_ref().to_vec()),
534 },
535 )
536 }))
537 }
538
539 fn commit_changes<I, V>(&self, tx: I) -> Result<()>
540 where
541 I: IntoIterator<Item = (ColId, Operation<Vec<u8>, V>)>,
542 V: Into<RcValue>,
543 {
544 let mut commit: CommitChangeSet = Default::default();
545 for (col, change) in tx.into_iter() {
546 if self.options.columns[col as usize].btree_index {
547 commit
548 .btree_indexed
549 .entry(col)
550 .or_insert_with(|| BTreeChangeSet::new(col))
551 .push(change)?
552 } else if self.options.columns[col as usize].multitree {
553 match &self.columns[col as usize] {
554 Column::Hash(column) =>
555 match change {
556 Operation::Set(..) |
557 Operation::Reference(..) |
558 Operation::Dereference(..) =>
559 return Err(Error::InvalidConfiguration(
560 "Invalid operation for multitree column".to_string(),
561 )),
562 Operation::InsertTree(..) => {
563 let (root_data, node_values) = column.claim_tree_values(&change)?;
564
565 let trees = self.trees.read();
566 if let Some(column_trees) = trees.get(&col) {
567 for (hash, count) in &column_trees.to_dereference {
568 assert!(*count > 0);
569
570 let mut tree_active = false;
572 if let Some(reader) = column_trees.readers.get(hash) {
573 let reader = reader.upgrade();
574 if let Some(reader) = reader {
575 if reader.is_locked() {
576 tree_active = true;
577 }
578 }
579 }
580 if tree_active {
581 commit
582 .indexed
583 .entry(col)
584 .or_insert_with(|| IndexedChangeSet::new(col))
585 .used_trees
586 .insert(*hash);
587 }
588 }
589 }
590 drop(trees);
591
592 let root_operation = Operation::Set(change.key(), root_data);
593 commit
594 .indexed
595 .entry(col)
596 .or_insert_with(|| IndexedChangeSet::new(col))
597 .push(root_operation, &self.options, self.db_version)?;
598
599 for node_change in node_values {
600 commit
601 .indexed
602 .entry(col)
603 .or_insert_with(|| IndexedChangeSet::new(col))
604 .push_node_change(node_change);
605 }
606 },
607 Operation::ReferenceTree(..) => {
608 if !self.options.columns[col as usize].append_only {
609 let root_operation =
610 Operation::<&_, V>::Reference(change.key());
611 commit
612 .indexed
613 .entry(col)
614 .or_insert_with(|| IndexedChangeSet::new(col))
615 .push(root_operation, &self.options, self.db_version)?;
616 }
617 },
618 Operation::DereferenceTree(key) => {
619 if self.options.columns[col as usize].append_only {
620 return Err(Error::InvalidConfiguration("Attempting to dereference a tree from an append_only column.".to_string()))
621 }
622 let value = self.get(col, &key, false)?;
623 if let Some(data) = value {
624 let root_data = unpack_node_data(data)?;
625 let children = root_data.1;
626 let salt = self.options.salt.unwrap_or_default();
627 let hash = hash_key(
628 &key,
629 &salt,
630 self.options.columns[col as usize].uniform,
631 self.db_version,
632 );
633
634 let mut trees = self.trees.write();
635
636 let column_trees = trees.entry(col).or_insert_with(|| Trees {
637 readers: Default::default(),
638 to_dereference: Default::default(),
639 });
640 let count =
641 column_trees.to_dereference.get(&hash).unwrap_or(&0) + 1;
642 column_trees.to_dereference.insert(hash, count);
643
644 drop(trees);
645
646 commit.check_for_deferral = true;
647
648 let node_change =
649 NodeChange::DereferenceChildren(key, hash, children);
650
651 commit
652 .indexed
653 .entry(col)
654 .or_insert_with(|| IndexedChangeSet::new(col))
655 .push_node_change(node_change);
656 } else {
657 return Err(Error::InvalidConfiguration(
658 "No entry for tree root".to_string(),
659 ))
660 }
661 },
662 },
663 Column::Tree(_) =>
664 return Err(Error::InvalidConfiguration("Not a HashColumn".to_string())),
665 }
666 } else {
667 commit.indexed.entry(col).or_insert_with(|| IndexedChangeSet::new(col)).push(
668 change,
669 &self.options,
670 self.db_version,
671 )?
672 }
673 }
674
675 self.commit_raw(commit)
676 }
677
678 fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
679 let mut queue = self.commit_queue.lock();
680
681 #[cfg(any(test, feature = "instrumentation"))]
682 let might_wait_because_the_queue_is_full = self.options.with_background_thread;
683 #[cfg(not(any(test, feature = "instrumentation")))]
684 let might_wait_because_the_queue_is_full = true;
685 if might_wait_because_the_queue_is_full && queue.bytes > MAX_COMMIT_QUEUE_BYTES {
686 log::debug!(target: "parity-db", "Waiting, queue size={}", queue.bytes);
687 self.commit_queue_full_cv.wait(&mut queue);
688 }
689
690 {
691 let bg_err = self.bg_err.lock();
692 if let Some(err) = &*bg_err {
693 return Err(Error::Background(err.clone()))
694 }
695 }
696
697 let mut overlay = self.commit_overlay.write();
698
699 queue.record_id += 1;
700 let record_id = queue.record_id;
701
702 let mut bytes = 0;
703 for (c, indexed) in &commit.indexed {
704 indexed.copy_to_overlay(
705 &mut overlay[*c as usize],
706 record_id,
707 &mut bytes,
708 &self.options,
709 )?;
710 }
711
712 for (c, iterset) in &commit.btree_indexed {
713 iterset.copy_to_overlay(
714 &mut overlay[*c as usize].btree_indexed,
715 record_id,
716 &mut bytes,
717 &self.options,
718 )?;
719 }
720
721 let commit = Commit { id: record_id, changeset: commit, bytes };
722
723 log::debug!(
724 target: "parity-db",
725 "Queued commit {}, {} bytes",
726 commit.id,
727 bytes,
728 );
729 queue.commits.push_back(commit);
730 queue.bytes += bytes;
731 self.log_worker_wait.signal();
732 Ok(())
733 }
734
735 fn defer_commit(
736 &self,
737 mut queue: MutexGuard<CommitQueue>,
738 mut commit: CommitChangeSet,
739 old_bytes: usize,
740 old_id: u64,
741 new_id: Option<u64>,
742 ) -> Result<()> {
743 let record_id = if let Some(id) = new_id {
744 id
745 } else {
746 queue.record_id += 1;
747 queue.record_id
748 };
749
750 let bytes = if record_id != old_id {
751 let mut overlay = self.commit_overlay.write();
752
753 let mut bytes = 0;
754
755 for (c, indexed) in &commit.indexed {
756 indexed.copy_to_overlay(
757 &mut overlay[*c as usize],
758 record_id,
759 &mut bytes,
760 &self.options,
761 )?;
762 }
763
764 for (c, iterset) in &commit.btree_indexed {
765 iterset.copy_to_overlay(
766 &mut overlay[*c as usize].btree_indexed,
767 record_id,
768 &mut bytes,
769 &self.options,
770 )?;
771 }
772
773 {
774 for (c, key_values) in commit.indexed.iter() {
776 key_values.clean_overlay(&mut overlay[*c as usize], old_id);
777 }
778 for (c, iterset) in commit.btree_indexed.iter_mut() {
779 iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, old_id);
780 }
781 }
782
783 bytes
784 } else {
785 old_bytes
786 };
787
788 let commit = Commit { id: record_id, changeset: commit, bytes };
789
790 log::debug!(
791 target: "parity-db",
792 "Deferred commit, old id: {}, new id: {}",
793 old_id,
794 record_id,
795 );
796 queue.commits.push_back(commit);
797 queue.bytes += bytes;
798 Ok(())
799 }
800
801 fn process_commits(&self, db: &Arc<DbInner>) -> Result<bool> {
802 #[cfg(any(test, feature = "instrumentation"))]
803 let might_wait_because_the_queue_is_full = self.options.with_background_thread;
804 #[cfg(not(any(test, feature = "instrumentation")))]
805 let might_wait_because_the_queue_is_full = true;
806 if might_wait_because_the_queue_is_full {
807 let mut queue = self.log_queue_wait.work.lock();
809 if !self.shutdown.load(Ordering::Relaxed) && *queue > MAX_LOG_QUEUE_BYTES {
810 log::debug!(target: "parity-db", "Waiting, log_bytes={}", queue);
811 self.log_queue_wait.cv.wait(&mut queue);
812 }
813 }
814 let commit = {
815 let mut queue = self.commit_queue.lock();
816 if let Some(commit) = queue.commits.pop_front() {
817 queue.bytes -= commit.bytes;
818 log::debug!(
819 target: "parity-db",
820 "Removed {}. Still queued commits {} bytes",
821 commit.bytes,
822 queue.bytes,
823 );
824 if queue.bytes <= MAX_COMMIT_QUEUE_BYTES &&
825 (queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
826 {
827 log::debug!(
829 target: "parity-db",
830 "Waking up commit queue worker",
831 );
832 self.commit_queue_full_cv.notify_all();
833 }
834 Some(commit)
835 } else {
836 None
837 }
838 };
839
840 if let Some(mut commit) = commit {
841 if commit.changeset.check_for_deferral {
842 let mut defer = false;
843 'outer: for (col, key_values) in commit.changeset.indexed.iter() {
844 for change in &key_values.node_changes {
845 if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
846 let trees = self.trees.read();
849 if let Some(column_trees) = trees.get(&col) {
850 let mut tree_active = false;
851 if let Some(reader) = column_trees.readers.get(hash) {
852 let reader = reader.upgrade();
853 if let Some(reader) = reader {
854 if reader.is_locked() {
855 tree_active = true;
856 }
857 }
858 }
859 if tree_active {
860 defer = true;
861 break 'outer
862 }
863 }
864 drop(trees);
865
866 let queue = self.commit_queue.lock();
869 for commit in &queue.commits {
870 for (_col, change_set) in &commit.changeset.indexed {
871 for tree in &change_set.used_trees {
872 if tree == hash {
873 defer = true;
874 break 'outer
875 }
876 }
877 }
878 }
879 }
880 }
881 }
882 if defer {
883 let queue = self.commit_queue.lock();
884 let new_id = if queue.commits.len() > 0 {
885 None
887 } else {
888 Some(commit.id)
890 };
891 self.defer_commit(queue, commit.changeset, commit.bytes, commit.id, new_id)?;
892
893 return Ok(true)
894 } else {
895 for (col, key_values) in commit.changeset.indexed.iter() {
896 for change in &key_values.node_changes {
897 if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
898 let mut trees = self.trees.write();
899 if let Some(column_trees) = trees.get_mut(&col) {
900 let count = column_trees.to_dereference.get(hash).unwrap_or(&0);
901 assert!(*count > 0);
902 if *count == 1 {
903 column_trees.to_dereference.remove(hash);
904 } else {
905 column_trees.to_dereference.insert(*hash, count - 1);
906 }
907 }
908 }
909 }
910 }
911 }
912 }
913
914 let mut reindex = false;
915 let mut writer = self.log.begin_record();
916 log::debug!(
917 target: "parity-db",
918 "Processing commit {}, record {}, {} bytes",
919 commit.id,
920 writer.record_id(),
921 commit.bytes,
922 );
923 let mut ops: u64 = 0;
924 for (c, key_values) in commit.changeset.indexed.iter() {
925 key_values.write_plan(
926 db,
927 *c,
928 &self.columns[*c as usize],
929 &mut writer,
930 &mut ops,
931 &mut reindex,
932 )?;
933 }
934
935 for (c, btree) in commit.changeset.btree_indexed.iter_mut() {
936 match &self.columns[*c as usize] {
937 Column::Hash(_column) =>
938 return Err(Error::InvalidConfiguration(
939 "Not an indexed column.".to_string(),
940 )),
941 Column::Tree(column) => {
942 btree.write_plan(column, &mut writer, &mut ops)?;
943 },
944 }
945 }
946
947 for c in self.columns.iter() {
949 c.complete_plan(&mut writer)?;
950 }
951 let record_id = writer.record_id();
952 let l = writer.drain();
953
954 let bytes = {
955 let bytes = self.log.end_record(l)?;
956 let mut logged_bytes = self.log_queue_wait.work.lock();
957 *logged_bytes += bytes as i64;
958 self.flush_worker_wait.signal();
959 bytes
960 };
961
962 {
963 let mut overlay = self.commit_overlay.write();
965 for (c, key_values) in commit.changeset.indexed.iter() {
966 key_values.clean_overlay(&mut overlay[*c as usize], commit.id);
967 }
968 for (c, iterset) in commit.changeset.btree_indexed.iter_mut() {
969 iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, commit.id);
970 }
971 }
972
973 if reindex {
974 self.start_reindex(record_id);
975 }
976
977 log::debug!(
978 target: "parity-db",
979 "Processed commit {} (record {}), {} ops, {} bytes written",
980 commit.id,
981 record_id,
982 ops,
983 bytes,
984 );
985 Ok(true)
986 } else {
987 Ok(false)
988 }
989 }
990
991 fn start_reindex(&self, record_id: u64) {
992 log::trace!(target: "parity-db", "Scheduled reindex at record {}", record_id);
993 self.next_reindex.store(record_id, Ordering::SeqCst);
994 }
995
996 fn process_reindex(&self) -> Result<bool> {
997 let next_reindex = self.next_reindex.load(Ordering::SeqCst);
998 if next_reindex == 0 || next_reindex > self.last_enacted.load(Ordering::SeqCst) {
999 return Ok(false)
1000 }
1001 for column in self.columns.iter() {
1003 let column = if let Column::Hash(c) = column { c } else { continue };
1004 let ReindexBatch {
1005 drop_index,
1006 batch,
1007 drop_ref_count,
1008 ref_count_batch,
1009 ref_count_batch_source,
1010 } = column.reindex(&self.log)?;
1011 if !batch.is_empty() || drop_index.is_some() {
1012 debug_assert!(
1013 ref_count_batch.is_empty() &&
1014 ref_count_batch_source.is_none() &&
1015 drop_ref_count.is_none()
1016 );
1017 let mut next_reindex = false;
1018 let mut writer = self.log.begin_record();
1019 log::debug!(
1020 target: "parity-db",
1021 "Creating reindex record {}",
1022 writer.record_id(),
1023 );
1024 for (key, address) in batch.into_iter() {
1025 if let PlanOutcome::NeedReindex =
1026 column.write_reindex_plan(&key, address, &mut writer)?
1027 {
1028 next_reindex = true
1029 }
1030 }
1031 if let Some(table) = drop_index {
1032 writer.drop_table(table);
1033 }
1034 let record_id = writer.record_id();
1035 let l = writer.drain();
1036
1037 let mut logged_bytes = self.log_queue_wait.work.lock();
1038 let bytes = self.log.end_record(l)?;
1039 log::debug!(
1040 target: "parity-db",
1041 "Created reindex record {}, {} bytes",
1042 record_id,
1043 bytes,
1044 );
1045 *logged_bytes += bytes as i64;
1046 if next_reindex {
1047 self.start_reindex(record_id);
1048 }
1049 self.flush_worker_wait.signal();
1050 return Ok(true)
1051 }
1052 if !ref_count_batch.is_empty() || drop_ref_count.is_some() {
1053 debug_assert!(batch.is_empty() && drop_index.is_none());
1054 debug_assert!(ref_count_batch_source.is_some());
1055 let ref_count_source = ref_count_batch_source.unwrap();
1056 let mut next_reindex = false;
1057 let mut writer = self.log.begin_record();
1058 log::debug!(
1059 target: "parity-db",
1060 "Creating ref count reindex record {}",
1061 writer.record_id(),
1062 );
1063 for (address, ref_count) in ref_count_batch.into_iter() {
1064 if let PlanOutcome::NeedReindex = column.write_ref_count_reindex_plan(
1065 address,
1066 ref_count,
1067 ref_count_source,
1068 &mut writer,
1069 )? {
1070 next_reindex = true
1071 }
1072 }
1073 if let Some(table) = drop_ref_count {
1074 writer.drop_ref_count_table(table);
1075 }
1076 let record_id = writer.record_id();
1077 let l = writer.drain();
1078
1079 let mut logged_bytes = self.log_queue_wait.work.lock();
1080 let bytes = self.log.end_record(l)?;
1081 log::debug!(
1082 target: "parity-db",
1083 "Created ref count reindex record {}, {} bytes",
1084 record_id,
1085 bytes,
1086 );
1087 *logged_bytes += bytes as i64;
1088 if next_reindex {
1089 self.start_reindex(record_id);
1090 }
1091 self.flush_worker_wait.signal();
1092 return Ok(true)
1093 }
1094 }
1095 self.next_reindex.store(0, Ordering::SeqCst);
1096 Ok(false)
1097 }
1098
1099 fn enact_logs(&self, validation_mode: bool) -> Result<bool> {
1100 let _iteration_lock = self.iteration_lock.lock();
1101 let cleared = {
1102 let reader = match self.log.read_next(validation_mode) {
1103 Ok(reader) => reader,
1104 Err(Error::Corruption(_)) if validation_mode => {
1105 log::debug!(target: "parity-db", "Bad log header");
1106 self.log.clear_replay_logs();
1107 return Ok(false)
1108 },
1109 Err(e) => return Err(e),
1110 };
1111 if let Some(mut reader) = reader {
1112 log::debug!(
1113 target: "parity-db",
1114 "Enacting log record {}",
1115 reader.record_id(),
1116 );
1117 if validation_mode {
1118 if reader.record_id() != self.last_enacted.load(Ordering::Relaxed) + 1 {
1119 log::warn!(
1120 target: "parity-db",
1121 "Log sequence error. Expected record {}, got {}",
1122 self.last_enacted.load(Ordering::Relaxed) + 1,
1123 reader.record_id(),
1124 );
1125 drop(reader);
1126 self.log.clear_replay_logs();
1127 return Ok(false)
1128 }
1129 loop {
1131 let next = match reader.next() {
1132 Ok(next) => next,
1133 Err(e) => {
1134 log::debug!(target: "parity-db", "Error reading log: {:?}", e);
1135 return Ok(false)
1136 },
1137 };
1138 match next {
1139 LogAction::BeginRecord => {
1140 log::debug!(target: "parity-db", "Unexpected log header");
1141 drop(reader);
1142 self.log.clear_replay_logs();
1143 return Ok(false)
1144 },
1145 LogAction::EndRecord => break,
1146 LogAction::InsertIndex(insertion) => {
1147 let col = insertion.table.col() as usize;
1148 if let Err(e) = self.columns.get(col).map_or_else(
1149 || Err(Error::Corruption(format!("Invalid column id {col}"))),
1150 |col| {
1151 col.validate_plan(
1152 LogAction::InsertIndex(insertion),
1153 &mut reader,
1154 )
1155 },
1156 ) {
1157 log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
1158 drop(reader);
1159 self.log.clear_replay_logs();
1160 return Ok(false)
1161 }
1162 },
1163 LogAction::InsertValue(insertion) => {
1164 let col = insertion.table.col() as usize;
1165 if let Err(e) = self.columns.get(col).map_or_else(
1166 || Err(Error::Corruption(format!("Invalid column id {col}"))),
1167 |col| {
1168 col.validate_plan(
1169 LogAction::InsertValue(insertion),
1170 &mut reader,
1171 )
1172 },
1173 ) {
1174 log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
1175 drop(reader);
1176 self.log.clear_replay_logs();
1177 return Ok(false)
1178 }
1179 },
1180 LogAction::InsertRefCount(insertion) => {
1181 let col = insertion.table.col() as usize;
1182 if let Err(e) = self.columns.get(col).map_or_else(
1183 || Err(Error::Corruption(format!("Invalid column id {col}"))),
1184 |col| {
1185 col.validate_plan(
1186 LogAction::InsertRefCount(insertion),
1187 &mut reader,
1188 )
1189 },
1190 ) {
1191 log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
1192 drop(reader);
1193 self.log.clear_replay_logs();
1194 return Ok(false)
1195 }
1196 },
1197 LogAction::DropTable(_) | LogAction::DropRefCountTable(_) => continue,
1198 }
1199 }
1200 reader.reset()?;
1201 reader.next()?;
1202 }
1203 loop {
1204 match reader.next()? {
1205 LogAction::BeginRecord =>
1206 return Err(Error::Corruption("Bad log record".into())),
1207 LogAction::EndRecord => break,
1208 LogAction::InsertIndex(insertion) => {
1209 self.columns[insertion.table.col() as usize]
1210 .enact_plan(LogAction::InsertIndex(insertion), &mut reader)?;
1211 },
1212 LogAction::InsertValue(insertion) => {
1213 self.columns[insertion.table.col() as usize]
1214 .enact_plan(LogAction::InsertValue(insertion), &mut reader)?;
1215 },
1216 LogAction::InsertRefCount(insertion) => {
1217 self.columns[insertion.table.col() as usize]
1218 .enact_plan(LogAction::InsertRefCount(insertion), &mut reader)?;
1219 },
1220 LogAction::DropTable(id) => {
1221 log::debug!(
1222 target: "parity-db",
1223 "Dropping index {}",
1224 id,
1225 );
1226 match &self.columns[id.col() as usize] {
1227 Column::Hash(col) => {
1228 col.drop_index(id)?;
1229 self.start_reindex(reader.record_id());
1231 },
1232 Column::Tree(_) => (),
1233 }
1234 },
1235 LogAction::DropRefCountTable(id) => {
1236 log::debug!(
1237 target: "parity-db",
1238 "Dropping ref count {}",
1239 id,
1240 );
1241 match &self.columns[id.col() as usize] {
1242 Column::Hash(col) => {
1243 col.drop_ref_count(id)?;
1244 self.start_reindex(reader.record_id());
1246 },
1247 Column::Tree(_) => (),
1248 }
1249 },
1250 }
1251 }
1252 log::debug!(
1253 target: "parity-db",
1254 "Enacted log record {}, {} bytes",
1255 reader.record_id(),
1256 reader.read_bytes(),
1257 );
1258 let record_id = reader.record_id();
1259 let bytes = reader.read_bytes();
1260 let cleared = reader.drain();
1261 self.last_enacted.store(record_id, Ordering::SeqCst);
1262 Some((record_id, cleared, bytes))
1263 } else {
1264 log::debug!(target: "parity-db", "End of log");
1265 None
1266 }
1267 };
1268
1269 if let Some((record_id, cleared, bytes)) = cleared {
1270 self.log.end_read(cleared, record_id);
1271 {
1272 if !validation_mode {
1273 let mut queue = self.log_queue_wait.work.lock();
1274 if *queue < bytes as i64 {
1275 log::warn!(
1276 target: "parity-db",
1277 "Detected log underflow record {}, {} bytes, {} queued, reindex = {}",
1278 record_id,
1279 bytes,
1280 *queue,
1281 self.next_reindex.load(Ordering::SeqCst),
1282 );
1283 }
1284 *queue -= bytes as i64;
1285 if *queue <= MAX_LOG_QUEUE_BYTES &&
1286 (*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
1287 {
1288 self.log_queue_wait.cv.notify_one();
1289 }
1290 log::debug!(target: "parity-db", "Log queue size: {} bytes", *queue);
1291 }
1292
1293 let max_logs = if self.options.sync_data { MAX_LOG_FILES } else { KEEP_LOGS };
1294 let dirty_logs = self.log.num_dirty_logs();
1295 if !validation_mode {
1296 while self.log.num_dirty_logs() > max_logs {
1297 log::debug!(target: "parity-db", "Waiting for log cleanup. Queued: {}", dirty_logs);
1298 self.cleanup_queue_wait.wait();
1299 }
1300 }
1301 }
1302 Ok(true)
1303 } else {
1304 Ok(false)
1305 }
1306 }
1307
1308 fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
1309 let has_flushed = self.log.flush_one(min_log_size)?;
1310 if has_flushed {
1311 self.commit_worker_wait.signal();
1312 }
1313 Ok(has_flushed)
1314 }
1315
1316 fn clean_logs(&self) -> Result<bool> {
1317 let keep_logs = if self.options.sync_data { 0 } else { KEEP_LOGS };
1318 let num_cleanup = self.log.num_dirty_logs();
1319 let result = if num_cleanup > keep_logs {
1320 if self.options.sync_data {
1321 for c in self.columns.iter() {
1322 c.flush()?;
1323 }
1324 }
1325 self.log.clean_logs(num_cleanup - keep_logs)?
1326 } else {
1327 false
1328 };
1329 self.cleanup_queue_wait.signal();
1330 Ok(result)
1331 }
1332
1333 fn clean_all_logs(&self) -> Result<()> {
1334 for c in self.columns.iter() {
1335 c.flush()?;
1336 }
1337 let num_cleanup = self.log.num_dirty_logs();
1338 self.log.clean_logs(num_cleanup)?;
1339 Ok(())
1340 }
1341
1342 fn replay_all_logs(&self) -> Result<()> {
1343 while let Some(id) = self.log.replay_next()? {
1344 log::debug!(target: "parity-db", "Replaying database log {}", id);
1345 while self.enact_logs(true)? {}
1346 }
1347
1348 for c in self.columns.iter() {
1350 c.refresh_metadata()?;
1351 }
1352 log::debug!(target: "parity-db", "Replay is complete.");
1353 Ok(())
1354 }
1355
/// Requests shutdown of the background workers.
///
/// Sets the `shutdown` flag, then wakes every wait point the workers may be blocked on so they
/// re-check the flag. This does not wait for the threads to exit; `Db::drop_inner` joins them.
fn shutdown(&self) {
    self.shutdown.store(true, Ordering::SeqCst);
    // Presumably wakes a writer blocked waiting for the log queue to drain.
    self.log_queue_wait.cv.notify_one();
    self.flush_worker_wait.signal();
    self.log_worker_wait.signal();
    self.commit_worker_wait.signal();
    self.cleanup_worker_wait.signal();
}
1364
1365 fn kill_logs(&self, db: &Arc<DbInner>) -> Result<()> {
1366 {
1367 if let Some(err) = self.bg_err.lock().as_ref() {
1368 log::debug!(target: "parity-db", "Shutdown with error state {}", err);
1371 self.log.clean_logs(self.log.num_dirty_logs())?;
1372 return Ok(())
1373 }
1374 }
1375 log::debug!(target: "parity-db", "Processing leftover commits");
1376 while self.enact_logs(false)? {}
1378 self.flush_logs(0)?;
1379 while self.process_commits(db)? {}
1380 while self.enact_logs(false)? {}
1381 self.flush_logs(0)?;
1382 while self.enact_logs(false)? {}
1383 self.clean_all_logs()?;
1384 self.log.kill_logs()?;
1385 if self.options.stats {
1386 let mut path = self.options.path.clone();
1387 path.push("stats.txt");
1388 match std::fs::File::create(path) {
1389 Ok(file) => {
1390 let mut writer = std::io::BufWriter::new(file);
1391 if let Err(e) = self.write_stats_text(&mut writer, None) {
1392 log::warn!(target: "parity-db", "Error writing stats file: {:?}", e)
1393 }
1394 },
1395 Err(e) => log::warn!(target: "parity-db", "Error creating stats file: {:?}", e),
1396 }
1397 }
1398 Ok(())
1399 }
1400
1401 fn write_stats_text(&self, writer: &mut impl std::io::Write, column: Option<u8>) -> Result<()> {
1402 if let Some(col) = column {
1403 self.columns[col as usize].write_stats_text(writer)
1404 } else {
1405 for c in self.columns.iter() {
1406 c.write_stats_text(writer)?;
1407 }
1408 Ok(())
1409 }
1410 }
1411
1412 fn clear_stats(&self, column: Option<u8>) -> Result<()> {
1413 if let Some(col) = column {
1414 self.columns[col as usize].clear_stats()
1415 } else {
1416 for c in self.columns.iter() {
1417 c.clear_stats()?;
1418 }
1419 Ok(())
1420 }
1421 }
1422
1423 fn stats(&self) -> StatSummary {
1424 StatSummary { columns: self.columns.iter().map(|c| c.stats()).collect() }
1425 }
1426
1427 fn store_err(&self, result: Result<()>) {
1428 if let Err(e) = result {
1429 log::warn!(target: "parity-db", "Background worker error: {}", e);
1430 let mut err = self.bg_err.lock();
1431 if err.is_none() {
1432 *err = Some(Arc::new(e));
1433 self.shutdown();
1434 }
1435 self.commit_queue_full_cv.notify_all();
1436 }
1437 }
1438
1439 fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
1440 let _lock = self.iteration_lock.lock();
1441 match &self.columns[c as usize] {
1442 Column::Hash(column) => column.iter_values(&self.log, f),
1443 Column::Tree(_) => unimplemented!(),
1444 }
1445 }
1446
1447 fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
1448 let _lock = self.iteration_lock.lock();
1449 match &self.columns[c as usize] {
1450 Column::Hash(column) => column.iter_index(&self.log, f),
1451 Column::Tree(_) => unimplemented!(),
1452 }
1453 }
1454}
1455
/// Database handle.
///
/// Wraps the shared `DbInner` state together with the join handles of the background worker
/// threads spawned by `open_inner`. The handles are all `None` when no workers were started
/// (e.g. when opened read-only). Dropping the handle shuts the workers down and joins them
/// (`drop_inner`).
pub struct Db {
    inner: Arc<DbInner>,
    // Enacts flushed log records into the column files (`commit_worker`).
    commit_thread: Option<thread::JoinHandle<()>>,
    // Flushes log data (`flush_worker`).
    flush_thread: Option<thread::JoinHandle<()>>,
    // Processes queued commits and reindexing (`log_worker`).
    log_thread: Option<thread::JoinHandle<()>>,
    // Cleans dirty log files (`cleanup_worker`).
    cleanup_thread: Option<thread::JoinHandle<()>>,
}
1464
1465impl Db {
/// Test helper: opens (creating if needed) a database at `path` with `num_columns` columns
/// configured by `Options::with_columns`.
#[cfg(test)]
pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
    Self::open_inner(&Options::with_columns(path, num_columns), OpeningMode::Create)
}
1471
/// Opens a database for reading and writing (`OpeningMode::Write`).
///
/// Presumably fails if the database does not exist yet; use `open_or_create` to create it.
pub fn open(options: &Options) -> Result<Db> {
    Self::open_inner(options, OpeningMode::Write)
}
1477
/// Opens a database for reading and writing, creating it if needed (`OpeningMode::Create`).
pub fn open_or_create(options: &Options) -> Result<Db> {
    Self::open_inner(options, OpeningMode::Create)
}
1483
/// Opens a database in read-only mode. No background worker threads are started.
pub fn open_read_only(options: &Options) -> Result<Db> {
    Self::open_inner(options, OpeningMode::ReadOnly)
}
1488
1489 fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
1490 assert!(options.is_valid());
1491 let mut db = DbInner::open(options, opening_mode)?;
1492 if let Err(e) = db.replay_all_logs() {
1495 log::debug!(target: "parity-db", "Error during log replay.");
1496 return Err(e)
1497 } else {
1498 db.log.clear_replay_logs();
1499 db.clean_all_logs()?;
1500 db.log.kill_logs()?;
1501 }
1502 db.init_table_data()?;
1503 let db = Arc::new(db);
1504 #[cfg(any(test, feature = "instrumentation"))]
1505 let start_threads = opening_mode != OpeningMode::ReadOnly && options.with_background_thread;
1506 #[cfg(not(any(test, feature = "instrumentation")))]
1507 let start_threads = opening_mode != OpeningMode::ReadOnly;
1508 let commit_thread = if start_threads {
1509 let commit_worker_db = db.clone();
1510 Some(thread::spawn(move || {
1511 commit_worker_db.store_err(Self::commit_worker(commit_worker_db.clone()))
1512 }))
1513 } else {
1514 None
1515 };
1516 let flush_thread = if start_threads {
1517 let flush_worker_db = db.clone();
1518 #[cfg(any(test, feature = "instrumentation"))]
1519 let min_log_size = if options.always_flush { 0 } else { MIN_LOG_SIZE_BYTES };
1520 #[cfg(not(any(test, feature = "instrumentation")))]
1521 let min_log_size = MIN_LOG_SIZE_BYTES;
1522 Some(thread::spawn(move || {
1523 flush_worker_db.store_err(Self::flush_worker(flush_worker_db.clone(), min_log_size))
1524 }))
1525 } else {
1526 None
1527 };
1528 let log_thread = if start_threads {
1529 let log_worker_db = db.clone();
1530 Some(thread::spawn(move || {
1531 log_worker_db.store_err(Self::log_worker(log_worker_db.clone()))
1532 }))
1533 } else {
1534 None
1535 };
1536 let cleanup_thread = if start_threads {
1537 let cleanup_worker_db = db.clone();
1538 Some(thread::spawn(move || {
1539 cleanup_worker_db.store_err(Self::cleanup_worker(cleanup_worker_db.clone()))
1540 }))
1541 } else {
1542 None
1543 };
1544 Ok(Db { inner: db, commit_thread, flush_thread, log_thread, cleanup_thread })
1545 }
1546
/// Returns the value stored under `key` in column `col`, or `None` if there is none.
pub fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
    self.inner.get(col, key, true)
}
1551
/// Returns the size of the value stored under `key` in column `col`, if any.
pub fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
    self.inner.get_size(col, key)
}
1556
/// Returns an ordered iterator over column `col`.
///
/// Delegates to `DbInner::btree_iter`; presumably only valid for btree-indexed columns.
pub fn iter(&self, col: ColId) -> Result<BTreeIterator<'_>> {
    self.inner.btree_iter(col)
}
1562
/// Returns a lockable reader for the tree stored under `key` in column `col`, or `None` if
/// there is no such tree.
pub fn get_tree(
    &self,
    col: ColId,
    key: &[u8],
) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
    self.inner.get_tree(&self.inner, col, key, true)
}
1570
/// Returns the data and child addresses of the tree root stored under `key`, if present.
pub fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
    self.inner.get_root(col, key)
}
1574
/// Returns the data and child addresses of the tree node at `node_address`, if present.
pub fn get_node(
    &self,
    col: ColId,
    node_address: NodeAddress,
) -> Result<Option<(Vec<u8>, Children)>> {
    self.inner.get_node(col, node_address, true)
}
1582
/// Returns only the child addresses of the tree node at `node_address`, if present.
pub fn get_node_children(
    &self,
    col: ColId,
    node_address: NodeAddress,
) -> Result<Option<Children>> {
    self.inner.get_node_children(col, node_address, true)
}
1590
/// Commits a transaction of `(column, key, value)` entries.
///
/// A `None` value presumably removes the key.
pub fn commit<I, K>(&self, tx: I) -> Result<()>
where
    I: IntoIterator<Item = (ColId, K, Option<Value>)>,
    K: AsRef<[u8]>,
{
    self.inner.commit(tx)
}
1599
/// Commits a transaction expressed as per-column `Operation`s with owned keys and values.
pub fn commit_changes<I>(&self, tx: I) -> Result<()>
where
    I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
{
    self.inner.commit_changes(tx)
}
1607
/// Commits a transaction whose values are shared through `Arc<Vec<u8>>`.
///
/// Deprecated in favour of `commit_changes_bytes`.
#[cfg(feature = "arc")]
#[deprecated(
    note = "This method will be removed in future versions. Use `commit_changes_bytes` instead"
)]
pub fn commit_changes_shared<I>(&self, tx: I) -> Result<()>
where
    I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Arc<Vec<u8>>>)>,
{
    self.inner.commit_changes(tx)
}
1621
/// Commits a transaction whose values are reference-counted `Bytes`.
#[cfg(feature = "bytes")]
pub fn commit_changes_bytes<I>(&self, tx: I) -> Result<()>
where
    I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Bytes>)>,
{
    self.inner.commit_changes(tx)
}
1632
/// Commits a pre-built change set (crate-internal).
pub(crate) fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
    self.inner.commit_raw(commit)
}
1636
/// Number of columns this database was opened with.
pub fn num_columns(&self) -> u8 {
    // NOTE(review): `as u8` assumes at most 255 columns (ColId is u8) — confirm options
    // validation enforces this.
    self.inner.columns.len() as u8
}
1641
/// Iterates the values of hash column `c`, passing each to `f`.
///
/// Presumably iteration stops once `f` returns `false`. Tree columns are not supported and
/// hit `unimplemented!()`.
pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
    self.inner.iter_column_while(c, f)
}
1648
/// Iterates the index entries of hash column `c`, passing each to `f`.
///
/// Presumably iteration stops once `f` returns `false`. Tree columns are not supported.
pub(crate) fn iter_column_index_while(
    &self,
    c: ColId,
    f: impl FnMut(IterState) -> bool,
) -> Result<()> {
    self.inner.iter_column_index_while(c, f)
}
1660
/// Body of the commit (enactment) worker thread.
///
/// Repeatedly enacts log records into the columns with `enact_logs(false)`. When the previous
/// pass found no more work, it first signals the cleanup worker. It then blocks on
/// `commit_worker_wait` only if the log has no files to read. After shutdown is requested it
/// keeps running until `enact_logs` reports no more work.
fn commit_worker(db: Arc<DbInner>) -> Result<()> {
    let mut more_work = false;
    while !db.shutdown.load(Ordering::SeqCst) || more_work {
        if !more_work {
            db.cleanup_worker_wait.signal();
            if !db.log.has_log_files_to_read() {
                db.commit_worker_wait.wait();
            }
        }

        more_work = db.enact_logs(false)?;
    }
    log::debug!(target: "parity-db", "Commit worker shutdown");
    Ok(())
}
1676
/// Body of the log worker thread.
///
/// Alternates `process_commits` and `process_reindex`, blocking on `log_worker_wait` only when
/// neither reported more work. After shutdown is requested it continues only while commits
/// remain; pending reindex work does not keep it alive.
fn log_worker(db: Arc<DbInner>) -> Result<()> {
    let mut more_reindex = db.process_reindex()?;
    let mut more_commits = false;
    while !db.shutdown.load(Ordering::SeqCst) || more_commits {
        if !more_commits && !more_reindex {
            db.log_worker_wait.wait();
        }

        more_commits = db.process_commits(&db)?;
        more_reindex = db.process_reindex()?;
    }
    log::debug!(target: "parity-db", "Log worker shutdown");
    Ok(())
}
1693
/// Body of the flush worker thread.
///
/// Calls `flush_logs(min_log_size)`, blocking on `flush_worker_wait` whenever the previous call
/// flushed nothing. Unlike the other workers, the loop exits as soon as shutdown is observed.
/// Remaining logs are flushed by `DbInner::kill_logs` (`flush_logs(0)`) during `drop_inner`.
fn flush_worker(db: Arc<DbInner>, min_log_size: u64) -> Result<()> {
    let mut more_work = false;
    while !db.shutdown.load(Ordering::SeqCst) {
        if !more_work {
            db.flush_worker_wait.wait();
        }
        more_work = db.flush_logs(min_log_size)?;
    }
    log::debug!(target: "parity-db", "Flush worker shutdown");
    Ok(())
}
1705
/// Body of the cleanup worker thread.
///
/// Runs `clean_logs` immediately (`more_work` starts `true`), then blocks on
/// `cleanup_worker_wait` whenever a pass cleaned nothing. After shutdown it keeps going until a
/// pass cleans nothing.
fn cleanup_worker(db: Arc<DbInner>) -> Result<()> {
    let mut more_work = true;
    while !db.shutdown.load(Ordering::SeqCst) || more_work {
        if !more_work {
            db.cleanup_worker_wait.wait();
        }
        more_work = db.clean_logs()?;
    }
    log::debug!(target: "parity-db", "Cleanup worker shutdown");
    Ok(())
}
1717
/// Writes statistics text for one column, or for all columns when `column` is `None`.
///
/// Panics if `column` is out of range.
pub fn write_stats_text(
    &self,
    writer: &mut impl std::io::Write,
    column: Option<u8>,
) -> Result<()> {
    self.inner.write_stats_text(writer, column)
}
1726
/// Resets statistics for one column, or for all columns when `column` is `None`.
///
/// Panics if `column` is out of range.
pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
    self.inner.clear_stats(column)
}
1731
1732 pub fn dump(&self, check_param: check::CheckOptions) -> Result<()> {
1734 if let Some(col) = check_param.column {
1735 self.inner.columns[col as usize].dump(&self.inner.log, &check_param, col)?;
1736 } else {
1737 for (ix, c) in self.inner.columns.iter().enumerate() {
1738 c.dump(&self.inner.log, &check_param, ix as ColId)?;
1739 }
1740 }
1741 Ok(())
1742 }
1743
/// Returns a statistics summary with one entry per column.
pub fn stats(&self) -> StatSummary {
    self.inner.stats()
}
1748
1749 pub fn get_num_column_value_entries(&self, col: ColId) -> Result<u64> {
1750 let column = &self.inner.columns[col as usize];
1751 match column {
1752 Column::Hash(column) => return column.get_num_value_entries(),
1753 Column::Tree(..) =>
1754 return Err(Error::InvalidConfiguration(
1755 "get_num_column_value_entries not implemented for tree columns.".to_string(),
1756 )),
1757 }
1758 }
1759
/// Opens the database once with `options` and returns its salt.
///
/// The database is closed again (workers joined, file lock released by `drop_inner`) before
/// returning, so the caller can rewrite the metadata safely.
fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
    let db = Db::open(options)?;
    let salt = db.inner.options.salt;
    // Close the database before the caller touches its files/metadata.
    drop(db);
    Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
}
1768
/// Appends a new column with `new_column_options` to an existing database.
///
/// Opens and closes the database first (`precheck_column_operation`), then rewrites the
/// metadata with the extra column, stamped with `CURRENT_VERSION`.
pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
    let salt = Self::precheck_column_operation(options)?;

    options.columns.push(new_column_options);
    options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

    Ok(())
}
1778
1779 pub fn drop_last_column(options: &mut Options) -> Result<()> {
1782 let salt = Self::precheck_column_operation(options)?;
1783 let nb_column = options.columns.len();
1784 if nb_column == 0 {
1785 return Ok(())
1786 }
1787 let index = options.columns.len() - 1;
1788 Self::remove_column_files(options, index as u8)?;
1789 options.columns.pop();
1790 options.write_metadata(&options.path, &salt)?;
1791 Ok(())
1792 }
1793
/// Deletes all data of column `index`, optionally replacing its options.
///
/// Opens and closes the database first (`precheck_column_operation`). Metadata is rewritten
/// only when `new_options` is `Some`.
///
/// # Errors
/// `IncompatibleColumnConfig` if `index` is not a configured column.
pub fn reset_column(
    options: &mut Options,
    index: u8,
    new_options: Option<ColumnOptions>,
) -> Result<()> {
    let salt = Self::precheck_column_operation(options)?;
    Self::remove_column_files(options, index)?;

    if let Some(new_options) = new_options {
        options.columns[index as usize] = new_options;
        options.write_metadata(&options.path, &salt)?;
    }

    Ok(())
}
1811
1812 fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
1813 if index as usize >= options.columns.len() {
1814 return Err(Error::IncompatibleColumnConfig {
1815 id: index,
1816 reason: "Column not found".to_string(),
1817 })
1818 }
1819
1820 Column::drop_files(index, options.path.clone())?;
1821 Ok(())
1822 }
1823
/// Instrumentation hook: runs one `process_reindex` step on the calling thread, discarding
/// whether more work remains.
#[cfg(feature = "instrumentation")]
pub fn process_reindex(&self) -> Result<()> {
    self.inner.process_reindex()?;
    Ok(())
}
1829
/// Instrumentation hook: runs one `process_commits` pass on the calling thread.
#[cfg(feature = "instrumentation")]
pub fn process_commits(&self) -> Result<()> {
    self.inner.process_commits(&self.inner)?;
    Ok(())
}
1835
/// Instrumentation hook: flushes log data regardless of size (`min_log_size = 0`).
#[cfg(feature = "instrumentation")]
pub fn flush_logs(&self) -> Result<()> {
    self.inner.flush_logs(0)?;
    Ok(())
}
1841
/// Instrumentation hook: enacts log records until none are left.
#[cfg(feature = "instrumentation")]
pub fn enact_logs(&self) -> Result<()> {
    while self.inner.enact_logs(false)? {}
    Ok(())
}
1847
/// Instrumentation hook: runs one `clean_logs` pass on the calling thread.
#[cfg(feature = "instrumentation")]
pub fn clean_logs(&self) -> Result<()> {
    self.inner.clean_logs()?;
    Ok(())
}
1853}
1854
/// Shuts the database down when the handle is dropped (see `Db::drop_inner`).
impl Drop for Db {
    fn drop(&mut self) {
        self.drop_inner()
    }
}
1860
impl Db {
    /// Orderly shutdown performed on drop.
    ///
    /// 1. Signal shutdown to all workers.
    /// 2. Join the log, flush, commit and cleanup threads in that order (join failures are
    ///    logged).
    /// 3. Drain leftover work via `DbInner::kill_logs`.
    /// 4. Release the database file lock.
    ///
    /// Errors are logged, never propagated, since `Drop` cannot fail.
    fn drop_inner(&mut self) {
        self.inner.shutdown();
        if let Some(t) = self.log_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Log thread shutdown error: {:?}", e);
            }
        }
        if let Some(t) = self.flush_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Flush thread shutdown error: {:?}", e);
            }
        }
        if let Some(t) = self.commit_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Commit thread shutdown error: {:?}", e);
            }
        }
        if let Some(t) = self.cleanup_thread.take() {
            if let Err(e) = t.join() {
                log::warn!(target: "parity-db", "Cleanup thread shutdown error: {:?}", e);
            }
        }
        // All workers have exited; finish any remaining work on this thread.
        if let Err(e) = self.inner.kill_logs(&self.inner) {
            log::warn!(target: "parity-db", "Shutdown error: {:?}", e);
        }
        if let Err(e) = fs2::FileExt::unlock(&self.inner.lock_file) {
            log::debug!(target: "parity-db", "Error removing file lock: {:?}", e);
        }
    }
}
1892
/// Read access to a single tree; `Db::get_tree` returns one behind an `Arc<RwLock<..>>`.
pub trait TreeReader {
    /// Returns the root's node data and child addresses, or `None` if there is no root value.
    fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>>;
    /// Returns the data and child addresses of the node at `node_address`, if present.
    fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>>;
    /// Returns only the child addresses of the node at `node_address`, if present.
    fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>>;
}
1900
/// `TreeReader` implementation that reads directly from the database.
#[derive(Debug)]
pub struct DbTreeReader {
    db: Arc<DbInner>,
    // Hash column holding the tree.
    col: ColId,
    // Hashed key under which the tree root is stored.
    key: Key,
}
1907
1908impl TreeReader for DbTreeReader {
1909 fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>> {
1910 match &self.db.columns[self.col as usize] {
1917 Column::Hash(column) => {
1918 let overlay = self.db.commit_overlay.read();
1919 let value = if let Some(v) =
1921 overlay.get(self.col as usize).and_then(|o| o.get(&self.key))
1922 {
1923 Ok(v.map(|i| i.as_ref().to_vec()))
1924 } else {
1925 let log = self.db.log.overlays();
1927 Ok(column.get(&self.key, log)?.map(|(v, _rc)| v))
1928 }?;
1929
1930 if let Some(data) = value {
1931 return unpack_node_data(data).map(|x| Some(x))
1932 }
1933
1934 return Ok(None)
1935 },
1936 Column::Tree(..) =>
1937 return Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
1938 };
1939 }
1940
1941 fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>> {
1942 self.db.get_node(self.col, node_address, false)
1943 }
1944
1945 fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>> {
1946 self.db.get_node_children(self.col, node_address, false)
1947 }
1948}
1949
/// Pending changes to hash-indexed columns, keyed by hashed key.
///
/// Each entry holds the id of the record that wrote it and the value. `None` marks a pending
/// removal (written by `IndexedChangeSet::copy_to_overlay` for `Dereference` in
/// non-ref-counted columns).
pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<RcValue>), IdentityBuildHasher>;
/// Pending tree node values keyed by node address, with the id of the record that wrote them.
pub type AddressCommitOverlay = HashMap<u64, (u64, RcValue)>;
/// Pending changes to btree-indexed columns in key order, with record id and optional value.
/// NOTE(review): `None` presumably marks a pending deletion — confirm in the btree commit path.
pub type BTreeCommitOverlay = BTreeMap<RcKey, (u64, Option<RcValue>)>;
1953
/// In-memory view of committed changes that are not yet readable from the columns/log.
#[derive(Debug)]
pub struct CommitOverlay {
    // Hash-indexed key/value changes.
    indexed: IndexedCommitOverlay,
    // Tree node values by address.
    address: AddressCommitOverlay,
    // Btree-indexed key/value changes, ordered by key.
    btree_indexed: BTreeCommitOverlay,
}
1960
1961impl CommitOverlay {
1962 fn new() -> Self {
1963 CommitOverlay {
1964 indexed: Default::default(),
1965 address: Default::default(),
1966 btree_indexed: Default::default(),
1967 }
1968 }
1969
1970 #[cfg(test)]
1971 fn is_empty(&self) -> bool {
1972 self.indexed.is_empty() && self.address.is_empty() && self.btree_indexed.is_empty()
1973 }
1974}
1975
1976impl CommitOverlay {
1977 fn get_ref(&self, key: &[u8]) -> Option<Option<&RcValue>> {
1978 self.indexed.get(key).map(|(_, v)| v.as_ref())
1979 }
1980
1981 fn get(&self, key: &[u8]) -> Option<Option<RcValue>> {
1982 self.get_ref(key).map(|v| v.cloned())
1983 }
1984
1985 fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
1986 self.get_ref(key).map(|res| res.as_ref().map(|b| b.as_ref().len() as u32))
1987 }
1988
1989 fn get_address(&self, address: u64) -> Option<RcValue> {
1990 self.address.get(&address).map(|(_, v)| v.clone())
1991 }
1992
1993 fn btree_get(&self, key: &[u8]) -> Option<Option<&RcValue>> {
1994 self.btree_indexed.get(key).map(|(_, v)| v.as_ref())
1995 }
1996
1997 pub fn btree_next(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
1998 use crate::btree::LastKey;
1999 match &last_key {
2000 LastKey::Start => self
2001 .btree_indexed
2002 .range::<[u8], _>(..)
2003 .next()
2004 .map(|(k, (_, v))| (k.clone(), v.clone())),
2005 LastKey::End => None,
2006 LastKey::At(key) => self
2007 .btree_indexed
2008 .range::<[u8], _>((Bound::Excluded(key.as_slice()), Bound::Unbounded))
2009 .next()
2010 .map(|(k, (_, v))| (k.clone(), v.clone())),
2011 LastKey::Seeked(key) => self
2012 .btree_indexed
2013 .range::<[u8], _>((Bound::Included(key.as_slice()), Bound::Unbounded))
2014 .next()
2015 .map(|(k, (_, v))| (k.clone(), v.clone())),
2016 }
2017 }
2018
2019 pub fn btree_prev(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
2020 use crate::btree::LastKey;
2021 match &last_key {
2022 LastKey::End => self
2023 .btree_indexed
2024 .range::<[u8], _>(..)
2025 .rev()
2026 .next()
2027 .map(|(k, (_, v))| (k.clone(), v.clone())),
2028 LastKey::Start => None,
2029 LastKey::At(key) => self
2030 .btree_indexed
2031 .range::<[u8], _>((Bound::Unbounded, Bound::Excluded(key.as_slice())))
2032 .rev()
2033 .next()
2034 .map(|(k, (_, v))| (k.clone(), v.clone())),
2035 LastKey::Seeked(key) => self
2036 .btree_indexed
2037 .range::<[u8], _>((Bound::Unbounded, Bound::Included(key.as_slice())))
2038 .rev()
2039 .next()
2040 .map(|(k, (_, v))| (k.clone(), v.clone())),
2041 }
2042 }
2043}
2044
/// A single change to a column entry, addressed by `Key`.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation<Key, Value> {
    /// Store `Value` under the key.
    Set(Key, Value),
    /// Drop a reference to the key. In non-ref-counted columns the commit overlay records this
    /// as a removal.
    Dereference(Key),
    /// Add a reference to the key; rejected ("No Rc for column") in non-ref-counted columns.
    Reference(Key),
    /// Insert a new tree under the key. Rejected by `IndexedChangeSet`; presumably handled on a
    /// separate tree commit path — confirm.
    InsertTree(Key, NewNode),
    /// Add a reference to the tree stored under the key (tree operation, see `InsertTree`).
    ReferenceTree(Key),
    /// Drop a reference to the tree stored under the key (tree operation, see `InsertTree`).
    DereferenceTree(Key),
}
2071
/// Delegates to `Ord::cmp`, i.e. orders operations by key only.
impl<Key: Ord, Value: Eq> PartialOrd<Self> for Operation<Key, Value> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
2077
/// Orders operations by key only, ignoring the variant and any payload.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare the whole operation. Two different
/// operations on the same key therefore compare `Equal` here while being `!=`, which violates
/// `Ord`'s consistency-with-`Eq` contract. That is harmless for sorting by key (presumably the
/// intent) but can surprise code that treats `cmp == Equal` as equality.
impl<Key: Ord, Value: Eq> Ord for Operation<Key, Value> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key().cmp(other.key())
    }
}
2083
2084impl<Key, Value> Operation<Key, Value> {
2085 pub fn key(&self) -> &Key {
2086 match self {
2087 Operation::Set(k, _) |
2088 Operation::Dereference(k) |
2089 Operation::Reference(k) |
2090 Operation::InsertTree(k, _) |
2091 Operation::ReferenceTree(k) |
2092 Operation::DereferenceTree(k) => k,
2093 }
2094 }
2095
2096 pub fn into_key(self) -> Key {
2097 match self {
2098 Operation::Set(k, _) |
2099 Operation::Dereference(k) |
2100 Operation::Reference(k) |
2101 Operation::InsertTree(k, _) |
2102 Operation::ReferenceTree(k) |
2103 Operation::DereferenceTree(k) => k,
2104 }
2105 }
2106}
2107
2108impl<K: AsRef<[u8]>, Value> Operation<K, Value> {
2109 pub fn to_key_vec(self) -> Operation<Vec<u8>, Value> {
2110 match self {
2111 Operation::Set(k, v) => Operation::Set(k.as_ref().to_vec(), v),
2112 Operation::Dereference(k) => Operation::Dereference(k.as_ref().to_vec()),
2113 Operation::Reference(k) => Operation::Reference(k.as_ref().to_vec()),
2114 Operation::InsertTree(k, n) => Operation::InsertTree(k.as_ref().to_vec(), n),
2115 Operation::ReferenceTree(k) => Operation::ReferenceTree(k.as_ref().to_vec()),
2116 Operation::DereferenceTree(k) => Operation::DereferenceTree(k.as_ref().to_vec()),
2117 }
2118 }
2119}
2120
/// A change to tree node storage, planned by `IndexedChangeSet::write_plan`.
#[derive(Debug, PartialEq, Eq)]
pub enum NodeChange {
    /// Store a new node value at the given address.
    NewValue(u64, RcValue),
    /// Increment the reference count of the node at the given address.
    IncrementReference(u64),
    /// Dereference a tree root, given `(key, hashed key, root's child addresses)`.
    ///
    /// When the root's reference count was 1, its children are dereferenced recursively.
    DereferenceChildren(Vec<u8>, Key, Children),
}
2130
/// All changes of one commit, grouped per column.
#[derive(Debug, Default)]
pub struct CommitChangeSet {
    /// Changes to hash-indexed columns.
    pub indexed: HashMap<ColId, IndexedChangeSet>,
    /// Changes to btree-indexed columns.
    pub btree_indexed: HashMap<ColId, BTreeChangeSet>,
    /// NOTE(review): semantics not visible in this chunk — presumably requests a check on
    /// whether parts of the commit must be deferred; confirm at the use site.
    pub check_for_deferral: bool,
}
2137
/// Changes to a single hash-indexed column.
#[derive(Debug)]
pub struct IndexedChangeSet {
    /// Column the changes apply to.
    pub col: ColId,
    /// Key/value operations with already-hashed keys.
    pub changes: Vec<Operation<Key, RcValue>>,
    /// Tree node changes (new values, reference-count updates).
    pub node_changes: Vec<NodeChange>,
    /// Keys of trees touched by this change set. NOTE(review): not read in this chunk.
    pub used_trees: HashSet<Key>,
}
2145
2146impl IndexedChangeSet {
2147 pub fn new(col: ColId) -> Self {
2148 IndexedChangeSet {
2149 col,
2150 changes: Default::default(),
2151 node_changes: Default::default(),
2152 used_trees: Default::default(),
2153 }
2154 }
2155
2156 fn push<K: AsRef<[u8]>, V: Into<RcValue>>(
2157 &mut self,
2158 change: Operation<K, V>,
2159 options: &Options,
2160 db_version: u32,
2161 ) -> Result<()> {
2162 let salt = options.salt.unwrap_or_default();
2163 let hash_key = |key: &[u8]| -> Key {
2164 hash_key(key, &salt, options.columns[self.col as usize].uniform, db_version)
2165 };
2166
2167 self.push_change_hashed(match change {
2168 Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
2169 Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
2170 Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
2171 Operation::InsertTree(..) |
2172 Operation::ReferenceTree(..) |
2173 Operation::DereferenceTree(..) =>
2174 return Err(Error::InvalidInput(format!(
2175 "Invalid operation for column {}",
2176 self.col
2177 ))),
2178 });
2179
2180 Ok(())
2181 }
2182
/// Queues an operation whose key is already hashed.
fn push_change_hashed(&mut self, change: Operation<Key, RcValue>) {
    self.changes.push(change);
}
2186
/// Queues a tree node change.
fn push_node_change(&mut self, change: NodeChange) {
    self.node_changes.push(change);
}
2190
2191 fn copy_to_overlay(
2192 &self,
2193 overlay: &mut CommitOverlay,
2194 record_id: u64,
2195 bytes: &mut usize,
2196 options: &Options,
2197 ) -> Result<()> {
2198 let ref_counted = options.columns[self.col as usize].ref_counted;
2199 for change in self.changes.iter() {
2200 match &change {
2201 Operation::Set(k, v) => {
2202 *bytes += k.len();
2203 *bytes += v.as_ref().len();
2204 overlay.indexed.insert(*k, (record_id, Some(v.clone())));
2205 },
2206 Operation::Dereference(k) => {
2207 if !ref_counted {
2209 overlay.indexed.insert(*k, (record_id, None));
2210 }
2211 },
2212 Operation::Reference(..) => {
2213 if !ref_counted {
2216 return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)))
2217 }
2218 },
2219 Operation::InsertTree(..) |
2220 Operation::ReferenceTree(..) |
2221 Operation::DereferenceTree(..) =>
2222 return Err(Error::InvalidInput(format!(
2223 "Invalid operation for column {}",
2224 self.col
2225 ))),
2226 }
2227 }
2228 for change in self.node_changes.iter() {
2229 if let NodeChange::NewValue(address, val) = change {
2230 *bytes += val.as_ref().len();
2231 overlay.address.insert(*address, (record_id, val.clone()));
2232 }
2233 }
2234 Ok(())
2235 }
2236
2237 fn write_plan(
2238 &self,
2239 db: &Arc<DbInner>,
2240 col: ColId,
2241 column: &Column,
2242 writer: &mut crate::log::LogWriter,
2243 ops: &mut u64,
2244 reindex: &mut bool,
2245 ) -> Result<()> {
2246 let column = match column {
2247 Column::Hash(column) => column,
2248 Column::Tree(_) => {
2249 log::warn!(target: "parity-db", "Skipping unindex commit in indexed column");
2250 return Ok(())
2251 },
2252 };
2253 for change in self.changes.iter() {
2254 if let PlanOutcome::NeedReindex = column.write_plan(change, writer)? {
2255 *reindex = true;
2257 }
2258 *ops += 1;
2259 }
2260 for change in self.node_changes.iter() {
2261 match change {
2262 NodeChange::NewValue(address, val) => {
2263 column.write_address_value_plan(
2264 *address,
2265 val.clone(),
2266 false,
2267 val.as_ref().len() as u32,
2268 writer,
2269 )?;
2270 },
2271 NodeChange::IncrementReference(address) => {
2272 if let PlanOutcome::NeedReindex =
2273 column.write_address_inc_ref_plan(*address, writer)?
2274 {
2275 *reindex = true;
2276 }
2277 },
2278 NodeChange::DereferenceChildren(key, hash, children) => {
2279 if let Some((_root, rc)) = column.get(hash, writer)? {
2280 column.write_plan(&Operation::Dereference(*hash), writer)?;
2281 log::debug!(target: "parity-db", "Dereferencing root, rc={}", rc);
2282 if rc == 1 {
2283 let tree = db.get_tree(db, col, key, false).unwrap();
2284 if let Some(tree) = tree {
2285 let guard = tree.write();
2286 let mut num_removed = 0;
2287 self.write_dereference_children_plan(
2288 column,
2289 &guard,
2290 children,
2291 &mut num_removed,
2292 writer,
2293 )?;
2294 log::debug!(target: "parity-db", "Dereferenced tree {:?}, removed {}", &key[0..3], num_removed);
2295 }
2296 }
2297 }
2298 },
2300 }
2301 }
2302 Ok(())
2303 }
2304
2305 fn write_dereference_children_plan(
2306 &self,
2307 column: &HashColumn,
2308 guard: &RwLockWriteGuard<'_, Box<dyn TreeReader + Send + Sync>>,
2309 children: &Vec<u64>,
2310 num_removed: &mut u64,
2311 writer: &mut crate::log::LogWriter,
2312 ) -> Result<()> {
2313 for address in children {
2314 let node = guard.get_node_children(*address)?;
2318 let (remains, _outcome) = column.write_address_dec_ref_plan(*address, writer)?;
2319 if !remains {
2320 *num_removed += 1;
2322 if let Some(children) = node {
2323 self.write_dereference_children_plan(
2324 column,
2325 guard,
2326 &children,
2327 num_removed,
2328 writer,
2329 )?;
2330 } else {
2331 return Err(Error::InvalidConfiguration("Missing node data".to_string()))
2332 }
2333 }
2334 }
2335 Ok(())
2336 }
2337
2338 fn clean_overlay(&self, overlay: &mut CommitOverlay, record_id: u64) {
2339 use std::collections::hash_map::Entry;
2340 for change in self.changes.iter() {
2341 match change {
2342 Operation::Set(k, _) | Operation::Dereference(k) => {
2343 if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
2344 if e.get().0 == record_id {
2345 e.remove_entry();
2346 }
2347 }
2348 },
2349 Operation::Reference(..) |
2350 Operation::InsertTree(..) |
2351 Operation::ReferenceTree(..) |
2352 Operation::DereferenceTree(..) => (),
2353 }
2354 }
2355 for change in self.node_changes.iter() {
2356 if let NodeChange::NewValue(address, _val) = change {
2357 if let Entry::Occupied(e) = overlay.address.entry(*address) {
2358 if e.get().0 == record_id {
2359 e.remove_entry();
2360 }
2361 }
2362 }
2363 }
2364 }
2365}
2366
/// Options for `Db::dump`.
pub mod check {
    /// How value contents are displayed while dumping.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum CheckDisplay {
        /// Do not display value contents.
        None,
        /// Display full value contents.
        Full,
        /// Display value contents truncated to the given limit (presumably bytes).
        Short(u64),
    }

    /// Parameters for `Db::dump`.
    #[derive(Debug, Clone)]
    pub struct CheckOptions {
        /// Restrict the dump to this column; `None` dumps every column.
        pub column: Option<u8>,
        /// Presumably the first entry index to visit — confirm in `Column::dump`.
        pub from: Option<u64>,
        /// Presumably an upper bound on the entries visited — confirm in `Column::dump`.
        pub bound: Option<u64>,
        /// How value contents are displayed.
        pub display: CheckDisplay,
        /// NOTE(review): presumably skips expensive checks — confirm in `Column::dump`.
        pub fast: bool,
        /// NOTE(review): presumably validates free-list references — confirm in `Column::dump`.
        pub validate_free_refs: bool,
    }

    impl CheckOptions {
        /// Builds check options.
        ///
        /// The display mode is derived from `display_content` and `truncate_value_display`:
        /// - no content when `display_content` is `false`;
        /// - otherwise truncated content when a limit is given;
        /// - otherwise full content.
        pub fn new(
            column: Option<u8>,
            from: Option<u64>,
            bound: Option<u64>,
            display_content: bool,
            truncate_value_display: Option<u64>,
            fast: bool,
            validate_free_refs: bool,
        ) -> Self {
            let display = match (display_content, truncate_value_display) {
                (false, _) => CheckDisplay::None,
                (true, Some(limit)) => CheckDisplay::Short(limit),
                (true, None) => CheckDisplay::Full,
            };
            CheckOptions { column, from, bound, display, fast, validate_free_refs }
        }
    }
}
2418
/// How `Db::open_inner` is asked to open the database directory.
#[derive(Clone, Copy, PartialEq, Eq)]
enum OpeningMode {
    /// Open the database, creating it when it does not exist yet (the tests also use this
    /// mode on databases that already exist).
    Create,
    /// Open an existing database for writing.
    Write,
    /// Open an existing database read-only. NOTE(review): semantics inferred from the name.
    ReadOnly,
}
2425
2426#[cfg(test)]
2427mod tests {
2428 use super::{Db, Options};
2429 use crate::{
2430 column::ColId,
2431 db::{DbInner, OpeningMode},
2432 ColumnOptions, Value,
2433 };
2434 use rand::Rng;
2435 use std::{
2436 collections::{BTreeMap, HashMap, HashSet},
2437 path::Path,
2438 };
2439 use tempfile::tempdir;
2440
/// How far a test drives the commit pipeline after each commit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EnableCommitPipelineStages {
    /// No background thread, and `run_stages` does nothing: commits are not processed further.
    #[allow(dead_code)]
    CommitOverlay,
    /// `run_stages` drains `process_commits` and `process_reindex`.
    #[allow(dead_code)]
    LogOverlay,
    /// Like `LogOverlay`, plus flushing, enacting and cleaning logs; `always_flush` is enabled.
    #[allow(dead_code)]
    DbFile,
    /// Background thread enabled; `run_stages` does nothing.
    Standard,
}
2456
2457 impl EnableCommitPipelineStages {
2458 fn options(&self, path: &Path, num_columns: u8) -> Options {
2459 Options {
2460 path: path.into(),
2461 sync_wal: true,
2462 sync_data: true,
2463 stats: true,
2464 salt: None,
2465 columns: (0..num_columns).map(|_| Default::default()).collect(),
2466 compression_threshold: HashMap::new(),
2467 with_background_thread: *self == Self::Standard,
2468 always_flush: *self == Self::DbFile,
2469 }
2470 }
2471
2472 fn run_stages(&self, db: &Db) {
2473 let db = &db.inner;
2474 if *self == EnableCommitPipelineStages::DbFile ||
2475 *self == EnableCommitPipelineStages::LogOverlay
2476 {
2477 while db.process_commits(db).unwrap() {}
2478 while db.process_reindex().unwrap() {}
2479 }
2480 if *self == EnableCommitPipelineStages::DbFile {
2481 let _ = db.log.flush_one(0).unwrap();
2482 while db.enact_logs(false).unwrap() {}
2483 let _ = db.clean_logs().unwrap();
2484 }
2485 }
2486
2487 fn check_empty_overlay(&self, db: &DbInner, col: ColId) -> bool {
2488 match self {
2489 EnableCommitPipelineStages::DbFile | EnableCommitPipelineStages::LogOverlay => {
2490 if let Some(overlay) = db.commit_overlay.read().get(col as usize) {
2491 if !overlay.is_empty() {
2492 let mut replayed = 5;
2493 while !overlay.is_empty() {
2494 if replayed > 0 {
2495 replayed -= 1;
2496 std::thread::sleep(std::time::Duration::from_millis(100));
2501 } else {
2502 return false
2503 }
2504 }
2505 }
2506 }
2507 },
2508 _ => (),
2509 }
2510 true
2511 }
2512 }
2513
/// `Db::open` on an empty directory reports `DatabaseNotFound` instead of creating a database.
#[test]
fn test_db_open_should_fail() {
    let dir = tempdir().unwrap();
    let opts = Options::with_columns(dir.path(), 5);
    let opened = Db::open(&opts);
    assert!(matches!(opened, Err(crate::Error::DatabaseNotFound)));
}
2520
/// `Db::open` must fail on a missing nested path without creating any directory, while
/// `Db::open_or_create` must create every missing directory along that path.
#[test]
fn test_db_open_fail_then_recursively_create() {
    let tmp = tempdir().unwrap();
    // `nope` is the outermost directory that does not exist yet; the database path is
    // four levels below it.
    let db_path_first = tmp.path().join("nope");
    let mut db_path_last = db_path_first.clone();
    for p in ["does", "not", "yet", "exist"] {
        db_path_last.push(p);
    }

    assert!(
        !db_path_first.exists(),
        "That directory should not have existed at this point (dir: {db_path_first:?})"
    );

    let options = Options::with_columns(&db_path_last, 5);
    assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));

    assert!(
        !db_path_first.exists(),
        "That directory should remain non-existent. Did the `open(create: false)` nonetheless create a directory? (dir: {db_path_first:?})"
    );
    assert!(Db::open_or_create(&options).is_ok(), "New database should be created");

    assert!(
        db_path_first.is_dir(),
        "A directory should have been created (dir: {db_path_first:?})"
    );
    assert!(
        db_path_last.is_dir(),
        "A directory should have been created (dir: {db_path_last:?})"
    );
}
2557
/// `open_or_create` creates a fresh database, which a later `open` can reopen.
#[test]
fn test_db_open_or_create() {
    let dir = tempdir().unwrap();
    let opts = Options::with_columns(dir.path(), 5);
    // `is_ok()` is evaluated on a temporary, so the created database is closed again
    // before the reopen below.
    let created = Db::open_or_create(&opts).is_ok();
    assert!(created, "New database should be created");
    let reopened = Db::open(&opts).is_ok();
    assert!(reopened, "Existing database should be reopened");
}
2565
/// Runs the indexed key/value scenario at every pipeline stage.
#[test]
fn test_indexed_keyvalues() {
    let stages = [
        EnableCommitPipelineStages::CommitOverlay,
        EnableCommitPipelineStages::LogOverlay,
        EnableCommitPipelineStages::DbFile,
        EnableCommitPipelineStages::Standard,
    ];
    for stage in stages {
        test_indexed_keyvalues_inner(stage);
    }
}
/// Commits, overwrites and deletes keys in column 0 of a 5-column database (default column
/// options). After each commit it drives the pipeline to `db_test`'s stage and checks `get`.
///
/// After every commit it also asserts `check_empty_overlay`, which only inspects the commit
/// overlay for `LogOverlay` and `DbFile`.
fn test_indexed_keyvalues_inner(db_test: EnableCommitPipelineStages) {
    let tmp = tempdir().unwrap();
    let options = db_test.options(tmp.path(), 5);
    let col_nb = 0;

    let key1 = b"key1".to_vec();
    let key2 = b"key2".to_vec();
    let key3 = b"key3".to_vec();

    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
    assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());

    // Commit 1: a single insertion.
    db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
    db_test.run_stages(&db);
    assert!(db_test.check_empty_overlay(&db.inner, col_nb));

    assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));

    // Commit 2: delete key1 and insert key2/key3 in the same batch.
    db.commit(vec![
        (col_nb, key1.clone(), None),
        (col_nb, key2.clone(), Some(b"value2".to_vec())),
        (col_nb, key3.clone(), Some(b"value3".to_vec())),
    ])
    .unwrap();
    db_test.run_stages(&db);
    assert!(db_test.check_empty_overlay(&db.inner, col_nb));

    assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
    assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
    assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

    // Commit 3: overwrite key2 and delete key3.
    db.commit(vec![
        (col_nb, key2.clone(), Some(b"value2b".to_vec())),
        (col_nb, key3.clone(), None),
    ])
    .unwrap();
    db_test.run_stages(&db);
    assert!(db_test.check_empty_overlay(&db.inner, col_nb));

    assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
    assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
    assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
}
2616
/// Reads must combine a new commit with data written before a reopen.
///
/// 1. Three keys are committed and driven through the `DbFile` stage.
/// 2. The database is closed and reopened in `Write` mode with `CommitOverlay` options, for
///    which `run_stages` does nothing.
/// 3. key2 is overwritten and key3 deleted.
///
/// Reads must then reflect the new commit for key2/key3, while key1 still returns the value
/// written before the reopen.
#[test]
fn test_indexed_overlay_against_backend() {
    let tmp = tempdir().unwrap();
    let db_test = EnableCommitPipelineStages::DbFile;
    let options = db_test.options(tmp.path(), 5);
    let col_nb = 0;

    let key1 = b"key1".to_vec();
    let key2 = b"key2".to_vec();
    let key3 = b"key3".to_vec();

    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

    db.commit(vec![
        (col_nb, key1.clone(), Some(b"value1".to_vec())),
        (col_nb, key2.clone(), Some(b"value2".to_vec())),
        (col_nb, key3.clone(), Some(b"value3".to_vec())),
    ])
    .unwrap();
    db_test.run_stages(&db);
    drop(db);

    // Short pause between closing and reopening the database.
    // NOTE(review): the reason is not visible here (presumably to let shutdown complete) — confirm.
    std::thread::sleep(std::time::Duration::from_millis(100));

    let db_test = EnableCommitPipelineStages::CommitOverlay;
    let options = db_test.options(tmp.path(), 5);
    let db = Db::open_inner(&options, OpeningMode::Write).unwrap();
    assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
    assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
    assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
    db.commit(vec![
        (col_nb, key2.clone(), Some(b"value2b".to_vec())),
        (col_nb, key3.clone(), None),
    ])
    .unwrap();
    db_test.run_stages(&db);

    // key1 is untouched by the new commit; key2/key3 must reflect it.
    assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
    assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
    assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
}
2659
/// Adds a plain column and a btree-indexed column to an existing single-column database.
/// Checks that the original and the new data can be read back after reopening.
#[test]
fn test_add_column() {
    let tmp = tempdir().unwrap();
    let db_test = EnableCommitPipelineStages::DbFile;
    let mut options = db_test.options(tmp.path(), 1);
    // `options()` leaves the salt as `None`; use an explicit default salt instead.
    options.salt = Some(options.salt.unwrap_or_default());

    let old_col_id = 0;
    let new_col_id = 1;
    let new_col_indexed_id = 2;

    let key1 = b"key1".to_vec();
    let key2 = b"key2".to_vec();
    let key3 = b"key3".to_vec();

    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

    db.commit(vec![
        (old_col_id, key1.clone(), Some(b"value1".to_vec())),
        (old_col_id, key2.clone(), Some(b"value2".to_vec())),
        (old_col_id, key3.clone(), Some(b"value3".to_vec())),
    ])
    .unwrap();
    db_test.run_stages(&db);

    // Close the database before adding columns to it.
    drop(db);

    Db::add_column(&mut options, ColumnOptions { btree_index: false, ..Default::default() })
        .unwrap();

    Db::add_column(&mut options, ColumnOptions { btree_index: true, ..Default::default() })
        .unwrap();

    // Reopen with options describing all three columns; column 2 is btree-indexed.
    let mut options = db_test.options(tmp.path(), 3);
    options.columns[new_col_indexed_id as usize].btree_index = true;

    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

    assert_eq!(db.num_columns(), 3);

    let new_key1 = b"abcdef".to_vec();
    let new_key2 = b"123456".to_vec();

    // Each new key is stored as its own value in both added columns.
    db.commit(vec![
        (new_col_id, new_key1.clone(), Some(new_key1.to_vec())),
        (new_col_id, new_key2.clone(), Some(new_key2.to_vec())),
        (new_col_indexed_id, new_key1.clone(), Some(new_key1.to_vec())),
        (new_col_indexed_id, new_key2.clone(), Some(new_key2.to_vec())),
    ])
    .unwrap();
    db_test.run_stages(&db);

    drop(db);

    // Reopen once more: data in the original and in both added columns must survive.
    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

    assert_eq!(db.get(old_col_id, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
    assert_eq!(db.get(old_col_id, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
    assert_eq!(db.get(old_col_id, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

    assert_eq!(db.get(new_col_id, new_key1.as_slice()).unwrap(), Some(new_key1.to_vec()));
    assert_eq!(db.get(new_col_id, new_key2.as_slice()).unwrap(), Some(new_key2.to_vec()));
    assert_eq!(
        db.get(new_col_indexed_id, new_key1.as_slice()).unwrap(),
        Some(new_key1.to_vec())
    );
    assert_eq!(
        db.get(new_col_indexed_id, new_key2.as_slice()).unwrap(),
        Some(new_key2.to_vec())
    );
}
2736
/// Runs the basic btree scenario at every stage, first with short keys, then with long keys.
#[test]
fn test_indexed_btree_1() {
    for long_key in [false, true] {
        for stage in [
            EnableCommitPipelineStages::CommitOverlay,
            EnableCommitPipelineStages::LogOverlay,
            EnableCommitPipelineStages::DbFile,
            EnableCommitPipelineStages::Standard,
        ] {
            test_indexed_btree_inner(stage, long_key);
        }
    }
}
/// Exercises `get`, `seek`, `seek_to_first`, `next` and `prev` on a btree-indexed column 0
/// across three commits, driving the pipeline to `db_test`'s stage after each one.
///
/// With `long_key`, the keys have lengths 953, 272, 272 and 79 bytes. key3 differs from key2
/// only in its last byte. The same iterator, created on the empty column, is reused after
/// every commit.
fn test_indexed_btree_inner(db_test: EnableCommitPipelineStages, long_key: bool) {
    let tmp = tempdir().unwrap();
    let col_nb = 0u8;
    let mut options = db_test.options(tmp.path(), 5);
    options.columns[col_nb as usize].btree_index = true;

    let (key1, key2, key3, key4) = if !long_key {
        (b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec(), b"key4".to_vec())
    } else {
        let key2 = vec![2; 272];
        let mut key3 = key2.clone();
        key3[271] = 3;
        (vec![1; 953], key2, key3, vec![4; 79])
    };

    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
    assert_eq!(db.get(col_nb, &key1).unwrap(), None);

    // Iterating an empty column yields nothing in either direction.
    let mut iter = db.iter(col_nb).unwrap();
    assert_eq!(iter.next().unwrap(), None);
    assert_eq!(iter.prev().unwrap(), None);

    db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
    db_test.run_stages(&db);

    // Single entry: walk forward and backward past both ends.
    assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
    iter.seek_to_first().unwrap();
    assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
    assert_eq!(iter.next().unwrap(), None);
    assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
    assert_eq!(iter.prev().unwrap(), None);
    assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
    assert_eq!(iter.next().unwrap(), None);

    iter.seek_to_first().unwrap();
    assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
    assert_eq!(iter.prev().unwrap(), None);

    // Seeking past every key and stepping back returns the single entry.
    iter.seek(&[0xff]).unwrap();
    assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
    assert_eq!(iter.prev().unwrap(), None);

    db.commit(vec![
        (col_nb, key1.clone(), None),
        (col_nb, key2.clone(), Some(b"value2".to_vec())),
        (col_nb, key3.clone(), Some(b"value3".to_vec())),
    ])
    .unwrap();
    db_test.run_stages(&db);

    assert_eq!(db.get(col_nb, &key1).unwrap(), None);
    assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
    assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));

    iter.seek(key2.as_slice()).unwrap();
    assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
    assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
    assert_eq!(iter.next().unwrap(), None);

    // After seeking to key3, `prev` returns key3 itself first.
    iter.seek(key3.as_slice()).unwrap();
    assert_eq!(iter.prev().unwrap(), Some((key3.clone(), b"value3".to_vec())));
    assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
    assert_eq!(iter.prev().unwrap(), None);

    db.commit(vec![
        (col_nb, key2.clone(), Some(b"value2b".to_vec())),
        (col_nb, key4.clone(), Some(b"value4".to_vec())),
        (col_nb, key3.clone(), None),
    ])
    .unwrap();
    db_test.run_stages(&db);

    assert_eq!(db.get(col_nb, &key1).unwrap(), None);
    assert_eq!(db.get(col_nb, &key3).unwrap(), None);
    assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2b".to_vec()));
    assert_eq!(db.get(col_nb, &key4).unwrap(), Some(b"value4".to_vec()));
    // `key22` is key2 plus one extra byte, so it sorts after key2; with key3 deleted, the
    // seek lands on key4.
    let mut key22 = key2.clone();
    key22.push(2);
    iter.seek(key22.as_slice()).unwrap();
    assert_eq!(iter.next().unwrap(), Some((key4, b"value4".to_vec())));
    assert_eq!(iter.next().unwrap(), None);
}
2830
/// Runs the btree reopen scenario for the `CommitOverlay` and `LogOverlay` stages.
#[test]
fn test_indexed_btree_2() {
    let stages = [EnableCommitPipelineStages::CommitOverlay, EnableCommitPipelineStages::LogOverlay];
    for stage in stages {
        test_indexed_btree_inner_2(stage);
    }
}
/// Btree iteration across a reopen.
///
/// key1 is committed and always pushed through the `DbFile` stage (whatever `db_test` is),
/// then the database is reopened in `Write` mode. A second commit (delete key1, insert
/// key2/key3) is driven to `db_test`'s stage, and forward and backward iteration are checked.
fn test_indexed_btree_inner_2(db_test: EnableCommitPipelineStages) {
    let tmp = tempdir().unwrap();
    let col_nb = 0u8;
    let mut options = db_test.options(tmp.path(), 5);
    options.columns[col_nb as usize].btree_index = true;

    let key1 = b"key1".to_vec();
    let key2 = b"key2".to_vec();
    let key3 = b"key3".to_vec();

    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
    let mut iter = db.iter(col_nb).unwrap();
    assert_eq!(db.get(col_nb, &key1).unwrap(), None);
    assert_eq!(iter.next().unwrap(), None);

    // Write key1 all the way through the pipeline before closing the database.
    db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
    EnableCommitPipelineStages::DbFile.run_stages(&db);
    drop(db);

    // Short pause between closing and reopening the database.
    // NOTE(review): the reason is not visible here — confirm it is still needed.
    std::thread::sleep(std::time::Duration::from_millis(100));

    let db = Db::open_inner(&options, OpeningMode::Write).unwrap();

    let mut iter = db.iter(col_nb).unwrap();
    assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
    iter.seek_to_first().unwrap();
    assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
    assert_eq!(iter.next().unwrap(), None);

    db.commit(vec![
        (col_nb, key1.clone(), None),
        (col_nb, key2.clone(), Some(b"value2".to_vec())),
        (col_nb, key3.clone(), Some(b"value3".to_vec())),
    ])
    .unwrap();
    db_test.run_stages(&db);

    assert_eq!(db.get(col_nb, &key1).unwrap(), None);
    assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
    assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
    iter.seek(key2.as_slice()).unwrap();
    assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
    assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
    assert_eq!(iter.next().unwrap(), None);

    iter.seek_to_last().unwrap();
    assert_eq!(iter.prev().unwrap(), Some((key3, b"value3".to_vec())));
    assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
    assert_eq!(iter.prev().unwrap(), None);
}
2887
/// Runs the randomized btree iteration scenario at every pipeline stage.
#[test]
fn test_indexed_btree_3() {
    [
        EnableCommitPipelineStages::CommitOverlay,
        EnableCommitPipelineStages::LogOverlay,
        EnableCommitPipelineStages::DbFile,
        EnableCommitPipelineStages::Standard,
    ]
    .iter()
    .copied()
    .for_each(test_indexed_btree_inner_3);
}
2895
/// Randomized btree iteration test (seeded `SmallRng`).
///
/// 1. Commits keys `0..1024` (big-endian `u64`, each stored as its own value) and deletes
///    the even ones in the same commit.
/// 2. Performs 100 rounds of a random seek followed by an optional forward run, a backward
///    run and a forward run. Each run is compared against a `BTreeSet` model of the odd keys.
/// 3. Ends with a backward run from the last key.
///
/// NOTE(review): `db_test` only selects the `Options` here. `run_stages` is never called
/// after the commit, so the `LogOverlay`/`DbFile` stages are not actually driven — confirm
/// whether that is intended.
fn test_indexed_btree_inner_3(db_test: EnableCommitPipelineStages) {
    use rand::SeedableRng;

    use std::collections::BTreeSet;

    let mut rng = rand::rngs::SmallRng::seed_from_u64(0);

    let tmp = tempdir().unwrap();
    let col_nb = 0u8;
    let mut options = db_test.options(tmp.path(), 5);
    options.columns[col_nb as usize].btree_index = true;

    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

    db.commit(
        (0u64..1024)
            .map(|i| (0, i.to_be_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
            .chain((0u64..1024).step_by(2).map(|i| (0, i.to_be_bytes().to_vec(), None))),
    )
    .unwrap();
    // Model: only the odd keys survive the commit above.
    let expected = (0u64..1024).filter(|i| i % 2 == 1).collect::<BTreeSet<_>>();
    let mut iter = db.iter(0).unwrap();

    for _ in 0..100 {
        let at = rng.random_range(0u64..=1024);
        iter.seek(&at.to_be_bytes()).unwrap();

        // Optional forward run. `prev_run` ends up true only when the run returned a full,
        // non-empty batch. It selects whether the next (backward) run is compared against a
        // range that excludes the key this run ended on.
        let mut prev_run: bool = rng.random();
        let at = if prev_run {
            let take = rng.random_range(1..100);
            let got = std::iter::from_fn(|| iter.next().unwrap())
                .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
                .take(take)
                .collect::<Vec<_>>();
            let expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
            assert_eq!(got, expected);
            if got.is_empty() {
                prev_run = false;
            }
            if got.len() < take {
                prev_run = false;
            }
            expected.last().copied().unwrap_or(at)
        } else {
            at
        };

        // Backward run from the current position.
        let at = {
            let take = rng.random_range(1..100);
            let got = std::iter::from_fn(|| iter.prev().unwrap())
                .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
                .take(take)
                .collect::<Vec<_>>();
            let expected = if prev_run {
                expected.range(..at).rev().take(take).copied().collect::<Vec<_>>()
            } else {
                expected.range(..=at).rev().take(take).copied().collect::<Vec<_>>()
            };
            assert_eq!(got, expected);
            prev_run = !got.is_empty();
            if take > got.len() {
                prev_run = false;
            }
            expected.last().copied().unwrap_or(at)
        };

        // Forward run again. After a full backward batch, the key it ended on is dropped from
        // the model, and the last element yielded is popped so the lengths line up.
        let take = rng.random_range(1..100);
        let mut got = std::iter::from_fn(|| iter.next().unwrap())
            .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
            .take(take)
            .collect::<Vec<_>>();
        let mut expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
        if prev_run {
            expected = expected.split_off(1);
            if got.len() == take {
                got.pop();
            }
        }
        assert_eq!(got, expected);
    }

    // Final backward run from the last key.
    let take = rng.random_range(20..100);
    iter.seek_to_last().unwrap();
    let got = std::iter::from_fn(|| iter.prev().unwrap())
        .map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
        .take(take)
        .collect::<Vec<_>>();
    let expected = expected.iter().rev().take(take).copied().collect::<Vec<_>>();
    assert_eq!(got, expected);
}
2986
2987 fn test_basic(change_set: &[(Vec<u8>, Option<Vec<u8>>)]) {
2988 test_basic_inner(change_set, false, false);
2989 test_basic_inner(change_set, false, true);
2990 test_basic_inner(change_set, true, false);
2991 test_basic_inner(change_set, true, true);
2992 }
2993
/// Commits `change_set` to column 1 of a 2-column database, drives it through the `DbFile`
/// stage, and compares the database against a model of the expected final state.
///
/// Column 1 uses the given `btree_index` and `ref_counted` settings; `preimage` follows
/// `ref_counted`.
///
/// - Ref-counted columns: the model counts references per key. A `Some` on a live key bumps
///   its count and keeps the first value. A `None` decrements the count, dropping the key
///   when it reaches zero. A `None` on an absent key is ignored.
/// - Other columns: the last write to each key wins. The column's `total_values` statistic
///   (when stats are available) must equal the number of keys whose last operation was an
///   insertion.
fn test_basic_inner(
    change_set: &[(Vec<u8>, Option<Vec<u8>>)],
    btree_index: bool,
    ref_counted: bool,
) {
    let tmp = tempdir().unwrap();
    let col_nb = 1u8;
    let db_test = EnableCommitPipelineStages::DbFile;
    let mut options = db_test.options(tmp.path(), 2);
    options.columns[col_nb as usize].btree_index = btree_index;
    options.columns[col_nb as usize].ref_counted = ref_counted;
    options.columns[col_nb as usize].preimage = ref_counted;
    // Guards against combining ref counting with the `CommitOverlay` stage; always holds
    // while `db_test` is fixed to `DbFile`.
    assert!(!(ref_counted && db_test == EnableCommitPipelineStages::CommitOverlay));
    let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

    // With a btree index, an iterator over the still-empty column yields nothing.
    let iter = btree_index.then(|| db.iter(col_nb).unwrap());
    assert_eq!(iter.and_then(|mut i| i.next().unwrap()), None);

    db.commit(change_set.iter().map(|(k, v)| (col_nb, k.clone(), v.clone())))
        .unwrap();
    db_test.run_stages(&db);

    // Count of distinct keys whose last operation was an insertion.
    let mut keys = HashSet::new();
    let mut expected_count: u64 = 0;
    for (k, v) in change_set.iter() {
        if v.is_some() {
            if keys.insert(k) {
                expected_count += 1;
            }
        } else if keys.remove(k) {
            expected_count -= 1;
        }
    }
    if ref_counted {
        // Reference-counting model: key -> (first inserted value, reference count).
        let mut state: BTreeMap<Vec<u8>, Option<(Vec<u8>, usize)>> = Default::default();
        for (k, v) in change_set.iter() {
            let mut remove = false;
            let mut insert = false;
            match state.get_mut(k) {
                Some(Some((_, counter))) =>
                    if v.is_some() {
                        *counter += 1;
                    } else if *counter == 1 {
                        remove = true;
                    } else {
                        *counter -= 1;
                    },
                Some(None) | None =>
                    if v.is_some() {
                        insert = true;
                    },
            }
            if insert {
                state.insert(k.clone(), v.clone().map(|v| (v, 1)));
            }
            if remove {
                state.remove(k);
            }
        }
        for (key, value) in state {
            assert_eq!(db.get(col_nb, &key).unwrap(), value.map(|v| v.0));
        }
    } else {
        let stats = db.stats();
        if let Some(stats) = stats.columns[col_nb as usize].as_ref() {
            assert_eq!(stats.total_values, expected_count);
        }

        // Last-write-wins model.
        let state: BTreeMap<Vec<u8>, Option<Vec<u8>>> =
            change_set.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
        for (key, value) in state.iter() {
            assert_eq!(&db.get(col_nb, key).unwrap(), value);
        }
    }
}
3071
/// Runs `test_random_inner` with 60-byte keys: 100 seeds for change sets of 60 entries,
/// then 50 seeds for change sets of 20 entries.
#[test]
fn test_random() {
    // Raise the process file-descriptor limit up front.
    // NOTE(review): presumably because these runs open many database files — confirm.
    fdlimit::raise_fd_limit().unwrap();
    let runs: [(usize, u64); 2] = [(60, 100), (20, 50)];
    for (size, seed_count) in runs {
        for seed in 0..seed_count {
            test_random_inner(size, 60, seed);
        }
    }
}
/// Builds a pseudo-random change set of `size` entries (deterministic for a given `seed`)
/// and runs `test_basic` on it.
///
/// - Each key starts as `key_size` random bytes and is then truncated by a random amount
///   smaller than `key_size / 2`.
/// - An entry whose index exceeds `size - nb_delete` is a deletion. `nb_delete` is redrawn
///   for every entry.
/// - About one deletion in four reuses the key of an earlier entry before truncation.
///
/// Panics with a division by zero if `key_size < 2` (current callers pass 60).
fn test_random_inner(size: usize, key_size: usize, seed: u64) {
    use rand::{RngCore, SeedableRng};
    let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
    let mut data = Vec::<(Vec<u8>, Option<Vec<u8>>)>::new();
    for i in 0..size {
        // At most (size - 1) / 2, so `size - nb_delete` below cannot underflow.
        let nb_delete: u32 = rng.next_u32();
        let nb_delete = (nb_delete as usize % size) / 2;
        let mut key = vec![0u8; key_size];
        rng.fill_bytes(&mut key[..]);
        let value = if i > size - nb_delete {
            // Deletion: one time in four, target an earlier entry's key.
            let random_key = rng.next_u32();
            let random_key = (random_key % 4) > 0;
            if !random_key {
                key = data[i - size / 2].0.clone();
            }
            None
        } else {
            Some(key.clone())
        };
        let var_keysize = rng.next_u32();
        let var_keysize = var_keysize as usize % (key_size / 2);
        key.truncate(key_size - var_keysize);
        data.push((key, value));
    }
    test_basic(&data[..]);
}
3108
/// Hand-written change sets covering duplicate inserts, overwrites, insertion orders,
/// prefix-sharing keys, deletions and long keys, each run through `test_basic`.
#[test]
fn test_simple() {
    /// An insertion of `value` at `key`.
    fn set(key: &[u8], value: &[u8]) -> (Vec<u8>, Option<Vec<u8>>) {
        (key.to_vec(), Some(value.to_vec()))
    }
    /// A deletion of `key`.
    fn del(key: &[u8]) -> (Vec<u8>, Option<Vec<u8>>) {
        (key.to_vec(), None)
    }

    // Repeated inserts of the same key, then one or two deletions.
    test_basic(&[set(b"key1", b"value1"), set(b"key1", b"value1"), del(b"key1")]);
    test_basic(&[
        set(b"key1", b"value1"),
        set(b"key1", b"value1"),
        del(b"key1"),
        del(b"key1"),
    ]);
    test_basic(&[set(b"key1", b"value1"), set(b"key1", b"value2")]);

    // Small sets in ascending, mixed and descending order.
    test_basic(&[set(b"key1", b"value1")]);
    test_basic(&[set(b"key1", b"value1"), set(b"key2", b"value2")]);
    test_basic(&[set(b"key1", b"value1"), set(b"key2", b"value2"), set(b"key3", b"value3")]);
    test_basic(&[set(b"key1", b"value1"), set(b"key3", b"value3"), set(b"key2", b"value2")]);
    test_basic(&[set(b"key3", b"value3"), set(b"key2", b"value2"), set(b"key1", b"value1")]);
    test_basic(&[
        set(b"key1", b"value1"),
        set(b"key2", b"value2"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
    ]);
    test_basic(&[
        set(b"key1", b"value1"),
        set(b"key2", b"value2"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key5", b"value5"),
    ]);
    test_basic(&[
        set(b"key5", b"value5"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key2", b"value2"),
        set(b"key1", b"value1"),
    ]);

    // Keys that extend an existing key with an extra byte.
    test_basic(&[
        set(b"key5", b"value5"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key2", b"value2"),
        set(b"key1", b"value1"),
        set(b"key11", b"value31"),
        set(b"key12", b"value32"),
    ]);
    test_basic(&[
        set(b"key5", b"value5"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key2", b"value2"),
        set(b"key1", b"value1"),
        set(b"key51", b"value31"),
        set(b"key52", b"value32"),
    ]);
    test_basic(&[
        set(b"key5", b"value5"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key2", b"value2"),
        set(b"key1", b"value1"),
        set(b"key31", b"value31"),
        set(b"key32", b"value32"),
    ]);

    // A single deletion after several inserts.
    test_basic(&[
        set(b"key1", b"value5"),
        set(b"key2", b"value3"),
        set(b"key3", b"value4"),
        set(b"key4", b"value7"),
        set(b"key5", b"value2"),
        set(b"key6", b"value1"),
        del(b"key3"),
    ]);
    test_basic(&[
        set(b"key1", b"value5"),
        set(b"key2", b"value3"),
        set(b"key3", b"value4"),
        set(b"key4", b"value7"),
        set(b"key5", b"value2"),
        set(b"key0", b"value1"),
        del(b"key3"),
    ]);
    test_basic(&[
        set(b"key1", b"value5"),
        set(b"key2", b"value3"),
        set(b"key3", b"value4"),
        set(b"key4", b"value7"),
        set(b"key5", b"value2"),
        del(b"key3"),
    ]);
    test_basic(&[
        set(b"key1", b"value5"),
        set(b"key4", b"value3"),
        set(b"key5", b"value4"),
        set(b"key6", b"value4"),
        set(b"key7", b"value2"),
        set(b"key8", b"value1"),
        del(b"key5"),
    ]);
    test_basic(&[
        set(b"key1", b"value5"),
        set(b"key4", b"value3"),
        set(b"key5", b"value4"),
        set(b"key7", b"value2"),
        set(b"key8", b"value1"),
        del(b"key3"),
    ]);

    // Progressively deleting every key.
    test_basic(&[
        set(b"key5", b"value5"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key2", b"value2"),
        set(b"key1", b"value1"),
        del(b"key5"),
        del(b"key3"),
    ]);
    test_basic(&[
        set(b"key5", b"value5"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key2", b"value2"),
        set(b"key1", b"value1"),
        del(b"key5"),
        del(b"key3"),
        del(b"key2"),
        del(b"key4"),
    ]);
    test_basic(&[
        set(b"key5", b"value5"),
        set(b"key3", b"value3"),
        set(b"key4", b"value4"),
        set(b"key2", b"value2"),
        set(b"key1", b"value1"),
        del(b"key5"),
        del(b"key3"),
        del(b"key2"),
        del(b"key4"),
        del(b"key1"),
    ]);

    // Long keys that are prefixes of one another.
    test_basic(&[
        set(&[5u8; 250], b"value5"),
        set(&[5u8; 200], b"value3"),
        set(&[5u8; 100], b"value4"),
        set(&[5u8; 150], b"value2"),
        set(&[5u8; 101], b"value1"),
        del(&[5u8; 250]),
        del(&[5u8; 101]),
    ]);
}
3278
/// Checks btree iteration when a commit lands in the middle of a forward iteration, at every
/// pipeline stage.
///
/// For each stage it runs the 100-key data set with the commit after 0, 5, ..., 45 steps,
/// then a small fixed data set with the commit after 1 step.
#[test]
fn test_btree_iter() {
    /// Returns the key/value state described by `data_start` and the state after applying
    /// `data_change` on top of it (a `None` value deletes the key).
    ///
    /// Panics if an entry of `data_start` has a `None` value.
    fn states(
        data_start: &[(u8, Vec<u8>, Option<Value>)],
        data_change: &[(u8, Vec<u8>, Option<Value>)],
    ) -> (BTreeMap<Vec<u8>, Vec<u8>>, BTreeMap<Vec<u8>, Vec<u8>>) {
        let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
            data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
        let mut end_state = start_state.clone();
        for (_c, k, v) in data_change {
            match v {
                Some(v) => {
                    end_state.insert(k.clone(), v.clone());
                },
                None => {
                    end_state.remove(k);
                },
            }
        }
        (start_state, end_state)
    }

    let col_nb: u8 = 0;
    // 100 keys: "key0" with its last byte replaced by 0..100.
    let mut data_start = Vec::new();
    for i in 0u8..100 {
        let mut key = b"key0".to_vec();
        key[3] = i;
        let mut value = b"val0".to_vec();
        value[3] = i;
        data_start.push((col_nb, key, Some(value)));
    }
    // Even `i`: insert a new key (third byte replaced). Otherwise touch the existing key with
    // last byte `i`: delete it when `i` is a multiple of 3, overwrite it otherwise.
    let mut data_change = Vec::new();
    for i in 0u8..100 {
        let mut key = b"key0".to_vec();
        if i % 2 == 0 {
            key[2] = i;
            let mut value = b"val0".to_vec();
            value[2] = i;
            data_change.push((col_nb, key, Some(value)));
        } else if i % 3 == 0 {
            key[3] = i;
            data_change.push((col_nb, key, None));
        } else {
            key[3] = i;
            let mut value = b"val0".to_vec();
            value[2] = i;
            data_change.push((col_nb, key, Some(value)));
        }
    }
    let (start_state, end_state) = states(&data_start, &data_change);

    // Small fixed case: a key inserted between two existing ones. It is identical for every
    // stage, so it is built once here.
    let small_start = vec![
        (0, b"key1".to_vec(), Some(b"val1".to_vec())),
        (0, b"key3".to_vec(), Some(b"val3".to_vec())),
    ];
    let small_change = vec![(0, b"key2".to_vec(), Some(b"val2".to_vec()))];
    let (small_start_state, small_end_state) = states(&small_start, &small_change);

    for stage in [
        EnableCommitPipelineStages::CommitOverlay,
        EnableCommitPipelineStages::LogOverlay,
        EnableCommitPipelineStages::DbFile,
        EnableCommitPipelineStages::Standard,
    ] {
        for i in 0..10 {
            test_btree_iter_inner(
                stage,
                &data_start,
                &data_change,
                &start_state,
                &end_state,
                i * 5,
            );
        }
        test_btree_iter_inner(
            stage,
            &small_start,
            &small_change,
            &small_start_state,
            &small_end_state,
            1,
        );
    }
}
3354 fn test_btree_iter_inner(
3355 db_test: EnableCommitPipelineStages,
3356 data_start: &[(u8, Vec<u8>, Option<Value>)],
3357 data_change: &[(u8, Vec<u8>, Option<Value>)],
3358 start_state: &BTreeMap<Vec<u8>, Vec<u8>>,
3359 end_state: &BTreeMap<Vec<u8>, Vec<u8>>,
3360 commit_at: usize,
3361 ) {
3362 let tmp = tempdir().unwrap();
3363 let mut options = db_test.options(tmp.path(), 5);
3364 let col_nb = 0;
3365 options.columns[col_nb as usize].btree_index = true;
3366 let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
3367
3368 db.commit(data_start.iter().cloned()).unwrap();
3369 db_test.run_stages(&db);
3370
3371 let mut iter = db.iter(col_nb).unwrap();
3372 let mut iter_state = start_state.iter();
3373 let mut last_key = Value::new();
3374 for _ in 0..commit_at {
3375 let next = iter.next().unwrap();
3376 if let Some((k, _)) = next.as_ref() {
3377 last_key = k.clone();
3378 }
3379 assert_eq!(iter_state.next(), next.as_ref().map(|(k, v)| (k, v)));
3380 }
3381
3382 db.commit(data_change.iter().cloned()).unwrap();
3383 db_test.run_stages(&db);
3384
3385 let mut iter_state = end_state.range(last_key.clone()..);
3386 for _ in commit_at..100 {
3387 let mut state_next = iter_state.next();
3388 if let Some((k, _v)) = state_next.as_ref() {
3389 if *k == &last_key {
3390 state_next = iter_state.next();
3391 }
3392 }
3393 let iter_next = iter.next().unwrap();
3394 assert_eq!(state_next, iter_next.as_ref().map(|(k, v)| (k, v)));
3395 }
3396 let mut iter_state_rev = end_state.iter().rev();
3397 let mut iter = db.iter(col_nb).unwrap();
3398 iter.seek_to_last().unwrap();
3399 for _ in 0..100 {
3400 let next = iter.prev().unwrap();
3401 assert_eq!(iter_state_rev.next(), next.as_ref().map(|(k, v)| (k, v)));
3402 }
3403 }
3404
	/// Checks that a commit is not lost when enacting its log fails with an I/O
	/// error: after reopening, both committed values must be readable.
	///
	/// The failure is injected through `crate::set_number_of_allowed_io_operations`,
	/// so the exact order of the database calls below is significant.
	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_recover_from_log_on_error() {
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.always_flush = true;
		// Pipeline stages are driven by hand below, not by a background thread.
		options.with_background_thread = false;

		{
			// First session: value [0] goes through every pipeline stage; value [1]
			// is committed, processed and flushed, but enacting its log fails.
			let db = Db::open_or_create(&options).unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			// Allow only 4 more I/O operations so that the enactment below fails.
			crate::set_number_of_allowed_io_operations(4);

			let err = db.enact_logs();
			assert!(err.is_err());
			// Hand the failure to the database itself.
			db.inner.store_err(err);
			// Lift the I/O limit again before the database is dropped.
			crate::set_number_of_allowed_io_operations(usize::MAX);
		}

		{
			// Second session: reopening must recover both values, including the
			// one whose log enactment failed.
			let db = Db::open(&options).unwrap();
			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
			assert_eq!(db.get(0, &[1]).unwrap(), Some(vec![1]));
		}
	}
3439
	/// Checks reopening after `process_commits` fails with an I/O error on a
	/// BTree-indexed column. The commit processed before the failure must still
	/// be readable after a first reopen, and again after a second one.
	///
	/// The failure is injected through `crate::set_number_of_allowed_io_operations`,
	/// so the exact order of the database calls below is significant.
	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_partial_log_recovery() {
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.columns[0].btree_index = true;
		options.always_flush = true;
		// Pipeline stages are driven by hand below, not by a background thread.
		options.with_background_thread = false;

		{
			// First session: key [0] is processed normally. Processing the commit
			// of key [1] is limited to 4 I/O operations and must fail.
			let db = Db::open_or_create(&options).unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
			db.process_commits().unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
			crate::set_number_of_allowed_io_operations(4);
			assert!(db.process_commits().is_err());
			// Lift the I/O limit again so the flush below is unrestricted.
			crate::set_number_of_allowed_io_operations(usize::MAX);
			// NOTE(review): presumably persists whatever log data the failed
			// processing left behind, so the next open has to recover from it.
			db.flush_logs().unwrap();
		}

		{
			// Reopening must keep the fully processed key [0].
			let db = Db::open(&options).unwrap();
			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
		}

		{
			// Opening once more, after the previous session, must still find key [0].
			let db = Db::open(&options).unwrap();
			assert!(db.get(0, &[0]).unwrap().is_some());
		}
	}
3473
	/// Checks that a reindex can be finished after reopening a database whose
	/// pre-reindex index file was put back in place after a reindex ran.
	///
	/// The outcome depends on the exact order of the database calls and file
	/// operations below.
	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_continue_reindex() {
		let _ = env_logger::try_init();
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.columns[0].preimage = true;
		options.columns[0].uniform = true;
		options.always_flush = true;
		// Pipeline stages are driven by hand below, not by a background thread.
		options.with_background_thread = false;
		// Fixed default salt; presumably keeps key placement in the index deterministic.
		options.salt = Some(Default::default());

		{
			let db = Db::open_or_create(&options).unwrap();
			// 65 distinct 32-byte keys differing only in byte 2 (even values 0..=128).
			// NOTE(review): presumably chosen so the keys require growing the index
			// beyond its initial size — confirm.
			let commit: Vec<_> = (0..65u32)
				.map(|index| {
					let mut key = [0u8; 32];
					key[2] = (index as u8) << 1;
					(0, key.to_vec(), Some(vec![index as u8]))
				})
				.collect();
			db.commit(commit).unwrap();

			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			// Back up the index file as it is before the reindex
			// (`index_00_16`, presumably column 0 at 16 index bits).
			std::fs::copy(tmp.path().join("index_00_16"), tmp.path().join("index_00_16.bak"))
				.unwrap();
			db.process_reindex().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			db.clean_logs().unwrap();
			// Restore the pre-reindex index file. This presumably simulates a reindex
			// that was interrupted before the old index was dropped.
			std::fs::rename(tmp.path().join("index_00_16.bak"), tmp.path().join("index_00_16"))
				.unwrap();
		}

		{
			// Reopen and run the reindex again: all 65 entries must be present and
			// the column must end up with 17 index bits.
			let db = Db::open(&options).unwrap();
			db.process_reindex().unwrap();
			let mut entries = 0;
			db.iter_column_while(0, |_| {
				entries += 1;
				true
			})
			.unwrap();

			assert_eq!(entries, 65);
			assert_eq!(db.inner.columns[0].index_bits(), Some(17));
		}
	}
3530
3531 #[test]
3532 fn test_remove_column() {
3533 let tmp = tempdir().unwrap();
3534 let db_test_file = EnableCommitPipelineStages::DbFile;
3535 let mut options_db_files = db_test_file.options(tmp.path(), 2);
3536 options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
3537 let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
3538 options_std.salt = options_db_files.salt.clone();
3539
3540 let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();
3541
3542 let payload: Vec<(u8, _, _)> = (0u16..100)
3543 .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3544 .collect();
3545
3546 db.commit(payload.clone()).unwrap();
3547
3548 db_test_file.run_stages(&db);
3549 drop(db);
3550
3551 let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
3552 for (col, key, value) in payload.iter() {
3553 assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
3554 }
3555 drop(db);
3556 Db::reset_column(&mut options_db_files, 1, None).unwrap();
3557
3558 let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
3559 for (col, key, _value) in payload.iter() {
3560 assert_eq!(db.get(*col, key).unwrap(), None);
3561 }
3562
3563 let payload: Vec<(u8, _, _)> = (0u16..10)
3564 .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3565 .collect();
3566
3567 db.commit(payload.clone()).unwrap();
3568
3569 db_test_file.run_stages(&db);
3570 drop(db);
3571
3572 let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
3573 let payload: Vec<(u8, _, _)> = (10u16..100)
3574 .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3575 .collect();
3576
3577 db.commit(payload.clone()).unwrap();
3578 assert!(db.iter(1).is_err());
3579
3580 drop(db);
3581
3582 let mut col_option = options_std.columns[1].clone();
3583 col_option.btree_index = true;
3584 Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();
3585
3586 let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
3587 let payload: Vec<(u8, _, _)> = (0u16..10)
3588 .map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
3589 .collect();
3590
3591 db.commit(payload.clone()).unwrap();
3592 assert!(db.iter(1).is_ok());
3593 }
3594}