use crate::{
	btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
	column::{
		hash_key, unpack_node_children, unpack_node_data, ColId, Column, HashColumn, IterState,
		ReindexBatch, ValueIterState,
	},
	error::{try_io, Error, Result},
	hash::IdentityBuildHasher,
	index::{Address, PlanOutcome},
	log::{Log, LogAction},
	multitree::{Children, NewNode, NodeAddress},
	options::{Options, CURRENT_VERSION},
	parking_lot::{
		Condvar, Mutex, MutexGuard, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard,
	},
	stats::StatSummary,
	ColumnOptions, Key,
};
#[cfg(feature = "bytes")]
use bytes::Bytes;
use fs2::FileExt;
use std::{
	borrow::Borrow,
	collections::{BTreeMap, HashMap, HashSet, VecDeque},
	ops::Bound,
	sync::{
		atomic::{AtomicBool, AtomicU64, Ordering},
		Arc, Weak,
	},
	thread,
};

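// Max total size of queued commit data. New commits block while the queue is over this limit.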
const MAX_COMMIT_QUEUE_BYTES: usize = 16 * 1024 * 1024;
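// Max size of logged data awaiting enactment. The log worker waits while the queue is over
// this limit.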
const MAX_LOG_QUEUE_BYTES: i64 = 128 * 1024 * 1024;
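// Minimum log file size used by the background flush worker.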
const MIN_LOG_SIZE_BYTES: u64 = 64 * 1024 * 1024;
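// Number of enacted log files to keep around for replay when `sync_data` is disabled.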
const KEEP_LOGS: usize = 16;
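// Max number of dirty log files to accumulate before enactment waits for cleanup (used with
// `sync_data`).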
const MAX_LOG_FILES: usize = 4;

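/// A database value: an owned byte vector.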
pub type Value = Vec<u8>;

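/// A cheaply cloneable, reference-counted value handle, backed by either `Arc<Value>` or
/// `Bytes` depending on the enabled features.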
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum RcValue {
	#[cfg(feature = "arc")]
	Arc(Arc<Value>),
	#[cfg(feature = "bytes")]
	Bytes(Bytes),
}

impl AsRef<[u8]> for RcValue {
	fn as_ref(&self) -> &[u8] {
		match self {
			#[cfg(feature = "arc")]
			Self::Arc(arc) => arc.as_ref(),
			#[cfg(feature = "bytes")]
			Self::Bytes(bytes) => bytes.as_ref(),
		}
	}
}

impl Borrow<[u8]> for RcValue {
	fn borrow(&self) -> &[u8] {
		self.as_ref()
	}
}

#[cfg(feature = "arc")]
impl From<Value> for RcValue {
	fn from(value: Value) -> Self {
		Self::Arc(value.into())
	}
}

#[cfg(not(feature = "arc"))]
impl From<Value> for RcValue {
	fn from(value: Value) -> Self {
		Self::Bytes(value.into())
	}
}

#[cfg(feature = "arc")]
impl From<Arc<Value>> for RcValue {
	fn from(value: Arc<Value>) -> Self {
		Self::Arc(value)
	}
}

#[cfg(feature = "bytes")]
impl From<Bytes> for RcValue {
	fn from(value: Bytes) -> Self {
		Self::Bytes(value)
	}
}

#[cfg(test)]
impl<const N: usize> TryFrom<RcValue> for [u8; N] {
	type Error = <[u8; N] as TryFrom<Vec<u8>>>::Error;

	fn try_from(value: RcValue) -> std::result::Result<Self, Self::Error> {
		value.as_ref().to_vec().try_into()
	}
}

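/// A cheaply cloneable, reference-counted key, used by the BTree commit overlay.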
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RcKey(Arc<Vec<u8>>);

impl AsRef<[u8]> for RcKey {
	fn as_ref(&self) -> &[u8] {
		self.0.as_ref()
	}
}

impl Borrow<[u8]> for RcKey {
	fn borrow(&self) -> &[u8] {
		self.as_ref()
	}
}

impl From<Vec<u8>> for RcKey {
	fn from(key: Vec<u8>) -> Self {
		Self(key.into())
	}
}

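/// Commit data queued for processing by the background worker.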
#[derive(Debug, Default)]
struct Commit {
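	/// Commit id. This is not necessarily the same as the log record id, since some
	/// records originate within the database itself (e.g. reindexing).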
	id: u64,
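	/// Size of the user data pushed to the commit queue with this commit.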
	bytes: usize,
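	/// Operations to perform.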
	changeset: CommitChangeSet,
}

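/// The queue of pending commits, bounded by `MAX_COMMIT_QUEUE_BYTES`.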
#[derive(Debug, Default)]
struct CommitQueue {
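	/// Log record id assigned to the last pushed commit.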
	record_id: u64,
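	/// Total size of all commits currently in the queue.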
	bytes: usize,
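	/// Queued commits, in submission order.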
	commits: VecDeque<Commit>,
}

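/// Per-column set of open tree readers and trees pending dereference.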
#[derive(Debug)]
struct Trees {
	readers: HashMap<Key, Weak<RwLock<Box<dyn TreeReader + Send + Sync>>>, IdentityBuildHasher>,
	to_dereference: HashMap<Key, usize>,
}

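/// Database state shared between the public `Db` handle and the background workers.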
#[derive(Debug)]
struct DbInner {
	columns: Vec<Column>,
	options: Options,
	shutdown: AtomicBool,
	log: Log,
	commit_queue: Mutex<CommitQueue>,
	commit_queue_full_cv: Condvar,
	log_worker_wait: WaitCondvar<bool>,
	commit_worker_wait: Arc<WaitCondvar<bool>>,
	commit_overlay: RwLock<Vec<CommitOverlay>>,
	trees: RwLock<HashMap<ColId, Trees>>,
	log_queue_wait: WaitCondvar<i64>,
	flush_worker_wait: Arc<WaitCondvar<bool>>,
	cleanup_worker_wait: WaitCondvar<bool>,
	cleanup_queue_wait: WaitCondvar<bool>,
	iteration_lock: Mutex<()>,
	last_enacted: AtomicU64,
	next_reindex: AtomicU64,
	bg_err: Mutex<Option<Arc<Error>>>,
	db_version: u32,
	lock_file: std::fs::File,
}

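/// A condition variable paired with the state it guards, used to signal background workers.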
#[derive(Debug)]
struct WaitCondvar<S> {
	cv: Condvar,
	work: Mutex<S>,
}

impl<S: Default> WaitCondvar<S> {
	fn new() -> Self {
		WaitCondvar { cv: Condvar::new(), work: Mutex::new(S::default()) }
	}
}

impl WaitCondvar<bool> {
	fn signal(&self) {
		let mut work = self.work.lock();
		*work = true;
		self.cv.notify_one();
	}

	pub fn wait(&self) {
		let mut work = self.work.lock();
		while !*work {
			self.cv.wait(&mut work)
		}
		*work = false;
	}
}

impl DbInner {
	fn open(options: &Options, opening_mode: OpeningMode) -> Result<DbInner> {
		if opening_mode == OpeningMode::Create {
			try_io!(std::fs::create_dir_all(&options.path));
		} else if !options.path.is_dir() {
			return Err(Error::DatabaseNotFound)
		}

		let mut lock_path: std::path::PathBuf = options.path.clone();
		lock_path.push("lock");
		let lock_file = try_io!(std::fs::OpenOptions::new()
			.create(true)
			.read(true)
			.write(true)
			.open(lock_path.as_path()));
		lock_file.try_lock_exclusive().map_err(Error::Locked)?;

		let metadata = options.load_and_validate_metadata(opening_mode == OpeningMode::Create)?;
		let mut columns = Vec::with_capacity(metadata.columns.len());
		let mut commit_overlay = Vec::with_capacity(metadata.columns.len());
		let log = Log::open(options)?;
		let last_enacted = log.replay_record_id().unwrap_or(2) - 1;
		for c in 0..metadata.columns.len() {
			let column = Column::open(c as ColId, options, &metadata)?;
			commit_overlay.push(CommitOverlay::new());
			columns.push(column);
		}
		log::debug!(target: "parity-db", "Opened db {:?}, metadata={:?}", options, metadata);
		let mut options = options.clone();
		if options.salt.is_none() {
			options.salt = Some(metadata.salt);
		}

		Ok(DbInner {
			columns,
			options,
			shutdown: AtomicBool::new(false),
			log,
			commit_queue: Mutex::new(Default::default()),
			commit_queue_full_cv: Condvar::new(),
			log_worker_wait: WaitCondvar::new(),
			commit_worker_wait: Arc::new(WaitCondvar::new()),
			commit_overlay: RwLock::new(commit_overlay),
			trees: RwLock::new(Default::default()),
			log_queue_wait: WaitCondvar::new(),
			flush_worker_wait: Arc::new(WaitCondvar::new()),
			cleanup_worker_wait: WaitCondvar::new(),
			cleanup_queue_wait: WaitCondvar::new(),
			iteration_lock: Mutex::new(()),
			next_reindex: AtomicU64::new(1),
			last_enacted: AtomicU64::new(last_enacted),
			bg_err: Mutex::new(None),
			db_version: metadata.version,
			lock_file,
		})
	}

	fn init_table_data(&mut self) -> Result<()> {
		for column in &mut self.columns {
			column.init_table_data()?;
		}
		Ok(())
	}

	fn get(&self, col: ColId, key: &[u8], external_call: bool) -> Result<Option<Value>> {
		if self.options.columns[col as usize].multitree && external_call {
			return Err(Error::InvalidConfiguration(
				"get not supported for multitree columns.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let key = column.hash_key(key);
				let overlay = self.commit_overlay.read();
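				// Lookup in the commit overlay first.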
				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
					return Ok(v.map(|i| i.as_ref().to_vec()))
				}
				std::mem::drop(overlay);
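				// Then go into the log overlay and the tables.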
				let log = self.log.overlays();
				Ok(column.get(&key, log)?.map(|(v, _rc)| v))
			},
			Column::Tree(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
					return Ok(l.map(|i| i.as_ref().to_vec()))
				}
				std::mem::drop(overlay);
				let log = self.log.overlays().read();
				column.with_locked(|btree| BTreeTable::get(key, &*log, btree))
			},
		}
	}

	fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
		if self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration(
				"get_size not supported for multitree columns.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let key = column.hash_key(key);
				let overlay = self.commit_overlay.read();
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.get_size(&key)) {
					return Ok(l)
				}
				let log = self.log.overlays();
				column.get_size(&key, log)
			},
			Column::Tree(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
					return Ok(l.map(|v| v.as_ref().len() as u32))
				}
				let log = self.log.overlays().read();
				let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
				Ok(l.map(|v| v.len() as u32))
			},
		}
	}

	fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
		if !self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
		}
		if !self.options.columns[col as usize].append_only &&
			!self.options.columns[col as usize].allow_direct_node_access
		{
			return Err(Error::InvalidConfiguration(
				"get_root can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
			))
		}
		let value = self.get(col, key, false)?;
		if let Some(data) = value {
			return Ok(Some(unpack_node_data(data)?))
		}
		Ok(None)
	}

	fn get_node(
		&self,
		col: ColId,
		node_address: NodeAddress,
		external_call: bool,
	) -> Result<Option<(Vec<u8>, Children)>> {
		if !self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
		}
		if !self.options.columns[col as usize].append_only &&
			!self.options.columns[col as usize].allow_direct_node_access &&
			external_call
		{
			return Err(Error::InvalidConfiguration(
				"get_node can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
				{
					return Ok(Some(unpack_node_data(v.as_ref().to_vec())?))
				}
				let log = self.log.overlays();
				let value = column.get_value(Address::from_u64(node_address), log)?;
				if let Some(data) = value {
					return Ok(Some(unpack_node_data(data)?))
				}
				Ok(None)
			},
			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
		}
	}

	fn get_node_children(
		&self,
		col: ColId,
		node_address: NodeAddress,
		external_call: bool,
	) -> Result<Option<Children>> {
		if !self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
		}
		if !self.options.columns[col as usize].append_only &&
			!self.options.columns[col as usize].allow_direct_node_access &&
			external_call
		{
			return Err(Error::InvalidConfiguration(
				"get_node_children can only be called on a column with append_only or allow_direct_node_access options.".to_string(),
			))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				let overlay = self.commit_overlay.read();
				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get_address(node_address))
				{
					return Ok(Some(unpack_node_children(v.as_ref())?))
				}
				let log = self.log.overlays();
				let value = column.get_value(Address::from_u64(node_address), log)?;
				if let Some(data) = value {
					return Ok(Some(unpack_node_children(&data)?))
				}
				Ok(None)
			},
			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
		}
	}

	fn get_tree(
		&self,
		db: &Arc<DbInner>,
		col: ColId,
		key: &[u8],
		check_existence: bool,
	) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
		if !self.options.columns[col as usize].multitree {
			return Err(Error::InvalidConfiguration("Not a multitree column.".to_string()))
		}
		match &self.columns[col as usize] {
			Column::Hash(column) => {
				if check_existence {
					// Propagate errors instead of panicking on I/O failure.
					let root = self.get(col, key, false)?;
					if root.is_none() {
						return Ok(None)
					}
				}

				let hash_key = column.hash_key(key);

				let trees = self.trees.upgradable_read();

				if let Some(column_trees) = trees.get(&col) {
					if let Some(reader) = column_trees.readers.get(&hash_key) {
						let reader = reader.upgrade();
						if let Some(reader) = reader {
							return Ok(Some(reader))
						}
					}
				}

				let mut trees = RwLockUpgradableReadGuard::upgrade(trees);

				let column_trees = trees.entry(col).or_insert_with(|| Trees {
					readers: Default::default(),
					to_dereference: Default::default(),
				});

				let reader: Box<dyn TreeReader + Send + Sync> =
					Box::new(DbTreeReader { db: db.clone(), col, key: hash_key });
				let reader = Arc::new(RwLock::new(reader));

				column_trees.readers.insert(hash_key, Arc::downgrade(&reader));

				Ok(Some(reader))
			},
			Column::Tree(_) => Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
		}
	}

	fn btree_iter(&self, col: ColId) -> Result<BTreeIterator> {
		match &self.columns[col as usize] {
			Column::Hash(_column) =>
				Err(Error::InvalidConfiguration("Not an indexed column.".to_string())),
			Column::Tree(column) => {
				let log = self.log.overlays();
				BTreeIterator::new(column, col, log, &self.commit_overlay)
			},
		}
	}

	fn commit<I, K>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
		K: AsRef<[u8]>,
	{
		self.commit_changes(tx.into_iter().map(|(c, k, v)| {
			(
				c,
				match v {
					Some(v) => Operation::Set(k.as_ref().to_vec(), v),
					None => Operation::Dereference(k.as_ref().to_vec()),
				},
			)
		}))
	}

	fn commit_changes<I, V>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, V>)>,
		V: Into<RcValue>,
	{
		let mut commit: CommitChangeSet = Default::default();
		for (col, change) in tx.into_iter() {
			if self.options.columns[col as usize].btree_index {
				commit
					.btree_indexed
					.entry(col)
					.or_insert_with(|| BTreeChangeSet::new(col))
					.push(change)?
			} else if self.options.columns[col as usize].multitree {
				match &self.columns[col as usize] {
					Column::Hash(column) =>
						match change {
							Operation::Set(..) |
							Operation::Reference(..) |
							Operation::Dereference(..) =>
								return Err(Error::InvalidConfiguration(
									"Invalid operation for multitree column".to_string(),
								)),
							Operation::InsertTree(..) => {
								let (root_data, node_values) = column.claim_tree_values(&change)?;

								let trees = self.trees.read();
								if let Some(column_trees) = trees.get(&col) {
									for (hash, count) in &column_trees.to_dereference {
										assert!(*count > 0);

										let mut tree_active = false;
										if let Some(reader) = column_trees.readers.get(hash) {
											let reader = reader.upgrade();
											if let Some(reader) = reader {
												if reader.is_locked() {
													tree_active = true;
												}
											}
										}
										if tree_active {
											commit
												.indexed
												.entry(col)
												.or_insert_with(|| IndexedChangeSet::new(col))
												.used_trees
												.insert(*hash);
										}
									}
								}
								drop(trees);

								let root_operation = Operation::Set(change.key(), root_data);
								commit
									.indexed
									.entry(col)
									.or_insert_with(|| IndexedChangeSet::new(col))
									.push(root_operation, &self.options, self.db_version)?;

								for node_change in node_values {
									commit
										.indexed
										.entry(col)
										.or_insert_with(|| IndexedChangeSet::new(col))
										.push_node_change(node_change);
								}
							},
							Operation::ReferenceTree(..) => {
								if !self.options.columns[col as usize].append_only {
									let root_operation =
										Operation::<&_, V>::Reference(change.key());
									commit
										.indexed
										.entry(col)
										.or_insert_with(|| IndexedChangeSet::new(col))
										.push(root_operation, &self.options, self.db_version)?;
								}
							},
							Operation::DereferenceTree(key) => {
								if self.options.columns[col as usize].append_only {
									return Err(Error::InvalidConfiguration(
										"Attempting to dereference a tree from an append_only column."
											.to_string(),
									))
								}
								let value = self.get(col, &key, false)?;
								if let Some(data) = value {
									let root_data = unpack_node_data(data)?;
									let children = root_data.1;
									let salt = self.options.salt.unwrap_or_default();
									let hash = hash_key(
										&key,
										&salt,
										self.options.columns[col as usize].uniform,
										self.db_version,
									);

									let mut trees = self.trees.write();

									let column_trees = trees.entry(col).or_insert_with(|| Trees {
										readers: Default::default(),
										to_dereference: Default::default(),
									});
									let count =
										column_trees.to_dereference.get(&hash).unwrap_or(&0) + 1;
									column_trees.to_dereference.insert(hash, count);

									drop(trees);

									commit.check_for_deferral = true;

									let node_change =
										NodeChange::DereferenceChildren(key, hash, children);

									commit
										.indexed
										.entry(col)
										.or_insert_with(|| IndexedChangeSet::new(col))
										.push_node_change(node_change);
								} else {
									return Err(Error::InvalidConfiguration(
										"No entry for tree root".to_string(),
									))
								}
							},
						},
					Column::Tree(_) =>
						return Err(Error::InvalidConfiguration("Not a HashColumn".to_string())),
				}
			} else {
				commit.indexed.entry(col).or_insert_with(|| IndexedChangeSet::new(col)).push(
					change,
					&self.options,
					self.db_version,
				)?
			}
		}

		self.commit_raw(commit)
	}

	fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
		let mut queue = self.commit_queue.lock();

		#[cfg(any(test, feature = "instrumentation"))]
		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let might_wait_because_the_queue_is_full = true;
		if might_wait_because_the_queue_is_full && queue.bytes > MAX_COMMIT_QUEUE_BYTES {
			log::debug!(target: "parity-db", "Waiting, queue size={}", queue.bytes);
			self.commit_queue_full_cv.wait(&mut queue);
		}

		{
			let bg_err = self.bg_err.lock();
			if let Some(err) = &*bg_err {
				return Err(Error::Background(err.clone()))
			}
		}

		let mut overlay = self.commit_overlay.write();

		queue.record_id += 1;
		let record_id = queue.record_id;

		let mut bytes = 0;
		for (c, indexed) in &commit.indexed {
			indexed.copy_to_overlay(
				&mut overlay[*c as usize],
				record_id,
				&mut bytes,
				&self.options,
			)?;
		}

		for (c, iterset) in &commit.btree_indexed {
			iterset.copy_to_overlay(
				&mut overlay[*c as usize].btree_indexed,
				record_id,
				&mut bytes,
				&self.options,
			)?;
		}

		let commit = Commit { id: record_id, changeset: commit, bytes };

		log::debug!(
			target: "parity-db",
			"Queued commit {}, {} bytes",
			commit.id,
			bytes,
		);
		queue.commits.push_back(commit);
		queue.bytes += bytes;
		self.log_worker_wait.signal();
		Ok(())
	}

	fn defer_commit(
		&self,
		mut queue: MutexGuard<CommitQueue>,
		mut commit: CommitChangeSet,
		old_bytes: usize,
		old_id: u64,
		new_id: Option<u64>,
	) -> Result<()> {
		let record_id = if let Some(id) = new_id {
			id
		} else {
			queue.record_id += 1;
			queue.record_id
		};

		let bytes = if record_id != old_id {
			let mut overlay = self.commit_overlay.write();

			let mut bytes = 0;

			for (c, indexed) in &commit.indexed {
				indexed.copy_to_overlay(
					&mut overlay[*c as usize],
					record_id,
					&mut bytes,
					&self.options,
				)?;
			}

			for (c, iterset) in &commit.btree_indexed {
				iterset.copy_to_overlay(
					&mut overlay[*c as usize].btree_indexed,
					record_id,
					&mut bytes,
					&self.options,
				)?;
			}

			{
				for (c, key_values) in commit.indexed.iter() {
					key_values.clean_overlay(&mut overlay[*c as usize], old_id);
				}
				for (c, iterset) in commit.btree_indexed.iter_mut() {
					iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, old_id);
				}
			}

			bytes
		} else {
			old_bytes
		};

		let commit = Commit { id: record_id, changeset: commit, bytes };

		log::debug!(
			target: "parity-db",
			"Deferred commit, old id: {}, new id: {}",
			old_id,
			record_id,
		);
		queue.commits.push_back(commit);
		queue.bytes += bytes;
		Ok(())
	}

	fn process_commits(&self, db: &Arc<DbInner>) -> Result<bool> {
		#[cfg(any(test, feature = "instrumentation"))]
		let might_wait_because_the_queue_is_full = self.options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let might_wait_because_the_queue_is_full = true;
		if might_wait_because_the_queue_is_full {
			let mut queue = self.log_queue_wait.work.lock();
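			// Wait for the log queue to drain below its limit, unless we are shutting down.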
			if !self.shutdown.load(Ordering::Relaxed) && *queue > MAX_LOG_QUEUE_BYTES {
				log::debug!(target: "parity-db", "Waiting, log_bytes={}", queue);
				self.log_queue_wait.cv.wait(&mut queue);
			}
		}
		let commit = {
			let mut queue = self.commit_queue.lock();
			if let Some(commit) = queue.commits.pop_front() {
				queue.bytes -= commit.bytes;
				log::debug!(
					target: "parity-db",
					"Removed {}. Still queued commits {} bytes",
					commit.bytes,
					queue.bytes,
				);
				if queue.bytes <= MAX_COMMIT_QUEUE_BYTES &&
					(queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
				{
					log::debug!(
						target: "parity-db",
						"Waking up commit queue worker",
					);
					self.commit_queue_full_cv.notify_all();
				}
				Some(commit)
			} else {
				None
			}
		};

		if let Some(mut commit) = commit {
			if commit.changeset.check_for_deferral {
				let mut defer = false;
				'outer: for (col, key_values) in commit.changeset.indexed.iter() {
					for change in &key_values.node_changes {
						if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
							let trees = self.trees.read();
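							// Defer this commit if there is an active reader for the tree being
							// dereferenced.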
							if let Some(column_trees) = trees.get(&col) {
								let mut tree_active = false;
								if let Some(reader) = column_trees.readers.get(hash) {
									let reader = reader.upgrade();
									if let Some(reader) = reader {
										if reader.is_locked() {
											tree_active = true;
										}
									}
								}
								if tree_active {
									defer = true;
									break 'outer
								}
							}
							drop(trees);

							let queue = self.commit_queue.lock();
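							// Also defer if any other queued commit still uses this tree.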
							for commit in &queue.commits {
								for (_col, change_set) in &commit.changeset.indexed {
									for tree in &change_set.used_trees {
										if tree == hash {
											defer = true;
											break 'outer
										}
									}
								}
							}
						}
					}
				}
				if defer {
					let queue = self.commit_queue.lock();
					let new_id = if !queue.commits.is_empty() {
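						// Other commits are queued: requeue at the back under a fresh record id.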
						None
					} else {
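						// The queue is empty: retry later under the same id.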
						Some(commit.id)
					};
					self.defer_commit(queue, commit.changeset, commit.bytes, commit.id, new_id)?;

					return Ok(true)
				} else {
					for (col, key_values) in commit.changeset.indexed.iter() {
						for change in &key_values.node_changes {
							if let NodeChange::DereferenceChildren(_key, hash, _children) = change {
								let mut trees = self.trees.write();
								if let Some(column_trees) = trees.get_mut(&col) {
									let count = column_trees.to_dereference.get(hash).unwrap_or(&0);
									assert!(*count > 0);
									if *count == 1 {
										column_trees.to_dereference.remove(hash);
									} else {
										column_trees.to_dereference.insert(*hash, count - 1);
									}
								}
							}
						}
					}
				}
			}

			let mut reindex = false;
			let mut writer = self.log.begin_record();
			log::debug!(
				target: "parity-db",
				"Processing commit {}, record {}, {} bytes",
				commit.id,
				writer.record_id(),
				commit.bytes,
			);
			let mut ops: u64 = 0;
			for (c, key_values) in commit.changeset.indexed.iter() {
				key_values.write_plan(
					db,
					*c,
					&self.columns[*c as usize],
					&mut writer,
					&mut ops,
					&mut reindex,
				)?;
			}

			for (c, btree) in commit.changeset.btree_indexed.iter_mut() {
				match &self.columns[*c as usize] {
					Column::Hash(_column) =>
						return Err(Error::InvalidConfiguration(
							"Not an indexed column.".to_string(),
						)),
					Column::Tree(column) => {
						btree.write_plan(column, &mut writer, &mut ops)?;
					},
				}
			}

			for c in self.columns.iter() {
				c.complete_plan(&mut writer)?;
			}
			let record_id = writer.record_id();
			let l = writer.drain();

			let bytes = {
				let bytes = self.log.end_record(l)?;
				let mut logged_bytes = self.log_queue_wait.work.lock();
				*logged_bytes += bytes as i64;
				self.flush_worker_wait.signal();
				bytes
			};

			{
				let mut overlay = self.commit_overlay.write();
				for (c, key_values) in commit.changeset.indexed.iter() {
					key_values.clean_overlay(&mut overlay[*c as usize], commit.id);
				}
				for (c, iterset) in commit.changeset.btree_indexed.iter_mut() {
					iterset.clean_overlay(&mut overlay[*c as usize].btree_indexed, commit.id);
				}
			}

			if reindex {
				self.start_reindex(record_id);
			}

			log::debug!(
				target: "parity-db",
				"Processed commit {} (record {}), {} ops, {} bytes written",
				commit.id,
				record_id,
				ops,
				bytes,
			);
			Ok(true)
		} else {
			Ok(false)
		}
	}

	fn start_reindex(&self, record_id: u64) {
		log::trace!(target: "parity-db", "Scheduled reindex at record {}", record_id);
		self.next_reindex.store(record_id, Ordering::SeqCst);
	}

	fn process_reindex(&self) -> Result<bool> {
		let next_reindex = self.next_reindex.load(Ordering::SeqCst);
		if next_reindex == 0 || next_reindex > self.last_enacted.load(Ordering::SeqCst) {
			return Ok(false)
		}
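		// Process any pending reindexes.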
		for column in self.columns.iter() {
			let column = if let Column::Hash(c) = column { c } else { continue };
			let ReindexBatch {
				drop_index,
				batch,
				drop_ref_count,
				ref_count_batch,
				ref_count_batch_source,
			} = column.reindex(&self.log)?;
			if !batch.is_empty() || drop_index.is_some() {
				debug_assert!(
					ref_count_batch.is_empty() &&
						ref_count_batch_source.is_none() &&
						drop_ref_count.is_none()
				);
				let mut next_reindex = false;
				let mut writer = self.log.begin_record();
				log::debug!(
					target: "parity-db",
					"Creating reindex record {}",
					writer.record_id(),
				);
				for (key, address) in batch.into_iter() {
					if let PlanOutcome::NeedReindex =
						column.write_reindex_plan(&key, address, &mut writer)?
					{
						next_reindex = true
					}
				}
				if let Some(table) = drop_index {
					writer.drop_table(table);
				}
				let record_id = writer.record_id();
				let l = writer.drain();

				let mut logged_bytes = self.log_queue_wait.work.lock();
				let bytes = self.log.end_record(l)?;
				log::debug!(
					target: "parity-db",
					"Created reindex record {}, {} bytes",
					record_id,
					bytes,
				);
				*logged_bytes += bytes as i64;
				if next_reindex {
					self.start_reindex(record_id);
				}
				self.flush_worker_wait.signal();
				return Ok(true)
			}
			if !ref_count_batch.is_empty() || drop_ref_count.is_some() {
				debug_assert!(batch.is_empty() && drop_index.is_none());
				debug_assert!(ref_count_batch_source.is_some());
				let ref_count_source = ref_count_batch_source.unwrap();
				let mut next_reindex = false;
				let mut writer = self.log.begin_record();
				log::debug!(
					target: "parity-db",
					"Creating ref count reindex record {}",
					writer.record_id(),
				);
				for (address, ref_count) in ref_count_batch.into_iter() {
					if let PlanOutcome::NeedReindex = column.write_ref_count_reindex_plan(
						address,
						ref_count,
						ref_count_source,
						&mut writer,
					)? {
						next_reindex = true
					}
				}
				if let Some(table) = drop_ref_count {
					writer.drop_ref_count_table(table);
				}
				let record_id = writer.record_id();
				let l = writer.drain();

				let mut logged_bytes = self.log_queue_wait.work.lock();
				let bytes = self.log.end_record(l)?;
				log::debug!(
					target: "parity-db",
					"Created ref count reindex record {}, {} bytes",
					record_id,
					bytes,
				);
				*logged_bytes += bytes as i64;
				if next_reindex {
					self.start_reindex(record_id);
				}
				self.flush_worker_wait.signal();
				return Ok(true)
			}
		}
		self.next_reindex.store(0, Ordering::SeqCst);
		Ok(false)
	}

	fn enact_logs(&self, validation_mode: bool) -> Result<bool> {
		let _iteration_lock = self.iteration_lock.lock();
		let cleared = {
			let reader = match self.log.read_next(validation_mode) {
				Ok(reader) => reader,
				Err(Error::Corruption(_)) if validation_mode => {
					log::debug!(target: "parity-db", "Bad log header");
					self.log.clear_replay_logs();
					return Ok(false)
				},
				Err(e) => return Err(e),
			};
			if let Some(mut reader) = reader {
				log::debug!(
					target: "parity-db",
					"Enacting log record {}",
					reader.record_id(),
				);
				if validation_mode {
					if reader.record_id() != self.last_enacted.load(Ordering::Relaxed) + 1 {
						log::warn!(
							target: "parity-db",
							"Log sequence error. Expected record {}, got {}",
							self.last_enacted.load(Ordering::Relaxed) + 1,
							reader.record_id(),
						);
						drop(reader);
						self.log.clear_replay_logs();
						return Ok(false)
					}
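					// Validate all records in the file before enacting anything.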
					loop {
						let next = match reader.next() {
							Ok(next) => next,
							Err(e) => {
								log::debug!(target: "parity-db", "Error reading log: {:?}", e);
								return Ok(false)
							},
						};
						match next {
							LogAction::BeginRecord => {
								log::debug!(target: "parity-db", "Unexpected log header");
								drop(reader);
								self.log.clear_replay_logs();
								return Ok(false)
							},
							LogAction::EndRecord => break,
							LogAction::InsertIndex(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertIndex(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::InsertValue(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertValue(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::InsertRefCount(insertion) => {
								let col = insertion.table.col() as usize;
								if let Err(e) = self.columns.get(col).map_or_else(
									|| Err(Error::Corruption(format!("Invalid column id {col}"))),
									|col| {
										col.validate_plan(
											LogAction::InsertRefCount(insertion),
											&mut reader,
										)
									},
								) {
									log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
									drop(reader);
									self.log.clear_replay_logs();
									return Ok(false)
								}
							},
							LogAction::DropTable(_) | LogAction::DropRefCountTable(_) => continue,
						}
					}
					reader.reset()?;
					reader.next()?;
				}
				loop {
					match reader.next()? {
						LogAction::BeginRecord =>
							return Err(Error::Corruption("Bad log record".into())),
						LogAction::EndRecord => break,
						LogAction::InsertIndex(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertIndex(insertion), &mut reader)?;
						},
						LogAction::InsertValue(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertValue(insertion), &mut reader)?;
						},
						LogAction::InsertRefCount(insertion) => {
							self.columns[insertion.table.col() as usize]
								.enact_plan(LogAction::InsertRefCount(insertion), &mut reader)?;
						},
						LogAction::DropTable(id) => {
							log::debug!(
								target: "parity-db",
								"Dropping index {}",
								id,
							);
							match &self.columns[id.col() as usize] {
								Column::Hash(col) => {
									col.drop_index(id)?;
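									// Check whether another reindex is pending.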
									self.start_reindex(reader.record_id());
								},
								Column::Tree(_) => (),
							}
						},
						LogAction::DropRefCountTable(id) => {
							log::debug!(
								target: "parity-db",
								"Dropping ref count {}",
								id,
							);
							match &self.columns[id.col() as usize] {
								Column::Hash(col) => {
									col.drop_ref_count(id)?;
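									// Check whether another reindex is pending.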
									self.start_reindex(reader.record_id());
								},
								Column::Tree(_) => (),
							}
						},
					}
				}
				log::debug!(
					target: "parity-db",
					"Enacted log record {}, {} bytes",
					reader.record_id(),
					reader.read_bytes(),
				);
				let record_id = reader.record_id();
				let bytes = reader.read_bytes();
				let cleared = reader.drain();
				self.last_enacted.store(record_id, Ordering::SeqCst);
				Some((record_id, cleared, bytes))
			} else {
				log::debug!(target: "parity-db", "End of log");
				None
			}
		};

		if let Some((record_id, cleared, bytes)) = cleared {
			self.log.end_read(cleared, record_id);
			{
				if !validation_mode {
					let mut queue = self.log_queue_wait.work.lock();
					if *queue < bytes as i64 {
						log::warn!(
							target: "parity-db",
							"Detected log underflow record {}, {} bytes, {} queued, reindex = {}",
							record_id,
							bytes,
							*queue,
							self.next_reindex.load(Ordering::SeqCst),
						);
					}
					*queue -= bytes as i64;
					if *queue <= MAX_LOG_QUEUE_BYTES &&
						(*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
					{
						self.log_queue_wait.cv.notify_one();
					}
					log::debug!(target: "parity-db", "Log queue size: {} bytes", *queue);
				}

				let max_logs = if self.options.sync_data { MAX_LOG_FILES } else { KEEP_LOGS };
				let dirty_logs = self.log.num_dirty_logs();
				if !validation_mode {
					while self.log.num_dirty_logs() > max_logs {
						log::debug!(target: "parity-db", "Waiting for log cleanup. Queued: {}", dirty_logs);
						self.cleanup_queue_wait.wait();
					}
				}
			}
			Ok(true)
		} else {
			Ok(false)
		}
	}

	fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
		let has_flushed = self.log.flush_one(min_log_size)?;
		if has_flushed {
			self.commit_worker_wait.signal();
		}
		Ok(has_flushed)
	}

	fn clean_logs(&self) -> Result<bool> {
		let keep_logs = if self.options.sync_data { 0 } else { KEEP_LOGS };
		let num_cleanup = self.log.num_dirty_logs();
		let result = if num_cleanup > keep_logs {
			if self.options.sync_data {
				for c in self.columns.iter() {
					c.flush()?;
				}
			}
			self.log.clean_logs(num_cleanup - keep_logs)?
		} else {
			false
		};
		self.cleanup_queue_wait.signal();
		Ok(result)
	}

	fn clean_all_logs(&self) -> Result<()> {
		for c in self.columns.iter() {
			c.flush()?;
		}
		let num_cleanup = self.log.num_dirty_logs();
		self.log.clean_logs(num_cleanup)?;
		Ok(())
	}

	fn replay_all_logs(&self) -> Result<()> {
		while let Some(id) = self.log.replay_next()? {
			log::debug!(target: "parity-db", "Replaying database log {}", id);
			while self.enact_logs(true)? {}
		}

		for c in self.columns.iter() {
			c.refresh_metadata()?;
		}
		log::debug!(target: "parity-db", "Replay is complete.");
		Ok(())
	}

	fn shutdown(&self) {
		self.shutdown.store(true, Ordering::SeqCst);
		self.log_queue_wait.cv.notify_one();
		self.flush_worker_wait.signal();
		self.log_worker_wait.signal();
		self.commit_worker_wait.signal();
		self.cleanup_worker_wait.signal();
	}

	fn kill_logs(&self, db: &Arc<DbInner>) -> Result<()> {
		{
			if let Some(err) = self.bg_err.lock().as_ref() {
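				// On error, the log may be left in an inconsistent state, so only clean up
				// and bail out.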
				log::debug!(target: "parity-db", "Shutdown with error state {}", err);
				self.log.clean_logs(self.log.num_dirty_logs())?;
				return Ok(())
			}
		}
		log::debug!(target: "parity-db", "Processing leftover commits");
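		// Finish enacting logged records and flush any remaining commits before shutdown.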
		while self.enact_logs(false)? {}
		self.flush_logs(0)?;
		while self.process_commits(db)? {}
		while self.enact_logs(false)? {}
		self.flush_logs(0)?;
		while self.enact_logs(false)? {}
		self.clean_all_logs()?;
		self.log.kill_logs()?;
		if self.options.stats {
			let mut path = self.options.path.clone();
			path.push("stats.txt");
			match std::fs::File::create(path) {
				Ok(file) => {
					let mut writer = std::io::BufWriter::new(file);
					if let Err(e) = self.write_stats_text(&mut writer, None) {
						log::warn!(target: "parity-db", "Error writing stats file: {:?}", e)
					}
				},
				Err(e) => log::warn!(target: "parity-db", "Error creating stats file: {:?}", e),
			}
		}
		Ok(())
	}

	fn write_stats_text(&self, writer: &mut impl std::io::Write, column: Option<u8>) -> Result<()> {
		if let Some(col) = column {
			self.columns[col as usize].write_stats_text(writer)
		} else {
			for c in self.columns.iter() {
				c.write_stats_text(writer)?;
			}
			Ok(())
		}
	}

	fn clear_stats(&self, column: Option<u8>) -> Result<()> {
		if let Some(col) = column {
			self.columns[col as usize].clear_stats()
		} else {
			for c in self.columns.iter() {
				c.clear_stats()?;
			}
			Ok(())
		}
	}

	fn stats(&self) -> StatSummary {
		StatSummary { columns: self.columns.iter().map(|c| c.stats()).collect() }
	}

	fn store_err(&self, result: Result<()>) {
		if let Err(e) = result {
			log::warn!(target: "parity-db", "Background worker error: {}", e);
			let mut err = self.bg_err.lock();
			if err.is_none() {
				*err = Some(Arc::new(e));
				self.shutdown();
			}
			self.commit_queue_full_cv.notify_all();
		}
	}

	fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		let _lock = self.iteration_lock.lock();
		match &self.columns[c as usize] {
			Column::Hash(column) => column.iter_values(&self.log, f),
			Column::Tree(_) => unimplemented!(),
		}
	}

	fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
		let _lock = self.iteration_lock.lock();
		match &self.columns[c as usize] {
			Column::Hash(column) => column.iter_index(&self.log, f),
			Column::Tree(_) => unimplemented!(),
		}
	}
}

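/// A database instance. Dropping it shuts down the background workers and enacts
/// any remaining queued commits.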
pub struct Db {
	inner: Arc<DbInner>,
	commit_thread: Option<thread::JoinHandle<()>>,
	flush_thread: Option<thread::JoinHandle<()>>,
	log_thread: Option<thread::JoinHandle<()>>,
	cleanup_thread: Option<thread::JoinHandle<()>>,
}

impl Db {
	#[cfg(test)]
	pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
		let options = Options::with_columns(path, num_columns);
		Self::open_inner(&options, OpeningMode::Create)
	}

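	/// Open the database with the given options. Fails if the database does not exist.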
	pub fn open(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Write)
	}

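	/// Open the database with the given options, creating it if it does not exist.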
	pub fn open_or_create(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::Create)
	}

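	/// Open an existing database in read-only mode. No background threads are started.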
	pub fn open_read_only(options: &Options) -> Result<Db> {
		Self::open_inner(options, OpeningMode::ReadOnly)
	}

	fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
		assert!(options.is_valid());
		let mut db = DbInner::open(options, opening_mode)?;
		if let Err(e) = db.replay_all_logs() {
			log::debug!(target: "parity-db", "Error during log replay.");
			return Err(e)
		} else {
			db.log.clear_replay_logs();
			db.clean_all_logs()?;
			db.log.kill_logs()?;
		}
		db.init_table_data()?;
		let db = Arc::new(db);
		#[cfg(any(test, feature = "instrumentation"))]
		let start_threads = opening_mode != OpeningMode::ReadOnly && options.with_background_thread;
		#[cfg(not(any(test, feature = "instrumentation")))]
		let start_threads = opening_mode != OpeningMode::ReadOnly;
		let commit_thread = if start_threads {
			let commit_worker_db = db.clone();
			Some(thread::spawn(move || {
				commit_worker_db.store_err(Self::commit_worker(commit_worker_db.clone()))
			}))
		} else {
			None
		};
		let flush_thread = if start_threads {
			let flush_worker_db = db.clone();
			#[cfg(any(test, feature = "instrumentation"))]
			let min_log_size = if options.always_flush { 0 } else { MIN_LOG_SIZE_BYTES };
			#[cfg(not(any(test, feature = "instrumentation")))]
			let min_log_size = MIN_LOG_SIZE_BYTES;
			Some(thread::spawn(move || {
				flush_worker_db.store_err(Self::flush_worker(flush_worker_db.clone(), min_log_size))
			}))
		} else {
			None
		};
		let log_thread = if start_threads {
			let log_worker_db = db.clone();
			Some(thread::spawn(move || {
				log_worker_db.store_err(Self::log_worker(log_worker_db.clone()))
			}))
		} else {
			None
		};
		let cleanup_thread = if start_threads {
			let cleanup_worker_db = db.clone();
			Some(thread::spawn(move || {
				cleanup_worker_db.store_err(Self::cleanup_worker(cleanup_worker_db.clone()))
			}))
		} else {
			None
		};
		Ok(Db { inner: db, commit_thread, flush_thread, log_thread, cleanup_thread })
	}

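	/// Get the value stored under `key` in column `col`. Not supported for multitree columns.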
	pub fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
		self.inner.get(col, key, true)
	}

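	/// Get the size of the value stored under `key` in column `col`.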
	pub fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
		self.inner.get_size(col, key)
	}

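	/// Iterate over a BTree-indexed column.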
	pub fn iter(&self, col: ColId) -> Result<BTreeIterator> {
		self.inner.btree_iter(col)
	}

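	/// Get a reader for the tree with the given root key in a multitree column.
	/// Returns `None` if the tree does not exist.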
	pub fn get_tree(
		&self,
		col: ColId,
		key: &[u8],
	) -> Result<Option<Arc<RwLock<Box<dyn TreeReader + Send + Sync>>>>> {
		self.inner.get_tree(&self.inner, col, key, true)
	}

	pub fn get_root(&self, col: ColId, key: &[u8]) -> Result<Option<(Vec<u8>, Children)>> {
		self.inner.get_root(col, key)
	}

	pub fn get_node(
		&self,
		col: ColId,
		node_address: NodeAddress,
	) -> Result<Option<(Vec<u8>, Children)>> {
		self.inner.get_node(col, node_address, true)
	}

	pub fn get_node_children(
		&self,
		col: ColId,
		node_address: NodeAddress,
	) -> Result<Option<Children>> {
		self.inner.get_node_children(col, node_address, true)
	}

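	/// Commit a transaction of key/value changes. A `None` value is treated as a dereference.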
	pub fn commit<I, K>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, K, Option<Value>)>,
		K: AsRef<[u8]>,
	{
		self.inner.commit(tx)
	}

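	/// Commit a set of operations to the database.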
	pub fn commit_changes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
	{
		self.inner.commit_changes(tx)
	}

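	/// Commit a set of operations with `Arc`-shared values.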
	#[cfg(feature = "arc")]
	#[deprecated(
		note = "This method will be removed in future versions. Use `commit_changes_bytes` instead"
	)]
	pub fn commit_changes_shared<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Arc<Vec<u8>>>)>,
	{
		self.inner.commit_changes(tx)
	}

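	/// Commit a set of operations with `Bytes` values.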
	#[cfg(feature = "bytes")]
	pub fn commit_changes_bytes<I>(&self, tx: I) -> Result<()>
	where
		I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Bytes>)>,
	{
		self.inner.commit_changes(tx)
	}

	pub(crate) fn commit_raw(&self, commit: CommitChangeSet) -> Result<()> {
		self.inner.commit_raw(commit)
	}

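	/// Returns the number of columns in the database.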
	pub fn num_columns(&self) -> u8 {
		self.inner.columns.len() as u8
	}

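	/// Iterate over the values of a column, calling `f` for each value until it returns `false`.
	/// Only supported for hash-indexed columns.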
	pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
		self.inner.iter_column_while(c, f)
	}

	pub(crate) fn iter_column_index_while(
		&self,
		c: ColId,
		f: impl FnMut(IterState) -> bool,
	) -> Result<()> {
		self.inner.iter_column_index_while(c, f)
	}

	fn commit_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				db.cleanup_worker_wait.signal();
				if !db.log.has_log_files_to_read() {
					db.commit_worker_wait.wait();
				}
			}

			more_work = db.enact_logs(false)?;
		}
		log::debug!(target: "parity-db", "Commit worker shutdown");
		Ok(())
	}

	fn log_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_reindex = db.process_reindex()?;
		let mut more_commits = false;
		while !db.shutdown.load(Ordering::SeqCst) || more_commits {
			if !more_commits && !more_reindex {
				db.log_worker_wait.wait();
			}

			more_commits = db.process_commits(&db)?;
			more_reindex = db.process_reindex()?;
		}
		log::debug!(target: "parity-db", "Log worker shutdown");
		Ok(())
	}

	fn flush_worker(db: Arc<DbInner>, min_log_size: u64) -> Result<()> {
		let mut more_work = false;
		while !db.shutdown.load(Ordering::SeqCst) {
			if !more_work {
				db.flush_worker_wait.wait();
			}
			more_work = db.flush_logs(min_log_size)?;
		}
		log::debug!(target: "parity-db", "Flush worker shutdown");
		Ok(())
	}

	fn cleanup_worker(db: Arc<DbInner>) -> Result<()> {
		let mut more_work = true;
		while !db.shutdown.load(Ordering::SeqCst) || more_work {
			if !more_work {
				db.cleanup_worker_wait.wait();
			}
			more_work = db.clean_logs()?;
		}
		log::debug!(target: "parity-db", "Cleanup worker shutdown");
		Ok(())
	}

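	/// Write statistics in text form for the whole database, or a single column, to `writer`.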
	pub fn write_stats_text(
		&self,
		writer: &mut impl std::io::Write,
		column: Option<u8>,
	) -> Result<()> {
		self.inner.write_stats_text(writer, column)
	}

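	/// Reset statistics for the whole database or a single column.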
	pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
		self.inner.clear_stats(column)
	}

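	/// Dump the database, or a single column, according to `check_param`.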
	pub fn dump(&self, check_param: check::CheckOptions) -> Result<()> {
		if let Some(col) = check_param.column {
			self.inner.columns[col as usize].dump(&self.inner.log, &check_param, col)?;
		} else {
			for (ix, c) in self.inner.columns.iter().enumerate() {
				c.dump(&self.inner.log, &check_param, ix as ColId)?;
			}
		}
		Ok(())
	}

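	/// Collect a summary of statistics for all columns.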
	pub fn stats(&self) -> StatSummary {
		self.inner.stats()
	}

	pub fn get_num_column_value_entries(&self, col: ColId) -> Result<u64> {
		let column = &self.inner.columns[col as usize];
		match column {
			Column::Hash(column) => column.get_num_value_entries(),
			Column::Tree(..) => Err(Error::InvalidConfiguration(
				"get_num_column_value_entries not implemented for tree columns.".to_string(),
			)),
		}
	}

	fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
		let db = Db::open(options)?;
		let salt = db.inner.options.salt;
		drop(db);
		Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
	}

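	/// Append a new column to an existing database, updating its metadata.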
	pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;

		options.columns.push(new_column_options);
		options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

		Ok(())
	}

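	/// Remove the last column from the database, deleting its files and updating the metadata.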
	pub fn drop_last_column(options: &mut Options) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;
		let nb_column = options.columns.len();
		if nb_column == 0 {
			return Ok(())
		}
		let index = options.columns.len() - 1;
		Self::remove_column_files(options, index as u8)?;
		options.columns.pop();
		options.write_metadata(&options.path, &salt)?;
		Ok(())
	}

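	/// Clear a column's contents by removing its files, optionally replacing the column options.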
	pub fn reset_column(
		options: &mut Options,
		index: u8,
		new_options: Option<ColumnOptions>,
	) -> Result<()> {
		let salt = Self::precheck_column_operation(options)?;
		Self::remove_column_files(options, index)?;

		if let Some(new_options) = new_options {
			options.columns[index as usize] = new_options;
			options.write_metadata(&options.path, &salt)?;
		}

		Ok(())
	}

	fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
		if index as usize >= options.columns.len() {
			return Err(Error::IncompatibleColumnConfig {
				id: index,
				reason: "Column not found".to_string(),
			})
		}

		Column::drop_files(index, options.path.clone())?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn process_reindex(&self) -> Result<()> {
		self.inner.process_reindex()?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn process_commits(&self) -> Result<()> {
		self.inner.process_commits(&self.inner)?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn flush_logs(&self) -> Result<()> {
		self.inner.flush_logs(0)?;
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn enact_logs(&self) -> Result<()> {
		while self.inner.enact_logs(false)? {}
		Ok(())
	}

	#[cfg(feature = "instrumentation")]
	pub fn clean_logs(&self) -> Result<()> {
		self.inner.clean_logs()?;
		Ok(())
	}
}

impl Drop for Db {
	fn drop(&mut self) {
		self.drop_inner()
	}
}

impl Db {
	fn drop_inner(&mut self) {
		self.inner.shutdown();
		if let Some(t) = self.log_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Log thread shutdown error: {:?}", e);
			}
		}
		if let Some(t) = self.flush_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Flush thread shutdown error: {:?}", e);
			}
		}
		if let Some(t) = self.commit_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Commit thread shutdown error: {:?}", e);
			}
		}
		if let Some(t) = self.cleanup_thread.take() {
			if let Err(e) = t.join() {
				log::warn!(target: "parity-db", "Cleanup thread shutdown error: {:?}", e);
			}
		}
		if let Err(e) = self.inner.kill_logs(&self.inner) {
			log::warn!(target: "parity-db", "Shutdown error: {:?}", e);
		}
		if let Err(e) = fs2::FileExt::unlock(&self.inner.lock_file) {
			log::debug!(target: "parity-db", "Error removing file lock: {:?}", e);
		}
	}
}

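/// Read access to a single tree within a multitree column: its root and its nodes.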
pub trait TreeReader {
	fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>>;
	fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>>;
	fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>>;
}

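/// A `TreeReader` that reads directly from the database, identified by column id and hashed
/// root key.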
#[derive(Debug)]
pub struct DbTreeReader {
	db: Arc<DbInner>,
	col: ColId,
	key: Key,
}

impl TreeReader for DbTreeReader {
	fn get_root(&self) -> Result<Option<(Vec<u8>, Children)>> {
		match &self.db.columns[self.col as usize] {
			Column::Hash(column) => {
				let overlay = self.db.commit_overlay.read();
				let value = if let Some(v) =
					overlay.get(self.col as usize).and_then(|o| o.get(&self.key))
				{
					Ok(v.map(|i| i.as_ref().to_vec()))
				} else {
					let log = self.db.log.overlays();
					Ok(column.get(&self.key, log)?.map(|(v, _rc)| v))
				}?;

				if let Some(data) = value {
					return unpack_node_data(data).map(|x| Some(x))
				}

				return Ok(None)
			},
			Column::Tree(..) =>
				return Err(Error::InvalidConfiguration("Not a HashColumn.".to_string())),
		};
	}

	fn get_node(&self, node_address: NodeAddress) -> Result<Option<(Vec<u8>, Children)>> {
		self.db.get_node(self.col, node_address, false)
	}

	fn get_node_children(&self, node_address: NodeAddress) -> Result<Option<Children>> {
		self.db.get_node_children(self.col, node_address, false)
	}
}

pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<RcValue>), IdentityBuildHasher>;
pub type AddressCommitOverlay = HashMap<u64, (u64, RcValue)>;
pub type BTreeCommitOverlay = BTreeMap<RcKey, (u64, Option<RcValue>)>;

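/// Per-column overlay of values from commits that are queued but not yet enacted.
/// Reads consult this overlay before the log overlay and the tables.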
#[derive(Debug)]
pub struct CommitOverlay {
	indexed: IndexedCommitOverlay,
	address: AddressCommitOverlay,
	btree_indexed: BTreeCommitOverlay,
}

impl CommitOverlay {
	fn new() -> Self {
		CommitOverlay {
			indexed: Default::default(),
			address: Default::default(),
			btree_indexed: Default::default(),
		}
	}

	#[cfg(test)]
	fn is_empty(&self) -> bool {
		self.indexed.is_empty() && self.address.is_empty() && self.btree_indexed.is_empty()
	}
}

impl CommitOverlay {
	fn get_ref(&self, key: &[u8]) -> Option<Option<&RcValue>> {
		self.indexed.get(key).map(|(_, v)| v.as_ref())
	}

	fn get(&self, key: &[u8]) -> Option<Option<RcValue>> {
		self.get_ref(key).map(|v| v.cloned())
	}

	fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
		self.get_ref(key).map(|res| res.as_ref().map(|b| b.as_ref().len() as u32))
	}

	fn get_address(&self, address: u64) -> Option<RcValue> {
		self.address.get(&address).map(|(_, v)| v.clone())
	}

	fn btree_get(&self, key: &[u8]) -> Option<Option<&RcValue>> {
		self.btree_indexed.get(key).map(|(_, v)| v.as_ref())
	}

	pub fn btree_next(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
		use crate::btree::LastKey;
		match &last_key {
			LastKey::Start => self
				.btree_indexed
				.range::<[u8], _>(..)
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::End => None,
			LastKey::At(key) => self
				.btree_indexed
				.range::<[u8], _>((Bound::Excluded(key.as_slice()), Bound::Unbounded))
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::Seeked(key) => self
				.btree_indexed
				.range::<[u8], _>((Bound::Included(key.as_slice()), Bound::Unbounded))
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
		}
	}

	pub fn btree_prev(&self, last_key: &crate::btree::LastKey) -> Option<(RcKey, Option<RcValue>)> {
		use crate::btree::LastKey;
		match &last_key {
			LastKey::End => self
				.btree_indexed
				.range::<[u8], _>(..)
				.rev()
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::Start => None,
			LastKey::At(key) => self
				.btree_indexed
				.range::<[u8], _>((Bound::Unbounded, Bound::Excluded(key.as_slice())))
				.rev()
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
			LastKey::Seeked(key) => self
				.btree_indexed
				.range::<[u8], _>((Bound::Unbounded, Bound::Included(key.as_slice())))
				.rev()
				.next()
				.map(|(k, (_, v))| (k.clone(), v.clone())),
		}
	}
}

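/// Operations that can be applied to a column in a commit.
/// The set of valid operations depends on the column configuration.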
#[derive(Debug, PartialEq, Eq)]
pub enum Operation<Key, Value> {
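	/// Insert or update the value for the given key.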
	Set(Key, Value),

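	/// Dereference the given key: remove the value, or decrement its reference count
	/// for ref-counted columns.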
	Dereference(Key),

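	/// Increment the reference count of the given key. Only valid for ref-counted columns.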
	Reference(Key),

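	/// Insert a new tree with the given root key. Only valid for multitree columns.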
	InsertTree(Key, NewNode),

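	/// Increment the reference count of the tree root at the given key.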
	ReferenceTree(Key),

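	/// Dereference a tree root and its descendants. Not allowed for append-only columns.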
	DereferenceTree(Key),
}

impl<Key: Ord, Value: Eq> PartialOrd<Self> for Operation<Key, Value> {
	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
		Some(self.cmp(other))
	}
}

impl<Key: Ord, Value: Eq> Ord for Operation<Key, Value> {
	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
		self.key().cmp(other.key())
	}
}

impl<Key, Value> Operation<Key, Value> {
	pub fn key(&self) -> &Key {
		match self {
			Operation::Set(k, _) |
			Operation::Dereference(k) |
			Operation::Reference(k) |
			Operation::InsertTree(k, _) |
			Operation::ReferenceTree(k) |
			Operation::DereferenceTree(k) => k,
		}
	}

	pub fn into_key(self) -> Key {
		match self {
			Operation::Set(k, _) |
			Operation::Dereference(k) |
			Operation::Reference(k) |
			Operation::InsertTree(k, _) |
			Operation::ReferenceTree(k) |
			Operation::DereferenceTree(k) => k,
		}
	}
}

impl<K: AsRef<[u8]>, Value> Operation<K, Value> {
	pub fn to_key_vec(self) -> Operation<Vec<u8>, Value> {
		match self {
			Operation::Set(k, v) => Operation::Set(k.as_ref().to_vec(), v),
			Operation::Dereference(k) => Operation::Dereference(k.as_ref().to_vec()),
			Operation::Reference(k) => Operation::Reference(k.as_ref().to_vec()),
			Operation::InsertTree(k, n) => Operation::InsertTree(k.as_ref().to_vec(), n),
			Operation::ReferenceTree(k) => Operation::ReferenceTree(k.as_ref().to_vec()),
			Operation::DereferenceTree(k) => Operation::DereferenceTree(k.as_ref().to_vec()),
		}
	}
}

#[derive(Debug, PartialEq, Eq)]
pub enum NodeChange {
	/// A new node value stored at the given address.
	NewValue(u64, RcValue),
	/// Increment the reference count of the node at the given address.
	IncrementReference(u64),
	/// Dereference the children of a tree root (key, hashed key, children).
	DereferenceChildren(Vec<u8>, Key, Children),
}

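/// All changes to be committed atomically, grouped by column.
///
/// A minimal sketch of assembling one by hand (the column id and key are
/// illustrative; the commit machinery normally builds this internally):
///
/// ```ignore
/// let mut change_set = CommitChangeSet::default();
/// change_set
/// 	.indexed
/// 	.entry(0)
/// 	.or_insert_with(|| IndexedChangeSet::new(0))
/// 	.push_change_hashed(Operation::Set([0u8; 32], b"value".to_vec().into()));
/// ```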
#[derive(Debug, Default)]
pub struct CommitChangeSet {
	pub indexed: HashMap<ColId, IndexedChangeSet>,
	pub btree_indexed: HashMap<ColId, BTreeChangeSet>,
	pub check_for_deferral: bool,
}

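/// Changes to a single hash-indexed column, with keys already hashed.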
#[derive(Debug)]
pub struct IndexedChangeSet {
	pub col: ColId,
	pub changes: Vec<Operation<Key, RcValue>>,
	pub node_changes: Vec<NodeChange>,
	pub used_trees: HashSet<Key>,
}

impl IndexedChangeSet {
	pub fn new(col: ColId) -> Self {
		IndexedChangeSet {
			col,
			changes: Default::default(),
			node_changes: Default::default(),
			used_trees: Default::default(),
		}
	}

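	/// Queue one operation, hashing the caller's key with the column salt.
	/// Tree operations are rejected here as they are handled separately.
	///
	/// A minimal sketch (assuming `set` is an `IndexedChangeSet` and
	/// `options`/`db_version` come from the open database):
	///
	/// ```ignore
	/// set.push(Operation::Set(b"key", b"value".to_vec()), &options, db_version)?;
	/// ```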
	fn push<K: AsRef<[u8]>, V: Into<RcValue>>(
		&mut self,
		change: Operation<K, V>,
		options: &Options,
		db_version: u32,
	) -> Result<()> {
		// Hash the caller's key with the column salt so it matches the index.
		let salt = options.salt.unwrap_or_default();
		let hash_key = |key: &[u8]| -> Key {
			hash_key(key, &salt, options.columns[self.col as usize].uniform, db_version)
		};

		self.push_change_hashed(match change {
			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
			Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
			Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
			Operation::InsertTree(..) |
			Operation::ReferenceTree(..) |
			Operation::DereferenceTree(..) =>
				return Err(Error::InvalidInput(format!(
					"Invalid operation for column {}",
					self.col
				))),
		});

		Ok(())
	}

	fn push_change_hashed(&mut self, change: Operation<Key, RcValue>) {
		self.changes.push(change);
	}

	fn push_node_change(&mut self, change: NodeChange) {
		self.node_changes.push(change);
	}

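	/// Mirror the queued changes into the commit overlay so reads observe
	/// them before the log is enacted. `bytes` accumulates the queued size,
	/// which is used for commit queue back-pressure.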
	fn copy_to_overlay(
		&self,
		overlay: &mut CommitOverlay,
		record_id: u64,
		bytes: &mut usize,
		options: &Options,
	) -> Result<()> {
		let ref_counted = options.columns[self.col as usize].ref_counted;
		for change in self.changes.iter() {
			match &change {
				Operation::Set(k, v) => {
					*bytes += k.len();
					*bytes += v.as_ref().len();
					overlay.indexed.insert(*k, (record_id, Some(v.clone())));
				},
				Operation::Dereference(k) => {
					// Don't add removed ref-counted values to the overlay: the
					// remaining number of references is not known here.
					if !ref_counted {
						overlay.indexed.insert(*k, (record_id, None));
					}
				},
				Operation::Reference(..) => {
					// Reference is only valid for ref-counted columns.
					if !ref_counted {
						return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)))
					}
				},
				Operation::InsertTree(..) |
				Operation::ReferenceTree(..) |
				Operation::DereferenceTree(..) =>
					return Err(Error::InvalidInput(format!(
						"Invalid operation for column {}",
						self.col
					))),
			}
		}
		for change in self.node_changes.iter() {
			if let NodeChange::NewValue(address, val) = change {
				*bytes += val.as_ref().len();
				overlay.address.insert(*address, (record_id, val.clone()));
			}
		}
		Ok(())
	}

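	/// Translate the queued operations into log actions for this column,
	/// setting `reindex` when an index table needs to grow.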
	fn write_plan(
		&self,
		db: &Arc<DbInner>,
		col: ColId,
		column: &Column,
		writer: &mut crate::log::LogWriter,
		ops: &mut u64,
		reindex: &mut bool,
	) -> Result<()> {
		let column = match column {
			Column::Hash(column) => column,
			Column::Tree(_) => {
				log::warn!(target: "parity-db", "Skipping unindex commit in indexed column");
				return Ok(())
			},
		};
		for change in self.changes.iter() {
			if let PlanOutcome::NeedReindex = column.write_plan(change, writer)? {
				*reindex = true;
			}
			*ops += 1;
		}
		for change in self.node_changes.iter() {
			match change {
				NodeChange::NewValue(address, val) => {
					column.write_address_value_plan(
						*address,
						val.clone(),
						false,
						val.as_ref().len() as u32,
						writer,
					)?;
				},
				NodeChange::IncrementReference(address) => {
					if let PlanOutcome::NeedReindex =
						column.write_address_inc_ref_plan(*address, writer)?
					{
						*reindex = true;
					}
				},
				NodeChange::DereferenceChildren(key, hash, children) => {
					if let Some((_root, rc)) = column.get(hash, writer)? {
						column.write_plan(&Operation::Dereference(*hash), writer)?;
						log::debug!(target: "parity-db", "Dereferencing root, rc={}", rc);
						if rc == 1 {
							// Last reference to the root: remove the subtree.
							let tree = db.get_tree(db, col, key, false)?;
							if let Some(tree) = tree {
								let guard = tree.write();
								let mut num_removed = 0;
								self.write_dereference_children_plan(
									column,
									&guard,
									children,
									&mut num_removed,
									writer,
								)?;
								log::debug!(target: "parity-db", "Dereferenced tree {:?}, removed {}", &key[0..3], num_removed);
							}
						}
					}
				},
			}
		}
		Ok(())
	}

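	/// Recursively decrement the reference counts of the given child nodes,
	/// descending into any node whose count reaches zero.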
	fn write_dereference_children_plan(
		&self,
		column: &HashColumn,
		guard: &RwLockWriteGuard<'_, Box<dyn TreeReader + Send + Sync>>,
		children: &[u64],
		num_removed: &mut u64,
		writer: &mut crate::log::LogWriter,
	) -> Result<()> {
		for address in children {
			// Fetch the child addresses before decrementing, as the node may
			// be removed by the plan below.
			let node = guard.get_node_children(*address)?;
			let (remains, _outcome) = column.write_address_dec_ref_plan(*address, writer)?;
			if !remains {
				// The node was removed: recurse into its children.
				*num_removed += 1;
				if let Some(children) = node {
					self.write_dereference_children_plan(
						column,
						guard,
						&children,
						num_removed,
						writer,
					)?;
				} else {
					return Err(Error::InvalidConfiguration("Missing node data".to_string()))
				}
			}
		}
		Ok(())
	}

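	/// Remove overlay entries written by this commit (matched via
	/// `record_id`) once the changes have been enacted.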
	fn clean_overlay(&self, overlay: &mut CommitOverlay, record_id: u64) {
		use std::collections::hash_map::Entry;
		for change in self.changes.iter() {
			match change {
				Operation::Set(k, _) | Operation::Dereference(k) => {
					if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
						if e.get().0 == record_id {
							e.remove_entry();
						}
					}
				},
				Operation::Reference(..) |
				Operation::InsertTree(..) |
				Operation::ReferenceTree(..) |
				Operation::DereferenceTree(..) => (),
			}
		}
		for change in self.node_changes.iter() {
			if let NodeChange::NewValue(address, _val) = change {
				if let Entry::Occupied(e) = overlay.address.entry(*address) {
					if e.get().0 == record_id {
						e.remove_entry();
					}
				}
			}
		}
	}
}

pub mod check {
	/// How values are displayed when checking.
	pub enum CheckDisplay {
		/// Do not display values.
		None,
		/// Display the full value.
		Full,
		/// Display the value truncated to the given number of bytes.
		Short(u64),
	}

	/// Options for a database check run.
	pub struct CheckOptions {
		/// Only check the given column, or all columns if `None`.
		pub column: Option<u8>,
		/// Start from the given index position.
		pub from: Option<u64>,
		/// Stop at the given index position.
		pub bound: Option<u64>,
		/// Value display mode.
		pub display: CheckDisplay,
		/// Perform a faster, less thorough check.
		pub fast: bool,
		/// Also validate free entry reference counts.
		pub validate_free_refs: bool,
	}

	impl CheckOptions {
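		/// Build check options from raw flag values.
		///
		/// A minimal sketch (the values are illustrative): check column 0
		/// over the whole index, displaying values truncated to 64 bytes:
		///
		/// ```ignore
		/// let opts = CheckOptions::new(Some(0), None, None, true, Some(64), false, true);
		/// ```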
		pub fn new(
			column: Option<u8>,
			from: Option<u64>,
			bound: Option<u64>,
			display_content: bool,
			truncate_value_display: Option<u64>,
			fast: bool,
			validate_free_refs: bool,
		) -> Self {
			let display = if display_content {
				match truncate_value_display {
					Some(t) => CheckDisplay::Short(t),
					None => CheckDisplay::Full,
				}
			} else {
				CheckDisplay::None
			};
			CheckOptions { column, from, bound, display, fast, validate_free_refs }
		}
	}
}

#[derive(Eq, PartialEq, Clone, Copy)]
enum OpeningMode {
	Create,
	Write,
	ReadOnly,
}

#[cfg(test)]
mod tests {
	use super::{Db, Options};
	use crate::{
		column::ColId,
		db::{DbInner, OpeningMode},
		ColumnOptions, Value,
	};
	use rand::Rng;
	use std::{
		collections::{BTreeMap, HashMap, HashSet},
		path::Path,
	};
	use tempfile::tempdir;

	// Allows running the commit pipeline only up to a given stage in tests.
	#[derive(Eq, PartialEq, Debug, Clone, Copy)]
	enum EnableCommitPipelineStages {
		// Changes stay in the commit overlay; nothing is processed.
		#[allow(dead_code)]
		CommitOverlay,
		// Commits are processed into the log, but the log is not enacted.
		#[allow(dead_code)]
		LogOverlay,
		// Logs are flushed and enacted into the database files.
		DbFile,
		// Default mode, with the background threads running.
		Standard,
	}

	impl EnableCommitPipelineStages {
		fn options(&self, path: &Path, num_columns: u8) -> Options {
			Options {
				path: path.into(),
				sync_wal: true,
				sync_data: true,
				stats: true,
				salt: None,
				columns: (0..num_columns).map(|_| Default::default()).collect(),
				compression_threshold: HashMap::new(),
				with_background_thread: *self == Self::Standard,
				always_flush: *self == Self::DbFile,
			}
		}

		fn run_stages(&self, db: &Db) {
			let db = &db.inner;
			if *self == EnableCommitPipelineStages::DbFile ||
				*self == EnableCommitPipelineStages::LogOverlay
			{
				while db.process_commits(db).unwrap() {}
				while db.process_reindex().unwrap() {}
			}
			if *self == EnableCommitPipelineStages::DbFile {
				let _ = db.log.flush_one(0).unwrap();
				while db.enact_logs(false).unwrap() {}
				let _ = db.clean_logs().unwrap();
			}
		}

		fn check_empty_overlay(&self, db: &DbInner, col: ColId) -> bool {
			match self {
				EnableCommitPipelineStages::DbFile | EnableCommitPipelineStages::LogOverlay => {
					if let Some(overlay) = db.commit_overlay.read().get(col as usize) {
						if !overlay.is_empty() {
							let mut replayed = 5;
							while !overlay.is_empty() {
								if replayed > 0 {
									replayed -= 1;
									// The completion signal fires just before the overlay
									// is cleared, so give the worker a moment to finish.
									std::thread::sleep(std::time::Duration::from_millis(100));
								} else {
									return false
								}
							}
						}
					}
				},
				_ => (),
			}
			true
		}
	}

	#[test]
	fn test_db_open_should_fail() {
		let tmp = tempdir().unwrap();
		let options = Options::with_columns(tmp.path(), 5);
		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));
	}

	#[test]
	fn test_db_open_fail_then_recursively_create() {
		let tmp = tempdir().unwrap();
		let (db_path_first, db_path_last) = {
			let mut db_path_first = tmp.path().to_owned();
			db_path_first.push("nope");

			let mut db_path_last = db_path_first.to_owned();

			for p in ["does", "not", "yet", "exist"] {
				db_path_last.push(p);
			}

			(db_path_first, db_path_last)
		};

		assert!(
			!db_path_first.exists(),
			"That directory should not have existed at this point (dir: {db_path_first:?})"
		);

		let options = Options::with_columns(&db_path_last, 5);
		assert!(matches!(Db::open(&options), Err(crate::Error::DatabaseNotFound)));

		assert!(!db_path_first.exists(), "That directory should remain non-existent. Did the `open(create: false)` nonetheless create a directory? (dir: {db_path_first:?})");
		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");

		assert!(
			db_path_first.is_dir(),
			"A directory should have been created (dir: {db_path_first:?})"
		);
		assert!(
			db_path_last.is_dir(),
			"A directory should have been created (dir: {db_path_last:?})"
		);
	}

	#[test]
	fn test_db_open_or_create() {
		let tmp = tempdir().unwrap();
		let options = Options::with_columns(tmp.path(), 5);
		assert!(Db::open_or_create(&options).is_ok(), "New database should be created");
		assert!(Db::open(&options).is_ok(), "Existing database should be reopened");
	}

	#[test]
	fn test_indexed_keyvalues() {
		test_indexed_keyvalues_inner(EnableCommitPipelineStages::CommitOverlay);
		test_indexed_keyvalues_inner(EnableCommitPipelineStages::LogOverlay);
		test_indexed_keyvalues_inner(EnableCommitPipelineStages::DbFile);
		test_indexed_keyvalues_inner(EnableCommitPipelineStages::Standard);
	}

	fn test_indexed_keyvalues_inner(db_test: EnableCommitPipelineStages) {
		let tmp = tempdir().unwrap();
		let options = db_test.options(tmp.path(), 5);
		let col_nb = 0;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());

		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		db_test.run_stages(&db);
		assert!(db_test.check_empty_overlay(&db.inner, col_nb));

		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));

		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);
		assert!(db_test.check_empty_overlay(&db.inner, col_nb));

		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

		db.commit(vec![
			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
			(col_nb, key3.clone(), None),
		])
		.unwrap();
		db_test.run_stages(&db);
		assert!(db_test.check_empty_overlay(&db.inner, col_nb));

		assert!(db.get(col_nb, key1.as_slice()).unwrap().is_none());
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
	}

	#[test]
	fn test_indexed_overlay_against_backend() {
		let tmp = tempdir().unwrap();
		let db_test = EnableCommitPipelineStages::DbFile;
		let options = db_test.options(tmp.path(), 5);
		let col_nb = 0;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		db.commit(vec![
			(col_nb, key1.clone(), Some(b"value1".to_vec())),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);
		drop(db);

		// Give the dropped instance a moment to release its files before
		// reopening the same path.
		std::thread::sleep(std::time::Duration::from_millis(100));

		let db_test = EnableCommitPipelineStages::CommitOverlay;
		let options = db_test.options(tmp.path(), 5);
		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();
		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));
		db.commit(vec![
			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
			(col_nb, key3.clone(), None),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
		assert_eq!(db.get(col_nb, key2.as_slice()).unwrap(), Some(b"value2b".to_vec()));
		assert_eq!(db.get(col_nb, key3.as_slice()).unwrap(), None);
	}

	#[test]
	fn test_add_column() {
		let tmp = tempdir().unwrap();
		let db_test = EnableCommitPipelineStages::DbFile;
		let mut options = db_test.options(tmp.path(), 1);
		options.salt = Some(options.salt.unwrap_or_default());

		let old_col_id = 0;
		let new_col_id = 1;
		let new_col_indexed_id = 2;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		db.commit(vec![
			(old_col_id, key1.clone(), Some(b"value1".to_vec())),
			(old_col_id, key2.clone(), Some(b"value2".to_vec())),
			(old_col_id, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		drop(db);

		Db::add_column(&mut options, ColumnOptions { btree_index: false, ..Default::default() })
			.unwrap();

		Db::add_column(&mut options, ColumnOptions { btree_index: true, ..Default::default() })
			.unwrap();

		let mut options = db_test.options(tmp.path(), 3);
		options.columns[new_col_indexed_id as usize].btree_index = true;

		let db_test = EnableCommitPipelineStages::DbFile;
		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		assert_eq!(db.num_columns(), 3);

		let new_key1 = b"abcdef".to_vec();
		let new_key2 = b"123456".to_vec();

		db.commit(vec![
			(new_col_id, new_key1.clone(), Some(new_key1.to_vec())),
			(new_col_id, new_key2.clone(), Some(new_key2.to_vec())),
			(new_col_indexed_id, new_key1.clone(), Some(new_key1.to_vec())),
			(new_col_indexed_id, new_key2.clone(), Some(new_key2.to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		drop(db);

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		assert_eq!(db.get(old_col_id, key1.as_slice()).unwrap(), Some(b"value1".to_vec()));
		assert_eq!(db.get(old_col_id, key2.as_slice()).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(old_col_id, key3.as_slice()).unwrap(), Some(b"value3".to_vec()));

		assert_eq!(db.get(new_col_id, new_key1.as_slice()).unwrap(), Some(new_key1.to_vec()));
		assert_eq!(db.get(new_col_id, new_key2.as_slice()).unwrap(), Some(new_key2.to_vec()));
		assert_eq!(
			db.get(new_col_indexed_id, new_key1.as_slice()).unwrap(),
			Some(new_key1.to_vec())
		);
		assert_eq!(
			db.get(new_col_indexed_id, new_key2.as_slice()).unwrap(),
			Some(new_key2.to_vec())
		);
	}

	#[test]
	fn test_indexed_btree_1() {
		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, false);
		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, false);
		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, false);
		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, false);
		test_indexed_btree_inner(EnableCommitPipelineStages::CommitOverlay, true);
		test_indexed_btree_inner(EnableCommitPipelineStages::LogOverlay, true);
		test_indexed_btree_inner(EnableCommitPipelineStages::DbFile, true);
		test_indexed_btree_inner(EnableCommitPipelineStages::Standard, true);
	}

	fn test_indexed_btree_inner(db_test: EnableCommitPipelineStages, long_key: bool) {
		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let (key1, key2, key3, key4) = if !long_key {
			(b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec(), b"key4".to_vec())
		} else {
			let key2 = vec![2; 272];
			let mut key3 = key2.clone();
			key3[271] = 3;
			(vec![1; 953], key2, key3, vec![4; 79])
		};

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), None);

		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(iter.next().unwrap(), None);
		assert_eq!(iter.prev().unwrap(), None);

		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);
		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		iter.seek(&[0xff]).unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));

		iter.seek(key2.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		iter.seek(key3.as_slice()).unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);

		db.commit(vec![
			(col_nb, key2.clone(), Some(b"value2b".to_vec())),
			(col_nb, key4.clone(), Some(b"value4".to_vec())),
			(col_nb, key3.clone(), None),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key3).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2b".to_vec()));
		assert_eq!(db.get(col_nb, &key4).unwrap(), Some(b"value4".to_vec()));
		let mut key22 = key2.clone();
		key22.push(2);
		iter.seek(key22.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key4, b"value4".to_vec())));
		assert_eq!(iter.next().unwrap(), None);
	}

	#[test]
	fn test_indexed_btree_2() {
		test_indexed_btree_inner_2(EnableCommitPipelineStages::CommitOverlay);
		test_indexed_btree_inner_2(EnableCommitPipelineStages::LogOverlay);
	}

	fn test_indexed_btree_inner_2(db_test: EnableCommitPipelineStages) {
		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let key1 = b"key1".to_vec();
		let key2 = b"key2".to_vec();
		let key3 = b"key3".to_vec();

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();
		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(iter.next().unwrap(), None);

		db.commit(vec![(col_nb, key1.clone(), Some(b"value1".to_vec()))]).unwrap();
		EnableCommitPipelineStages::DbFile.run_stages(&db);
		drop(db);

		// Give the dropped instance a moment to release its files before
		// reopening the same path.
		std::thread::sleep(std::time::Duration::from_millis(100));

		let db = Db::open_inner(&options, OpeningMode::Write).unwrap();

		let mut iter = db.iter(col_nb).unwrap();
		assert_eq!(db.get(col_nb, &key1).unwrap(), Some(b"value1".to_vec()));
		iter.seek_to_first().unwrap();
		assert_eq!(iter.next().unwrap(), Some((key1.clone(), b"value1".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		db.commit(vec![
			(col_nb, key1.clone(), None),
			(col_nb, key2.clone(), Some(b"value2".to_vec())),
			(col_nb, key3.clone(), Some(b"value3".to_vec())),
		])
		.unwrap();
		db_test.run_stages(&db);

		assert_eq!(db.get(col_nb, &key1).unwrap(), None);
		assert_eq!(db.get(col_nb, &key2).unwrap(), Some(b"value2".to_vec()));
		assert_eq!(db.get(col_nb, &key3).unwrap(), Some(b"value3".to_vec()));
		iter.seek(key2.as_slice()).unwrap();
		assert_eq!(iter.next().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.next().unwrap(), Some((key3.clone(), b"value3".to_vec())));
		assert_eq!(iter.next().unwrap(), None);

		iter.seek_to_last().unwrap();
		assert_eq!(iter.prev().unwrap(), Some((key3, b"value3".to_vec())));
		assert_eq!(iter.prev().unwrap(), Some((key2.clone(), b"value2".to_vec())));
		assert_eq!(iter.prev().unwrap(), None);
	}

	#[test]
	fn test_indexed_btree_3() {
		test_indexed_btree_inner_3(EnableCommitPipelineStages::CommitOverlay);
		test_indexed_btree_inner_3(EnableCommitPipelineStages::LogOverlay);
		test_indexed_btree_inner_3(EnableCommitPipelineStages::DbFile);
		test_indexed_btree_inner_3(EnableCommitPipelineStages::Standard);
	}

	fn test_indexed_btree_inner_3(db_test: EnableCommitPipelineStages) {
		use rand::SeedableRng;
		use std::collections::BTreeSet;

		let mut rng = rand::rngs::SmallRng::seed_from_u64(0);

		let tmp = tempdir().unwrap();
		let col_nb = 0u8;
		let mut options = db_test.options(tmp.path(), 5);
		options.columns[col_nb as usize].btree_index = true;

		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		// Insert 0..1024, then delete the even keys, leaving the odd ones.
		db.commit(
			(0u64..1024)
				.map(|i| (0, i.to_be_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
				.chain((0u64..1024).step_by(2).map(|i| (0, i.to_be_bytes().to_vec(), None))),
		)
		.unwrap();
		let expected = (0u64..1024).filter(|i| i % 2 == 1).collect::<BTreeSet<_>>();
		let mut iter = db.iter(0).unwrap();

		for _ in 0..100 {
			let at = rng.random_range(0u64..=1024);
			iter.seek(&at.to_be_bytes()).unwrap();

			let mut prev_run: bool = rng.random();
			let at = if prev_run {
				let take = rng.random_range(1..100);
				let got = std::iter::from_fn(|| iter.next().unwrap())
					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
					.take(take)
					.collect::<Vec<_>>();
				let expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
				assert_eq!(got, expected);
				if got.is_empty() {
					prev_run = false;
				}
				if got.len() < take {
					prev_run = false;
				}
				expected.last().copied().unwrap_or(at)
			} else {
				at
			};

			let at = {
				let take = rng.random_range(1..100);
				let got = std::iter::from_fn(|| iter.prev().unwrap())
					.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
					.take(take)
					.collect::<Vec<_>>();
				let expected = if prev_run {
					expected.range(..at).rev().take(take).copied().collect::<Vec<_>>()
				} else {
					expected.range(..=at).rev().take(take).copied().collect::<Vec<_>>()
				};
				assert_eq!(got, expected);
				prev_run = !got.is_empty();
				if take > got.len() {
					prev_run = false;
				}
				expected.last().copied().unwrap_or(at)
			};

			let take = rng.random_range(1..100);
			let mut got = std::iter::from_fn(|| iter.next().unwrap())
				.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
				.take(take)
				.collect::<Vec<_>>();
			let mut expected = expected.range(at..).take(take).copied().collect::<Vec<_>>();
			if prev_run {
				expected = expected.split_off(1);
				if got.len() == take {
					got.pop();
				}
			}
			assert_eq!(got, expected);
		}

		let take = rng.random_range(20..100);
		iter.seek_to_last().unwrap();
		let got = std::iter::from_fn(|| iter.prev().unwrap())
			.map(|(k, _)| u64::from_be_bytes(k.try_into().unwrap()))
			.take(take)
			.collect::<Vec<_>>();
		let expected = expected.iter().rev().take(take).copied().collect::<Vec<_>>();
		assert_eq!(got, expected);
	}

	fn test_basic(change_set: &[(Vec<u8>, Option<Vec<u8>>)]) {
		test_basic_inner(change_set, false, false);
		test_basic_inner(change_set, false, true);
		test_basic_inner(change_set, true, false);
		test_basic_inner(change_set, true, true);
	}

	fn test_basic_inner(
		change_set: &[(Vec<u8>, Option<Vec<u8>>)],
		btree_index: bool,
		ref_counted: bool,
	) {
		let tmp = tempdir().unwrap();
		let col_nb = 1u8;
		let db_test = EnableCommitPipelineStages::DbFile;
		let mut options = db_test.options(tmp.path(), 2);
		options.columns[col_nb as usize].btree_index = btree_index;
		options.columns[col_nb as usize].ref_counted = ref_counted;
		options.columns[col_nb as usize].preimage = ref_counted;
		assert!(!(ref_counted && db_test == EnableCommitPipelineStages::CommitOverlay));
		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		let iter = btree_index.then(|| db.iter(col_nb).unwrap());
		assert_eq!(iter.and_then(|mut i| i.next().unwrap()), None);

		db.commit(change_set.iter().map(|(k, v)| (col_nb, k.clone(), v.clone())))
			.unwrap();
		db_test.run_stages(&db);

		// Count the keys that should remain, ignoring reference counting.
		let mut keys = HashSet::new();
		let mut expected_count: u64 = 0;
		for (k, v) in change_set.iter() {
			if v.is_some() {
				if keys.insert(k) {
					expected_count += 1;
				}
			} else if keys.remove(k) {
				expected_count -= 1;
			}
		}
		if ref_counted {
			// Replay the change set with reference counting to compute the
			// expected remaining values.
			let mut state: BTreeMap<Vec<u8>, Option<(Vec<u8>, usize)>> = Default::default();
			for (k, v) in change_set.iter() {
				let mut remove = false;
				let mut insert = false;
				match state.get_mut(k) {
					Some(Some((_, counter))) =>
						if v.is_some() {
							*counter += 1;
						} else if *counter == 1 {
							remove = true;
						} else {
							*counter -= 1;
						},
					Some(None) | None =>
						if v.is_some() {
							insert = true;
						},
				}
				if insert {
					state.insert(k.clone(), v.clone().map(|v| (v, 1)));
				}
				if remove {
					state.remove(k);
				}
			}
			for (key, value) in state {
				assert_eq!(db.get(col_nb, &key).unwrap(), value.map(|v| v.0));
			}
		} else {
			let stats = db.stats();
			if let Some(stats) = stats.columns[col_nb as usize].as_ref() {
				assert_eq!(stats.total_values, expected_count);
			}

			let state: BTreeMap<Vec<u8>, Option<Vec<u8>>> =
				change_set.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
			for (key, value) in state.iter() {
				assert_eq!(&db.get(col_nb, key).unwrap(), value);
			}
		}
	}

	#[test]
	fn test_random() {
		fdlimit::raise_fd_limit().unwrap();
		for i in 0..100 {
			test_random_inner(60, 60, i);
		}
		for i in 0..50 {
			test_random_inner(20, 60, i);
		}
	}
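
	// Build a random change set: fresh random keys mapped to their own bytes,
	// plus a tail of deletions that usually target fresh random keys and
	// sometimes reuse a key inserted earlier.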
	fn test_random_inner(size: usize, key_size: usize, seed: u64) {
		use rand::{RngCore, SeedableRng};
		let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
		let mut data = Vec::<(Vec<u8>, Option<Vec<u8>>)>::new();
		for i in 0..size {
			let nb_delete: u32 = rng.next_u32();
			let nb_delete = (nb_delete as usize % size) / 2;
			let mut key = vec![0u8; key_size];
			rng.fill_bytes(&mut key[..]);
			let value = if i > size - nb_delete {
				let random_key = rng.next_u32();
				let random_key = (random_key % 4) > 0;
				if !random_key {
					key = data[i - size / 2].0.clone();
				}
				None
			} else {
				Some(key.clone())
			};
			let var_keysize = rng.next_u32();
			let var_keysize = var_keysize as usize % (key_size / 2);
			key.truncate(key_size - var_keysize);
			data.push((key, value));
		}
		test_basic(&data[..]);
	}

	#[test]
	fn test_simple() {
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key1".to_vec(), None),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key1".to_vec(), None),
			(b"key1".to_vec(), None),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key1".to_vec(), Some(b"value2".to_vec())),
		]);
		test_basic(&[(b"key1".to_vec(), Some(b"value1".to_vec()))]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
		]);
		test_basic(&[
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key5".to_vec(), Some(b"value5".to_vec())),
		]);
		test_basic(&[
			(b"key5".to_vec(), Some(b"value5".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
		]);
		test_basic(&[
			(b"key5".to_vec(), Some(b"value5".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key11".to_vec(), Some(b"value31".to_vec())),
			(b"key12".to_vec(), Some(b"value32".to_vec())),
		]);
		test_basic(&[
			(b"key5".to_vec(), Some(b"value5".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key51".to_vec(), Some(b"value31".to_vec())),
			(b"key52".to_vec(), Some(b"value32".to_vec())),
		]);
		test_basic(&[
			(b"key5".to_vec(), Some(b"value5".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key31".to_vec(), Some(b"value31".to_vec())),
			(b"key32".to_vec(), Some(b"value32".to_vec())),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value5".to_vec())),
			(b"key2".to_vec(), Some(b"value3".to_vec())),
			(b"key3".to_vec(), Some(b"value4".to_vec())),
			(b"key4".to_vec(), Some(b"value7".to_vec())),
			(b"key5".to_vec(), Some(b"value2".to_vec())),
			(b"key6".to_vec(), Some(b"value1".to_vec())),
			(b"key3".to_vec(), None),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value5".to_vec())),
			(b"key2".to_vec(), Some(b"value3".to_vec())),
			(b"key3".to_vec(), Some(b"value4".to_vec())),
			(b"key4".to_vec(), Some(b"value7".to_vec())),
			(b"key5".to_vec(), Some(b"value2".to_vec())),
			(b"key0".to_vec(), Some(b"value1".to_vec())),
			(b"key3".to_vec(), None),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value5".to_vec())),
			(b"key2".to_vec(), Some(b"value3".to_vec())),
			(b"key3".to_vec(), Some(b"value4".to_vec())),
			(b"key4".to_vec(), Some(b"value7".to_vec())),
			(b"key5".to_vec(), Some(b"value2".to_vec())),
			(b"key3".to_vec(), None),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value5".to_vec())),
			(b"key4".to_vec(), Some(b"value3".to_vec())),
			(b"key5".to_vec(), Some(b"value4".to_vec())),
			(b"key6".to_vec(), Some(b"value4".to_vec())),
			(b"key7".to_vec(), Some(b"value2".to_vec())),
			(b"key8".to_vec(), Some(b"value1".to_vec())),
			(b"key5".to_vec(), None),
		]);
		test_basic(&[
			(b"key1".to_vec(), Some(b"value5".to_vec())),
			(b"key4".to_vec(), Some(b"value3".to_vec())),
			(b"key5".to_vec(), Some(b"value4".to_vec())),
			(b"key7".to_vec(), Some(b"value2".to_vec())),
			(b"key8".to_vec(), Some(b"value1".to_vec())),
			(b"key3".to_vec(), None),
		]);
		test_basic(&[
			(b"key5".to_vec(), Some(b"value5".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key5".to_vec(), None),
			(b"key3".to_vec(), None),
		]);
		test_basic(&[
			(b"key5".to_vec(), Some(b"value5".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key5".to_vec(), None),
			(b"key3".to_vec(), None),
			(b"key2".to_vec(), None),
			(b"key4".to_vec(), None),
		]);
		test_basic(&[
			(b"key5".to_vec(), Some(b"value5".to_vec())),
			(b"key3".to_vec(), Some(b"value3".to_vec())),
			(b"key4".to_vec(), Some(b"value4".to_vec())),
			(b"key2".to_vec(), Some(b"value2".to_vec())),
			(b"key1".to_vec(), Some(b"value1".to_vec())),
			(b"key5".to_vec(), None),
			(b"key3".to_vec(), None),
			(b"key2".to_vec(), None),
			(b"key4".to_vec(), None),
			(b"key1".to_vec(), None),
		]);
		test_basic(&[
			([5u8; 250].to_vec(), Some(b"value5".to_vec())),
			([5u8; 200].to_vec(), Some(b"value3".to_vec())),
			([5u8; 100].to_vec(), Some(b"value4".to_vec())),
			([5u8; 150].to_vec(), Some(b"value2".to_vec())),
			([5u8; 101].to_vec(), Some(b"value1".to_vec())),
			([5u8; 250].to_vec(), None),
			([5u8; 101].to_vec(), None),
		]);
	}

	#[test]
	fn test_btree_iter() {
		let col_nb = 0;
		let mut data_start = Vec::new();
		for i in 0u8..100 {
			let mut key = b"key0".to_vec();
			key[3] = i;
			let mut value = b"val0".to_vec();
			value[3] = i;
			data_start.push((col_nb, key, Some(value)));
		}
		let mut data_change = Vec::new();
		for i in 0u8..100 {
			let mut key = b"key0".to_vec();
			if i % 2 == 0 {
				key[2] = i;
				let mut value = b"val0".to_vec();
				value[2] = i;
				data_change.push((col_nb, key, Some(value)));
			} else if i % 3 == 0 {
				key[3] = i;
				data_change.push((col_nb, key, None));
			} else {
				key[3] = i;
				let mut value = b"val0".to_vec();
				value[2] = i;
				data_change.push((col_nb, key, Some(value)));
			}
		}

		let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
			data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
		let mut end_state = start_state.clone();
		for (_c, k, v) in data_change.iter() {
			if let Some(v) = v {
				end_state.insert(k.clone(), v.clone());
			} else {
				end_state.remove(k);
			}
		}

		for stage in [
			EnableCommitPipelineStages::CommitOverlay,
			EnableCommitPipelineStages::LogOverlay,
			EnableCommitPipelineStages::DbFile,
			EnableCommitPipelineStages::Standard,
		] {
			for i in 0..10 {
				test_btree_iter_inner(
					stage,
					&data_start,
					&data_change,
					&start_state,
					&end_state,
					i * 5,
				);
			}
			let data_start = vec![
				(0, b"key1".to_vec(), Some(b"val1".to_vec())),
				(0, b"key3".to_vec(), Some(b"val3".to_vec())),
			];
			let data_change = vec![(0, b"key2".to_vec(), Some(b"val2".to_vec()))];
			let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
				data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
			let mut end_state = start_state.clone();
			for (_c, k, v) in data_change.iter() {
				if let Some(v) = v {
					end_state.insert(k.clone(), v.clone());
				} else {
					end_state.remove(k);
				}
			}
			test_btree_iter_inner(stage, &data_start, &data_change, &start_state, &end_state, 1);
		}
	}

	fn test_btree_iter_inner(
		db_test: EnableCommitPipelineStages,
		data_start: &[(u8, Vec<u8>, Option<Value>)],
		data_change: &[(u8, Vec<u8>, Option<Value>)],
		start_state: &BTreeMap<Vec<u8>, Vec<u8>>,
		end_state: &BTreeMap<Vec<u8>, Vec<u8>>,
		commit_at: usize,
	) {
		let tmp = tempdir().unwrap();
		let mut options = db_test.options(tmp.path(), 5);
		let col_nb = 0;
		options.columns[col_nb as usize].btree_index = true;
		let db = Db::open_inner(&options, OpeningMode::Create).unwrap();

		db.commit(data_start.iter().cloned()).unwrap();
		db_test.run_stages(&db);

		// Iterate part way through the start state, then commit the changes
		// and check that iteration continues against the new state.
		let mut iter = db.iter(col_nb).unwrap();
		let mut iter_state = start_state.iter();
		let mut last_key = Value::new();
		for _ in 0..commit_at {
			let next = iter.next().unwrap();
			if let Some((k, _)) = next.as_ref() {
				last_key = k.clone();
			}
			assert_eq!(iter_state.next(), next.as_ref().map(|(k, v)| (k, v)));
		}

		db.commit(data_change.iter().cloned()).unwrap();
		db_test.run_stages(&db);

		let mut iter_state = end_state.range(last_key.clone()..);
		for _ in commit_at..100 {
			let mut state_next = iter_state.next();
			if let Some((k, _v)) = state_next.as_ref() {
				if *k == &last_key {
					state_next = iter_state.next();
				}
			}
			let iter_next = iter.next().unwrap();
			assert_eq!(state_next, iter_next.as_ref().map(|(k, v)| (k, v)));
		}
		let mut iter_state_rev = end_state.iter().rev();
		let mut iter = db.iter(col_nb).unwrap();
		iter.seek_to_last().unwrap();
		for _ in 0..100 {
			let next = iter.prev().unwrap();
			assert_eq!(iter_state_rev.next(), next.as_ref().map(|(k, v)| (k, v)));
		}
	}

	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_recover_from_log_on_error() {
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.always_flush = true;
		options.with_background_thread = false;

		{
			// Write two commits, then make enacting the second one fail
			// part-way through by limiting the allowed I/O operations.
			let db = Db::open_or_create(&options).unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			crate::set_number_of_allowed_io_operations(4);

			let err = db.enact_logs();
			assert!(err.is_err());
			db.inner.store_err(err);
			crate::set_number_of_allowed_io_operations(usize::MAX);
		}

		{
			// On reopen, the log should be replayed and both commits visible.
			let db = Db::open(&options).unwrap();
			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
			assert_eq!(db.get(0, &[1]).unwrap(), Some(vec![1]));
		}
	}

	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_partial_log_recovery() {
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.columns[0].btree_index = true;
		options.always_flush = true;
		options.with_background_thread = false;

		{
			// Fail while writing the second commit to the log.
			let db = Db::open_or_create(&options).unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![0], Some(vec![0]))]).unwrap();
			db.process_commits().unwrap();
			db.commit::<_, Vec<u8>>(vec![(0, vec![1], Some(vec![1]))]).unwrap();
			crate::set_number_of_allowed_io_operations(4);
			assert!(db.process_commits().is_err());
			crate::set_number_of_allowed_io_operations(usize::MAX);
			db.flush_logs().unwrap();
		}

		{
			// The first commit should survive the partial log.
			let db = Db::open(&options).unwrap();
			assert_eq!(db.get(0, &[0]).unwrap(), Some(vec![0]));
		}

		{
			// A second reopen should also replay cleanly.
			let db = Db::open(&options).unwrap();
			assert!(db.get(0, &[0]).unwrap().is_some());
		}
	}

	#[cfg(feature = "instrumentation")]
	#[test]
	fn test_continue_reindex() {
		let _ = env_logger::try_init();
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 1);
		options.columns[0].preimage = true;
		options.columns[0].uniform = true;
		options.always_flush = true;
		options.with_background_thread = false;
		options.salt = Some(Default::default());

		{
			let db = Db::open_or_create(&options).unwrap();
			// Insert enough entries to trigger a reindex from 16 to 17 bits.
			let commit: Vec<_> = (0..65u32)
				.map(|index| {
					let mut key = [0u8; 32];
					key[2] = (index as u8) << 1;
					(0, key.to_vec(), Some(vec![index as u8]))
				})
				.collect();
			db.commit(commit).unwrap();

			db.process_commits().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			// Back up the original index file and restore it after the
			// reindex, so the database reopens with the reindex unfinished.
			std::fs::copy(tmp.path().join("index_00_16"), tmp.path().join("index_00_16.bak"))
				.unwrap();
			db.process_reindex().unwrap();
			db.flush_logs().unwrap();
			db.enact_logs().unwrap();
			db.clean_logs().unwrap();
			std::fs::rename(tmp.path().join("index_00_16.bak"), tmp.path().join("index_00_16"))
				.unwrap();
		}

		{
			// On reopen, the reindex should resume and migrate all entries.
			let db = Db::open(&options).unwrap();
			db.process_reindex().unwrap();
			let mut entries = 0;
			db.iter_column_while(0, |_| {
				entries += 1;
				true
			})
			.unwrap();

			assert_eq!(entries, 65);
			assert_eq!(db.inner.columns[0].index_bits(), Some(17));
		}
	}

	#[test]
	fn test_remove_column() {
		let tmp = tempdir().unwrap();
		let db_test_file = EnableCommitPipelineStages::DbFile;
		let mut options_db_files = db_test_file.options(tmp.path(), 2);
		options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
		let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
		options_std.salt = options_db_files.salt.clone();

		let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();

		let payload: Vec<(u8, _, _)> = (0u16..100)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();

		db_test_file.run_stages(&db);
		drop(db);

		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
		for (col, key, value) in payload.iter() {
			assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
		}
		drop(db);

		// Resetting the column should drop all of its data.
		Db::reset_column(&mut options_db_files, 1, None).unwrap();

		let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
		for (col, key, _value) in payload.iter() {
			assert_eq!(db.get(*col, key).unwrap(), None);
		}

		let payload: Vec<(u8, _, _)> = (0u16..10)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();

		db_test_file.run_stages(&db);
		drop(db);

		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
		let payload: Vec<(u8, _, _)> = (10u16..100)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();
		// Column 1 is not a btree column yet, so iteration must fail.
		assert!(db.iter(1).is_err());

		drop(db);

		// Reset the column again, this time re-creating it as a btree column.
		let mut col_option = options_std.columns[1].clone();
		col_option.btree_index = true;
		Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();

		let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
		let payload: Vec<(u8, _, _)> = (0u16..10)
			.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
			.collect();

		db.commit(payload.clone()).unwrap();
		assert!(db.iter(1).is_ok());
	}
}