// bitcoinleveldb_db/db_impl.rs
1crate::ix!();
2
3//-------------------------------------------[.cpp/bitcoin/src/leveldb/db/db_impl.h]
4//-------------------------------------------[.cpp/bitcoin/src/leveldb/db/db_impl.cc]
5
6/**
7 | Information kept for every waiting
8 | writer
9 |
10 */
11pub struct DBImplWriter {
12 status: Status,
13 batch: *mut WriteBatch,
14 sync: bool,
15 done: bool,
16 cv: Condvar,
17}
18
impl DBImplWriter {

    /// Construct a writer bound to the DB mutex `mu`.
    ///
    /// Unported: the reference C++ initializer (kept below) starts with a
    /// null `batch`, `sync` and `done` both false, and a condition variable
    /// tied to `mu`.
    pub fn new(mu: *mut parking_lot::RawMutex) -> Self {

        todo!();
        /*
        : batch(nullptr), sync(false), done(false), cv(mu)
        */
    }
}
29
30///------------------------------
31pub struct DBImpl {
32
33 /**
34 | Constant after construction
35 |
36 */
37 env: Box<dyn Env>,
38
39 internal_comparator: InternalKeyComparator,
40 internal_filter_policy: InternalFilterPolicy,
41
42 /**
43 | options_.comparator == &internal_comparator_
44 |
45 */
46 options: Options,
47
48 owns_info_log: bool,
49 owns_cache: bool,
50 dbname: String,
51
52 /**
53 | table_cache_ provides its own synchronization
54 |
55 */
56 table_cache: *const TableCache,
57
58 /**
59 | Lock over the persistent DB state.
60 |
61 | Non-null iff successfully acquired.
62 |
63 */
64 db_lock: Rc<RefCell<dyn FileLock>>,
65
66 /**
67 | State below is protected by mutex_
68 |
69 */
70 mutex: Mutex<db_impl::Inner>,
71
72 shutting_down: AtomicBool,
73
74 mem: *mut MemTable,
75
76 /**
77 | So bg thread can detect non-null imm_
78 |
79 */
80 has_imm: AtomicBool,
81
82 logfile: Rc<RefCell<dyn WritableFile>>,
83 log: *mut LogWriter,
84}
85
impl DB for DBImpl {

    // NOTE(review): intentionally empty here — presumably the DB operations
    // are supplied by the separate capability-trait impls in this file
    // (e.g. GetApproximateSizes, CompactRange) or by trait defaults;
    // confirm against the `DB` trait definition.
}
89
impl GetApproximateSizes for DBImpl {

    /// Estimate the on-disk size of each of the `n` key ranges in `range`,
    /// writing one estimate per range into `sizes`.
    ///
    /// Unported: the reference C++ (kept below) takes the DB mutex, refs the
    /// current Version, and for each range subtracts the approximate file
    /// offsets of the two endpoints (clamping negative differences to 0).
    fn get_approximate_sizes(&mut self,
        range: *const db::Range,
        n: i32,
        sizes: *mut u64) {

        todo!();
        /*
        // TODO(opt): better implementation
        MutexLock l(&mutex_);
        Version* v = versions_->current();
        v->Ref();

        for (int i = 0; i < n; i++) {
          // Convert user_key into a corresponding internal key.
          InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
          InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
          uint64_t start = versions_->ApproximateOffsetOf(v, k1);
          uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
          sizes[i] = (limit >= start ? limit - start : 0);
        }

        v->Unref();
        */
    }
}
117
mod db_impl {

    use super::*;

    /// Mutable DB state guarded by `DBImpl::mutex` (the fields the C++
    /// original annotated as "protected by mutex_").
    pub struct Inner {

        /// Signalled when background compaction work finishes (see the
        /// reference C++ in `background_call` / `record_background_error`).
        background_work_finished_signal: Condvar,

        /**
          | Memtable being compacted
          |
          */
        imm: *mut MemTable,

        /// File number of the current write-ahead log.
        logfile_number: u64,

        /**
          | For sampling.
          |
          */
        seed: u32,

        /**
          | Queue of writers.
          |
          */
        writers: VecDeque<*mut DBImplWriter>,

        /// Scratch batch reused when grouping writers (allocated in the
        /// reference C++ constructor as `new WriteBatch`).
        tmp_batch: *mut WriteBatch,

        /// Live snapshots; oldest bounds `smallest_snapshot` in compaction.
        snapshots: SnapshotList,

        /**
          | Set of table files to protect from deletion
          | because they are part of ongoing compactions.
          |
          */
        pending_outputs: HashSet<u64>,

        /**
          | Has a background compaction been scheduled
          | or is running?
          |
          */
        background_compaction_scheduled: bool,

        /// Non-null while a manual (test-initiated) compaction is requested.
        manual_compaction: *mut ManualCompaction,

        /// Owning pointer to the VersionSet (deleted in the C++ destructor).
        versions: *const VersionSet,

        /**
          | Have we encountered a background error
          | in paranoid mode?
          |
          */
        bg_error: Status,

        /// Per-level compaction statistics.
        stats: [CompactionStats; NUM_LEVELS],
    }
}
173
impl DBImpl {

    /* ------- Implementations of the DB interface ------- */

    /*
    | Extra methods (for testing) that are
    | not in the public DB interface
    |
    */

    /// Return the user-supplied comparator wrapped by
    /// `internal_comparator` (unported; reference C++ below).
    pub fn user_comparator(&self) -> Box<dyn SliceComparator> {

        todo!();
        /*
        return internal_comparator_.user_comparator();
        */
    }
}
192
193pub const NUM_NON_TABLE_CACHE_FILES: i32 = 10;
194
195/**
196 | Fix user-supplied options to be reasonable
197 |
198 */
199pub fn clip_to_range<T, V>(
200 ptr: *mut T,
201 minvalue: V,
202 maxvalue: V) {
203
204 todo!();
205 /*
206 if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
207 if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
208 */
209}
210
211/**
212 | Sanitize db options. The caller should
213 | delete result.info_log if it is not
214 | equal to src.info_log.
215 |
216 */
217pub fn sanitize_options(
218 dbname: &String,
219 icmp: *const InternalKeyComparator,
220 ipolicy: *const InternalFilterPolicy,
221 src: &Options) -> Options {
222
223 todo!();
224 /*
225 Options result = src;
226 result.comparator = icmp;
227 result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
228 ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
229 ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
230 ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
231 ClipToRange(&result.block_size, 1 << 10, 4 << 20);
232 if (result.info_log == nullptr) {
233 // Open a log file in the same directory as the db
234 src.env->CreateDir(dbname); // In case it does not exist
235 src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
236 Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
237 if (!s.ok()) {
238 // No place suitable for logging
239 result.info_log = nullptr;
240 }
241 }
242 if (result.block_cache == nullptr) {
243 result.block_cache = NewLRUCache(8 << 20);
244 }
245 return result;
246 */
247}
248
/// Number of file handles the TableCache may use: `max_open_files`
/// minus the `NUM_NON_TABLE_CACHE_FILES` reserve (unported; C++ below).
pub fn table_cache_size(sanitized_options: &Options) -> i32 {

    todo!();
    /*
    // Reserve ten files or so for other uses and give the rest to TableCache.
    return sanitized_options.max_open_files - kNumNonTableCacheFiles;
    */
}
257
impl DBImpl {

    /// Construct a DBImpl for database `dbname` from the caller's options.
    ///
    /// Unported: the reference C++ member-initializer list (kept below)
    /// sanitizes the options via `SanitizeOptions`, records whether the
    /// info log / block cache are owned (so `Drop` knows what to free),
    /// allocates the TableCache, scratch WriteBatch and VersionSet, and
    /// zero/null-initializes all runtime state.
    pub fn new(
        raw_options: &Options,
        dbname: &String) -> Self {

        todo!();
        /*


        : env_(raw_options.env),
          internal_comparator_(raw_options.comparator),
          internal_filter_policy_(raw_options.filter_policy),
          options_(SanitizeOptions(dbname, &internal_comparator_,
                                   &internal_filter_policy_, raw_options)),
          owns_info_log_(options_.info_log != raw_options.info_log),
          owns_cache_(options_.block_cache != raw_options.block_cache),
          dbname_(dbname),
          table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
          db_lock_(nullptr),
          shutting_down_(false),
          background_work_finished_signal_(&mutex_),
          mem_(nullptr),
          imm_(nullptr),
          has_imm_(false),
          logfile_(nullptr),
          logfile_number_(0),
          log_(nullptr),
          seed_(0),
          tmp_batch_(new WriteBatch),
          background_compaction_scheduled_(false),
          manual_compaction_(nullptr),
          versions_(new VersionSet(dbname_, &options_, table_cache_,
                                   &internal_comparator_))
        */
    }
}
295
impl Drop for DBImpl {

    /// Shut the database down.
    ///
    /// Unported: the reference C++ destructor (kept below) signals
    /// `shutting_down_`, waits for any scheduled background compaction to
    /// finish, releases the DB file lock, and frees everything the
    /// constructor allocated (including the info log / block cache when
    /// `owns_info_log_` / `owns_cache_` are set).
    fn drop(&mut self) {
        todo!();
        /*
        // Wait for background work to finish.
        mutex_.Lock();
        shutting_down_.store(true, std::memory_order_release);
        while (background_compaction_scheduled_) {
          background_work_finished_signal_.Wait();
        }
        mutex_.Unlock();

        if (db_lock_ != nullptr) {
          env_->UnlockFile(db_lock_);
        }

        delete versions_;
        if (mem_ != nullptr) mem_->Unref();
        if (imm_ != nullptr) imm_->Unref();
        delete tmp_batch_;
        delete log_;
        delete logfile_;
        delete table_cache_;

        if (owns_info_log_) {
          delete options_.info_log;
        }
        if (owns_cache_) {
          delete options_.block_cache;
        }
        */
    }
}
329
impl CompactRange for DBImpl {

    /// Compact the key range [*begin, *end] across all levels that contain
    /// overlapping files (null endpoints mean "unbounded").
    ///
    /// Unported: the reference C++ (kept below) finds the deepest level with
    /// overlap, compacts the memtable first, then compacts each level in
    /// turn via the TEST_ helpers.
    fn compact_range(&mut self,
        begin: *const Slice,
        end: *const Slice) {

        todo!();
        /*
        int max_level_with_files = 1;
        {
          MutexLock l(&mutex_);
          Version* base = versions_->current();
          for (int level = 1; level < config::kNumLevels; level++) {
            if (base->OverlapInLevel(level, begin, end)) {
              max_level_with_files = level;
            }
          }
        }
        TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
        for (int level = 0; level < max_level_with_files; level++) {
          TEST_CompactRange(level, begin, end);
        }
        */
    }
}
355
356impl DBImpl {
357
    /// Create a brand-new database: write an initial MANIFEST (descriptor)
    /// and point CURRENT at it.
    ///
    /// Unported: the reference C++ (kept below) encodes a fresh VersionEdit
    /// into descriptor file #1 and installs it via SetCurrentFile, deleting
    /// the manifest again on failure.
    pub fn newdb(&mut self) -> crate::Status {

        todo!();
        /*
        VersionEdit new_db;
        new_db.SetComparatorName(user_comparator()->Name());
        new_db.SetLogNumber(0);
        new_db.SetNextFile(2);
        new_db.SetLastSequence(0);

        const std::string manifest = DescriptorFileName(dbname_, 1);
        WritableFile* file;
        Status s = env_->NewWritableFile(manifest, &file);
        if (!s.ok()) {
          return s;
        }
        {
          LogWriter log(file);
          std::string record;
          new_db.EncodeTo(&record);
          s = log.AddRecord(record);
          if (s.ok()) {
            s = file->Close();
          }
        }
        delete file;
        if (s.ok()) {
          // Make "CURRENT" file that points to the new manifest file.
          s = SetCurrentFile(env_, dbname_, 1);
        } else {
          env_->DeleteFile(manifest);
        }
        return s;
        */
    }
393
    /// Downgrade `*s` to OK (after logging it) unless paranoid checks are
    /// enabled, in which case the error is left untouched.
    /// Unported; reference C++ below.
    pub fn maybe_ignore_error(&self, s: *mut Status) {

        todo!();
        /*
        if (s->ok() || options_.paranoid_checks) {
          // No change needed
        } else {
          Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
          *s = Status::OK();
        }
        */
    }
406
407 /**
408 | Delete any unneeded files and stale
409 | in-memory entries.
410 |
411 */
412 #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
413 pub fn delete_obsolete_files(&mut self) {
414
415 todo!();
416 /*
417 mutex_.AssertHeld();
418
419 if (!bg_error_.ok()) {
420 // After a background error, we don't know whether a new version may
421 // or may not have been committed, so we cannot safely garbage collect.
422 return;
423 }
424
425 // Make a set of all of the live files
426 std::set<uint64_t> live = pending_outputs_;
427 versions_->AddLiveFiles(&live);
428
429 std::vector<std::string> filenames;
430 env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
431 uint64_t number;
432 FileType type;
433 std::vector<std::string> files_to_delete;
434 for (std::string& filename : filenames) {
435 if (ParseFileName(filename, &number, &type)) {
436 bool keep = true;
437 switch (type) {
438 case kLogFile:
439 keep = ((number >= versions_->LogNumber()) ||
440 (number == versions_->PrevLogNumber()));
441 break;
442 case kDescriptorFile:
443 // Keep my manifest file, and any newer incarnations'
444 // (in case there is a race that allows other incarnations)
445 keep = (number >= versions_->ManifestFileNumber());
446 break;
447 case kTableFile:
448 keep = (live.find(number) != live.end());
449 break;
450 case kTempFile:
451 // Any temp files that are currently being written to must
452 // be recorded in pending_outputs_, which is inserted into "live"
453 keep = (live.find(number) != live.end());
454 break;
455 case kCurrentFile:
456 case kDBLockFile:
457 case kInfoLogFile:
458 keep = true;
459 break;
460 }
461
462 if (!keep) {
463 files_to_delete.push_back(std::move(filename));
464 if (type == kTableFile) {
465 table_cache_->Evict(number);
466 }
467 Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
468 static_cast<unsigned long long>(number));
469 }
470 }
471 }
472
473 // While deleting all files unblock other threads. All files being deleted
474 // have unique names which will not collide with newly created files and
475 // are therefore safe to delete while allowing other threads to proceed.
476 mutex_.Unlock();
477 for (const std::string& filename : files_to_delete) {
478 env_->DeleteFile(dbname_ + "/" + filename);
479 }
480 mutex_.Lock();
481 */
482 }
483
484 /**
485 | Recover the descriptor from persistent
486 | storage. May do a significant amount of work
487 | to recover recently logged updates. Any
488 | changes to be made to the descriptor are
489 | added to *edit.
490 */
491 #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
492 pub fn recover(&mut self,
493 edit: *mut VersionEdit,
494 save_manifest: *mut bool) -> crate::Status {
495
496 todo!();
497 /*
498 mutex_.AssertHeld();
499
500 // Ignore error from CreateDir since the creation of the DB is
501 // committed only when the descriptor is created, and this directory
502 // may already exist from a previous failed creation attempt.
503 env_->CreateDir(dbname_);
504 assert(db_lock_ == nullptr);
505 Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
506 if (!s.ok()) {
507 return s;
508 }
509
510 if (!env_->FileExists(CurrentFileName(dbname_))) {
511 if (options_.create_if_missing) {
512 s = NewDB();
513 if (!s.ok()) {
514 return s;
515 }
516 } else {
517 return Status::InvalidArgument(
518 dbname_, "does not exist (create_if_missing is false)");
519 }
520 } else {
521 if (options_.error_if_exists) {
522 return Status::InvalidArgument(dbname_,
523 "exists (error_if_exists is true)");
524 }
525 }
526
527 s = versions_->Recover(save_manifest);
528 if (!s.ok()) {
529 return s;
530 }
531 SequenceNumber max_sequence(0);
532
533 // Recover from all newer log files than the ones named in the
534 // descriptor (new log files may have been added by the previous
535 // incarnation without registering them in the descriptor).
536 //
537 // Note that PrevLogNumber() is no longer used, but we pay
538 // attention to it in case we are recovering a database
539 // produced by an older version of leveldb.
540 const uint64_t min_log = versions_->LogNumber();
541 const uint64_t prev_log = versions_->PrevLogNumber();
542 std::vector<std::string> filenames;
543 s = env_->GetChildren(dbname_, &filenames);
544 if (!s.ok()) {
545 return s;
546 }
547 std::set<uint64_t> expected;
548 versions_->AddLiveFiles(&expected);
549 uint64_t number;
550 FileType type;
551 std::vector<uint64_t> logs;
552 for (size_t i = 0; i < filenames.size(); i++) {
553 if (ParseFileName(filenames[i], &number, &type)) {
554 expected.erase(number);
555 if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
556 logs.push_back(number);
557 }
558 }
559 if (!expected.empty()) {
560 char buf[50];
561 snprintf(buf, sizeof(buf), "%d missing files; e.g.",
562 static_cast<int>(expected.size()));
563 return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
564 }
565
566 // Recover in the order in which the logs were generated
567 std::sort(logs.begin(), logs.end());
568 for (size_t i = 0; i < logs.size(); i++) {
569 s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
570 &max_sequence);
571 if (!s.ok()) {
572 return s;
573 }
574
575 // The previous incarnation may not have written any MANIFEST
576 // records after allocating this log number. So we manually
577 // update the file number allocation counter in VersionSet.
578 versions_->MarkFileNumberUsed(logs[i]);
579 }
580
581 if (versions_->LastSequence() < max_sequence) {
582 versions_->SetLastSequence(max_sequence);
583 }
584
585 return Status::OK();
586 */
587 }
588
    /// Replay one write-ahead log file into a fresh memtable, flushing to a
    /// level-0 table whenever the memtable fills, and track the highest
    /// sequence number seen in `*max_sequence`.
    ///
    /// Unported: the reference C++ (kept below) also attempts to reuse the
    /// last log file (when `reuse_logs` is set, this is the last log, and
    /// no compactions were triggered) instead of flushing it.
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn recover_log_file(&mut self,
        log_number: u64,
        last_log: bool,
        save_manifest: *mut bool,
        edit: *mut VersionEdit,
        max_sequence: *mut SequenceNumber) -> crate::Status {

        todo!();
        /*
        struct LogReporter : public LogReader::Reporter {
          Env* env;
          Logger* info_log;
          const char* fname;
          Status* status;  // null if options_.paranoid_checks==false
          c_void Corruption(size_t bytes, const Status& s) override {
            Log(info_log, "%s%s: dropping %d bytes; %s",
                (this->status == nullptr ? "(ignoring error) " : ""), fname,
                static_cast<int>(bytes), s.ToString().c_str());
            if (this->status != nullptr && this->status->ok()) *this->status = s;
          }
        };

        mutex_.AssertHeld();

        // Open the log file
        std::string fname = LogFileName(dbname_, log_number);
        SequentialFile* file;
        Status status = env_->NewSequentialFile(fname, &file);
        if (!status.ok()) {
          MaybeIgnoreError(&status);
          return status;
        }

        // Create the log reader.
        LogReporter reporter;
        reporter.env = env_;
        reporter.info_log = options_.info_log;
        reporter.fname = fname.c_str();
        reporter.status = (options_.paranoid_checks ? &status : nullptr);
        // We intentionally make LogReader do checksumming even if
        // paranoid_checks==false so that corruptions cause entire commits
        // to be skipped instead of propagating bad information (like overly
        // large sequence numbers).
        LogReader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
        Log(options_.info_log, "Recovering log #%llu",
            (unsigned long long)log_number);

        // Read all the records and add to a memtable
        std::string scratch;
        Slice record;
        WriteBatch batch;
        int compactions = 0;
        MemTable* mem = nullptr;
        while (reader.ReadRecord(&record, &scratch) && status.ok()) {
          if (record.size() < 12) {
            reporter.Corruption(record.size(),
                                Status::Corruption("log record too small", fname));
            continue;
          }
          WriteBatchInternal::SetContents(&batch, record);

          if (mem == nullptr) {
            mem = new MemTable(internal_comparator_);
            mem->Ref();
          }
          status = WriteBatchInternal::InsertInto(&batch, mem);
          MaybeIgnoreError(&status);
          if (!status.ok()) {
            break;
          }
          const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                          WriteBatchInternal::Count(&batch) - 1;
          if (last_seq > *max_sequence) {
            *max_sequence = last_seq;
          }

          if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
            compactions++;
            *save_manifest = true;
            status = WriteLevel0Table(mem, edit, nullptr);
            mem->Unref();
            mem = nullptr;
            if (!status.ok()) {
              // Reflect errors immediately so that conditions like full
              // file-systems cause the DB::Open() to fail.
              break;
            }
          }
        }

        delete file;

        // See if we should keep reusing the last log file.
        if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
          assert(logfile_ == nullptr);
          assert(log_ == nullptr);
          assert(mem_ == nullptr);
          uint64_t lfile_size;
          if (env_->GetFileSize(fname, &lfile_size).ok() &&
              env_->NewAppendableFile(fname, &logfile_).ok()) {
            Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
            log_ = new LogWriter(logfile_, lfile_size);
            logfile_number_ = log_number;
            if (mem != nullptr) {
              mem_ = mem;
              mem = nullptr;
            } else {
              // mem can be nullptr if lognum exists but was empty.
              mem_ = new MemTable(internal_comparator_);
              mem_->Ref();
            }
          }
        }

        if (mem != nullptr) {
          // mem did not get reused; compact it.
          if (status.ok()) {
            *save_manifest = true;
            status = WriteLevel0Table(mem, edit, nullptr);
          }
          mem->Unref();
        }

        return status;
        */
    }
716
    /// Dump the contents of memtable `mem` into a new table file and record
    /// it in `*edit` (choosing a level via `base` when non-null).
    ///
    /// Unported: the reference C++ (kept below) protects the new file number
    /// via `pending_outputs_`, drops the mutex while building the table, and
    /// accounts the work in per-level compaction stats.
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn write_level_0table(&mut self,
        mem: *mut MemTable,
        edit: *mut VersionEdit,
        base: *mut Version) -> crate::Status {

        todo!();
        /*
        mutex_.AssertHeld();
        const uint64_t start_micros = env_->NowMicros();
        FileMetaData meta;
        meta.number = versions_->NewFileNumber();
        pending_outputs_.insert(meta.number);
        Iterator* iter = mem->NewIterator();
        Log(options_.info_log, "Level-0 table #%llu: started",
            (unsigned long long)meta.number);

        Status s;
        {
          mutex_.Unlock();
          s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
          mutex_.Lock();
        }

        Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
            (unsigned long long)meta.number, (unsigned long long)meta.file_size,
            s.ToString().c_str());
        delete iter;
        pending_outputs_.erase(meta.number);

        // Note that if file_size is zero, the file has been deleted and
        // should not be added to the manifest.
        int level = 0;
        if (s.ok() && meta.file_size > 0) {
          const Slice min_user_key = meta.smallest.user_key();
          const Slice max_user_key = meta.largest.user_key();
          if (base != nullptr) {
            level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
          }
          edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
                        meta.largest);
        }

        CompactionStats stats;
        stats.micros = env_->NowMicros() - start_micros;
        stats.bytes_written = meta.file_size;
        stats_[level].Add(stats);
        return s;
        */
    }
767
768 /**
769 | Compact the in-memory write buffer to disk.
770 | Switches to a new log-file/memtable and
771 | writes a new descriptor iff successful.
772 |
773 | Errors are recorded in bg_error_.
774 */
775 #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
776 pub fn compact_mem_table(&mut self) {
777
778 todo!();
779 /*
780 mutex_.AssertHeld();
781 assert(imm_ != nullptr);
782
783 // Save the contents of the memtable as a new Table
784 VersionEdit edit;
785 Version* base = versions_->current();
786 base->Ref();
787 Status s = WriteLevel0Table(imm_, &edit, base);
788 base->Unref();
789
790 if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
791 s = Status::IOError("Deleting DB during memtable compaction");
792 }
793
794 // Replace immutable memtable with the generated Table
795 if (s.ok()) {
796 edit.SetPrevLogNumber(0);
797 edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
798 s = versions_->LogAndApply(&edit, &mutex_);
799 }
800
801 if (s.ok()) {
802 // Commit to the new state
803 imm_->Unref();
804 imm_ = nullptr;
805 has_imm_.store(false, std::memory_order_release);
806 DeleteObsoleteFiles();
807 } else {
808 RecordBackgroundError(s);
809 }
810 */
811 }
812
813 /**
814 | Compact any files in the named level
815 | that overlap [*begin,*end]
816 |
817 */
818 pub fn test_compact_range(&mut self,
819 level: i32,
820 begin: *const Slice,
821 end: *const Slice) {
822
823 todo!();
824 /*
825 assert(level >= 0);
826 assert(level + 1 < config::kNumLevels);
827
828 InternalKey begin_storage, end_storage;
829
830 ManualCompaction manual;
831 manual.level = level;
832 manual.done = false;
833 if (begin == nullptr) {
834 manual.begin = nullptr;
835 } else {
836 begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
837 manual.begin = &begin_storage;
838 }
839 if (end == nullptr) {
840 manual.end = nullptr;
841 } else {
842 end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
843 manual.end = &end_storage;
844 }
845
846 MutexLock l(&mutex_);
847 while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
848 bg_error_.ok()) {
849 if (manual_compaction_ == nullptr) { // Idle
850 manual_compaction_ = &manual;
851 MaybeScheduleCompaction();
852 } else { // Running either my compaction or another compaction.
853 background_work_finished_signal_.Wait();
854 }
855 }
856 if (manual_compaction_ == &manual) {
857 // Cancel my manual compaction since we aborted early for some reason.
858 manual_compaction_ = nullptr;
859 }
860 */
861 }
862
863 /**
864 | Force current memtable contents to
865 | be compacted.
866 |
867 */
868 pub fn test_compact_mem_table(&mut self) -> crate::Status {
869
870 todo!();
871 /*
872 // nullptr batch means just wait for earlier writes to be done
873 Status s = Write(WriteOptions(), nullptr);
874 if (s.ok()) {
875 // Wait until the compaction completes
876 MutexLock l(&mutex_);
877 while (imm_ != nullptr && bg_error_.ok()) {
878 background_work_finished_signal_.Wait();
879 }
880 if (imm_ != nullptr) {
881 s = bg_error_;
882 }
883 }
884 return s;
885 */
886 }
887
    /// Record the first background error in `bg_error_` and wake anyone
    /// waiting on background work (later errors are ignored).
    /// Unported; reference C++ below.
    pub fn record_background_error(&mut self, s: &Status) {

        todo!();
        /*
        mutex_.AssertHeld();
        if (bg_error_.ok()) {
          bg_error_ = s;
          background_work_finished_signal_.SignalAll();
        }
        */
    }
899
    /// Schedule a background compaction if one is needed and none is
    /// already scheduled (skipped during shutdown or after a background
    /// error). Unported; reference C++ below.
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn maybe_schedule_compaction(&mut self) {

        todo!();
        /*
        mutex_.AssertHeld();
        if (background_compaction_scheduled_) {
          // Already scheduled
        } else if (shutting_down_.load(std::memory_order_acquire)) {
          // DB is being deleted; no more background compactions
        } else if (!bg_error_.ok()) {
          // Already got an error; no more changes
        } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
                   !versions_->NeedsCompaction()) {
          // No work to be done
        } else {
          background_compaction_scheduled_ = true;
          env_->Schedule(&DBImpl::BGWork, this);
        }
        */
    }
921
    /// Env::Schedule trampoline: casts `db` back to a DBImpl and runs
    /// `background_call`. Unported; reference C++ below.
    pub fn bg_work(&mut self, db: *mut c_void) {

        todo!();
        /*
        reinterpret_cast<DBImpl*>(db)->BackgroundCall();
        */
    }
929
    /// Body of the scheduled background task: run one compaction (unless
    /// shutting down or errored), clear the scheduled flag, possibly
    /// reschedule, and signal waiters. Unported; reference C++ below.
    pub fn background_call(&mut self) {

        todo!();
        /*
        MutexLock l(&mutex_);
        assert(background_compaction_scheduled_);
        if (shutting_down_.load(std::memory_order_acquire)) {
          // No more background work when shutting down.
        } else if (!bg_error_.ok()) {
          // No more background work after a background error.
        } else {
          BackgroundCompaction();
        }

        background_compaction_scheduled_ = false;

        // Previous compaction may have produced too many files in a level,
        // so reschedule another compaction if needed.
        MaybeScheduleCompaction();
        background_work_finished_signal_.SignalAll();
        */
    }
952
    /// Perform one unit of background compaction work: flush `imm_` if
    /// present, otherwise run a manual or picked compaction (taking the
    /// trivial-move shortcut when possible) and record any failure.
    /// Unported; reference C++ below.
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn background_compaction(&mut self) {

        todo!();
        /*
        mutex_.AssertHeld();

        if (imm_ != nullptr) {
          CompactMemTable();
          return;
        }

        Compaction* c;
        bool is_manual = (manual_compaction_ != nullptr);
        InternalKey manual_end;
        if (is_manual) {
          ManualCompaction* m = manual_compaction_;
          c = versions_->CompactRange(m->level, m->begin, m->end);
          m->done = (c == nullptr);
          if (c != nullptr) {
            manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
          }
          Log(options_.info_log,
              "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
              m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
              (m->end ? m->end->DebugString().c_str() : "(end)"),
              (m->done ? "(end)" : manual_end.DebugString().c_str()));
        } else {
          c = versions_->PickCompaction();
        }

        Status status;
        if (c == nullptr) {
          // Nothing to do
        } else if (!is_manual && c->IsTrivialMove()) {
          // Move file to next level
          assert(c->num_input_files(0) == 1);
          FileMetaData* f = c->input(0, 0);
          c->edit()->DeleteFile(c->level(), f->number);
          c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                             f->largest);
          status = versions_->LogAndApply(c->edit(), &mutex_);
          if (!status.ok()) {
            RecordBackgroundError(status);
          }
          VersionSet::LevelSummaryStorage tmp;
          Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
              static_cast<unsigned long long>(f->number), c->level() + 1,
              static_cast<unsigned long long>(f->file_size),
              status.ToString().c_str(), versions_->LevelSummary(&tmp));
        } else {
          CompactionState* compact = new CompactionState(c);
          status = DoCompactionWork(compact);
          if (!status.ok()) {
            RecordBackgroundError(status);
          }
          CleanupCompaction(compact);
          c->ReleaseInputs();
          DeleteObsoleteFiles();
        }
        delete c;

        if (status.ok()) {
          // Done
        } else if (shutting_down_.load(std::memory_order_acquire)) {
          // Ignore compaction errors found during shutting down
        } else {
          Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
        }

        if (is_manual) {
          ManualCompaction* m = manual_compaction_;
          if (!status.ok()) {
            m->done = true;
          }
          if (!m->done) {
            // We only compacted part of the requested range. Update *m
            // to the range that is left to be compacted.
            m->tmp_storage = manual_end;
            m->begin = &m->tmp_storage;
          }
          manual_compaction_ = nullptr;
        }
        */
    }
1038
    /// Dispose of a (possibly aborted) CompactionState: abandon any open
    /// builder, free the output file, and release the pending-output
    /// reservations. Unported; reference C++ below.
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn cleanup_compaction(&mut self, compact: *mut CompactionState) {

        todo!();
        /*
        mutex_.AssertHeld();
        if (compact->builder != nullptr) {
          // May happen if we get a shutdown call in the middle of compaction
          compact->builder->Abandon();
          delete compact->builder;
        } else {
          assert(compact->outfile == nullptr);
        }
        delete compact->outfile;
        for (size_t i = 0; i < compact->outputs.size(); i++) {
          const CompactionState::Output& out = compact->outputs[i];
          pending_outputs_.erase(out.number);
        }
        delete compact;
        */
    }
1060
    /// Allocate a new output file number (registered in `pending_outputs_`
    /// under the mutex), create the table file, and attach a TableBuilder
    /// to the compaction state. Unported; reference C++ below.
    pub fn open_compaction_output_file(&mut self, compact: *mut CompactionState) -> crate::Status {

        todo!();
        /*
        assert(compact != nullptr);
        assert(compact->builder == nullptr);
        uint64_t file_number;
        {
          mutex_.Lock();
          file_number = versions_->NewFileNumber();
          pending_outputs_.insert(file_number);
          CompactionState::Output out;
          out.number = file_number;
          out.smallest.Clear();
          out.largest.Clear();
          compact->outputs.push_back(out);
          mutex_.Unlock();
        }

        // Make the output file
        std::string fname = TableFileName(dbname_, file_number);
        Status s = env_->NewWritableFile(fname, &compact->outfile);
        if (s.ok()) {
          compact->builder = new TableBuilder(options_, compact->outfile);
        }
        return s;
        */
    }
1089
1090 pub fn finish_compaction_output_file(&mut self,
1091 compact: *mut CompactionState,
1092 input: *mut LevelDBIterator) -> Status {
1093
1094 todo!();
1095 /*
1096 assert(compact != nullptr);
1097 assert(compact->outfile != nullptr);
1098 assert(compact->builder != nullptr);
1099
1100 const uint64_t output_number = compact->current_output()->number;
1101 assert(output_number != 0);
1102
1103 // Check for iterator errors
1104 Status s = input->status();
1105 const uint64_t current_entries = compact->builder->NumEntries();
1106 if (s.ok()) {
1107 s = compact->builder->Finish();
1108 } else {
1109 compact->builder->Abandon();
1110 }
1111 const uint64_t current_bytes = compact->builder->FileSize();
1112 compact->current_output()->file_size = current_bytes;
1113 compact->total_bytes += current_bytes;
1114 delete compact->builder;
1115 compact->builder = nullptr;
1116
1117 // Finish and check for file errors
1118 if (s.ok()) {
1119 s = compact->outfile->Sync();
1120 }
1121 if (s.ok()) {
1122 s = compact->outfile->Close();
1123 }
1124 delete compact->outfile;
1125 compact->outfile = nullptr;
1126
1127 if (s.ok() && current_entries > 0) {
1128 // Verify that the table is usable
1129 Iterator* iter =
1130 table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
1131 s = iter->status();
1132 delete iter;
1133 if (s.ok()) {
1134 Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
1135 (unsigned long long)output_number, compact->compaction->level(),
1136 (unsigned long long)current_entries,
1137 (unsigned long long)current_bytes);
1138 }
1139 }
1140 return s;
1141 */
1142 }
1143
    /// Commit a finished compaction: delete the inputs from the version
    /// edit, add every output file at `level + 1`, and log-and-apply the
    /// edit. Unported; reference C++ below.
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn install_compaction_results(&mut self, compact: *mut CompactionState) -> crate::Status {

        todo!();
        /*
        mutex_.AssertHeld();
        Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
            compact->compaction->num_input_files(0), compact->compaction->level(),
            compact->compaction->num_input_files(1), compact->compaction->level() + 1,
            static_cast<long long>(compact->total_bytes));

        // Add compaction outputs
        compact->compaction->AddInputDeletions(compact->compaction->edit());
        const int level = compact->compaction->level();
        for (size_t i = 0; i < compact->outputs.size(); i++) {
          const CompactionState::Output& out = compact->outputs[i];
          compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
                                               out.smallest, out.largest);
        }
        return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
        */
    }
1166
    /// Perform the heavy lifting of a compaction: merge the input files in
    /// key order, decide per entry whether it can be dropped (shadowed by
    /// a newer entry for the same user key, or an obsolete deletion
    /// marker), write survivors into new output tables, and install the
    /// results (C++ reference body below; Rust port pending — currently
    /// panics via `todo!()`).
    ///
    /// Requires: `mutex_` held on entry; the reference implementation
    /// releases and re-acquires it around the file I/O, and periodically
    /// yields to immutable-memtable compaction work.
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn do_compaction_work(&mut self, compact: *mut CompactionState) -> crate::Status {

        todo!();
        /*
        const uint64_t start_micros = env_->NowMicros();
        int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

        Log(options_.info_log, "Compacting %d@%d + %d@%d files",
            compact->compaction->num_input_files(0), compact->compaction->level(),
            compact->compaction->num_input_files(1),
            compact->compaction->level() + 1);

        assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
        assert(compact->builder == nullptr);
        assert(compact->outfile == nullptr);
        if (snapshots_.empty()) {
          compact->smallest_snapshot = versions_->LastSequence();
        } else {
          compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
        }

        Iterator* input = versions_->MakeInputIterator(compact->compaction);

        // Release mutex while we're actually doing the compaction work
        mutex_.Unlock();

        input->SeekToFirst();
        Status status;
        ParsedInternalKey ikey;
        std::string current_user_key;
        bool has_current_user_key = false;
        SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
        while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
          // Prioritize immutable compaction work
          if (has_imm_.load(std::memory_order_relaxed)) {
            const uint64_t imm_start = env_->NowMicros();
            mutex_.Lock();
            if (imm_ != nullptr) {
              CompactMemTable();
              // Wake up MakeRoomForWrite() if necessary.
              background_work_finished_signal_.SignalAll();
            }
            mutex_.Unlock();
            imm_micros += (env_->NowMicros() - imm_start);
          }

          Slice key = input->key();
          if (compact->compaction->ShouldStopBefore(key) &&
              compact->builder != nullptr) {
            status = FinishCompactionOutputFile(compact, input);
            if (!status.ok()) {
              break;
            }
          }

          // Handle key/value, add to state, etc.
          bool drop = false;
          if (!ParseInternalKey(key, &ikey)) {
            // Do not hide error keys
            current_user_key.clear();
            has_current_user_key = false;
            last_sequence_for_key = kMaxSequenceNumber;
          } else {
            if (!has_current_user_key ||
                user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
                    0) {
              // First occurrence of this user key
              current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
              has_current_user_key = true;
              last_sequence_for_key = kMaxSequenceNumber;
            }

            if (last_sequence_for_key <= compact->smallest_snapshot) {
              // Hidden by a newer entry for same user key
              drop = true;  // (A)
            } else if (ikey.type == kTypeDeletion &&
                       ikey.sequence <= compact->smallest_snapshot &&
                       compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
              // For this user key:
              // (1) there is no data in higher levels
              // (2) data in lower levels will have larger sequence numbers
              // (3) data in layers that are being compacted here and have
              //     smaller sequence numbers will be dropped in the next
              //     few iterations of this loop (by rule (A) above).
              // Therefore this deletion marker is obsolete and can be dropped.
              drop = true;
            }

            last_sequence_for_key = ikey.sequence;
          }
    #if 0
        Log(options_.info_log,
            "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
            "%d smallest_snapshot: %d",
            ikey.user_key.ToString().c_str(),
            (int)ikey.sequence, ikey.type, kTypeValue, drop,
            compact->compaction->IsBaseLevelForKey(ikey.user_key),
            (int)last_sequence_for_key, (int)compact->smallest_snapshot);
    #endif

          if (!drop) {
            // Open output file if necessary
            if (compact->builder == nullptr) {
              status = OpenCompactionOutputFile(compact);
              if (!status.ok()) {
                break;
              }
            }
            if (compact->builder->NumEntries() == 0) {
              compact->current_output()->smallest.DecodeFrom(key);
            }
            compact->current_output()->largest.DecodeFrom(key);
            compact->builder->Add(key, input->value());

            // Close output file if it is big enough
            if (compact->builder->FileSize() >=
                compact->compaction->MaxOutputFileSize()) {
              status = FinishCompactionOutputFile(compact, input);
              if (!status.ok()) {
                break;
              }
            }
          }

          input->Next();
        }

        if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
          status = Status::IOError("Deleting DB during compaction");
        }
        if (status.ok() && compact->builder != nullptr) {
          status = FinishCompactionOutputFile(compact, input);
        }
        if (status.ok()) {
          status = input->status();
        }
        delete input;
        input = nullptr;

        CompactionStats stats;
        stats.micros = env_->NowMicros() - start_micros - imm_micros;
        for (int which = 0; which < 2; which++) {
          for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
            stats.bytes_read += compact->compaction->input(which, i)->file_size;
          }
        }
        for (size_t i = 0; i < compact->outputs.size(); i++) {
          stats.bytes_written += compact->outputs[i].file_size;
        }

        mutex_.Lock();
        stats_[compact->compaction->level() + 1].Add(stats);

        if (status.ok()) {
          status = InstallCompactionResults(compact);
        }
        if (!status.ok()) {
          RecordBackgroundError(status);
        }
        VersionSet::LevelSummaryStorage tmp;
        Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
        return status;
        */
    }
1332}
1333
/// State captured for a live internal iterator so its cleanup callback
/// (`cleanup_iterator_state`) can later unref the objects it pinned.
pub struct IterState {
    // Raw pointer to the mutex guarding the refcounted state in
    // `iter_state::Inner`; taken while unref'ing during cleanup.
    mu: *const Mutex<iter_state::Inner>,
}
1337
pub mod iter_state {

    use super::*;

    /// Objects pinned (ref'd) for the lifetime of an internal iterator;
    /// they are unref'd under the owning mutex when the iterator is
    /// cleaned up by `cleanup_iterator_state`.
    pub struct Inner {
        // Current version whose sstables the iterator reads.
        version: *const Version,
        // Active memtable.
        mem: *const MemTable,
        // Immutable memtable being compacted, or null.
        imm: *const MemTable,
    }
}
1348
impl IterState {

    /// Construct the cleanup state for a freshly created internal
    /// iterator (C++ reference initializer list below; Rust port
    /// pending — currently panics via `todo!()`).
    pub fn new(
        mutex: *mut parking_lot::RawMutex,
        mem: *mut MemTable,
        imm: *mut MemTable,
        version: *mut Version) -> Self {

        todo!();
        /*
        : mu(mutex),
        : version(version),
        : mem(mem),
        : imm(imm),


        */
    }
}
1368
impl GetSnapshot for DBImpl {

    /// Register and return a snapshot pinned at the current last
    /// sequence number (C++ reference body below; Rust port pending —
    /// currently panics via `todo!()`).
    fn get_snapshot(&mut self) -> Box<dyn Snapshot> {

        todo!();
        /*
        MutexLock l(&mutex_);
        return snapshots_.New(versions_->LastSequence());
        */
    }
}
1380
impl NewIterator for DBImpl {

    /// Create a user-facing DB iterator wrapping an internal merged
    /// iterator, bounded at the requested snapshot (or the latest
    /// sequence if none was supplied). C++ reference body below; Rust
    /// port pending — currently panics via `todo!()`.
    fn new_iterator(&mut self, options: &ReadOptions) -> *mut LevelDBIterator {

        todo!();
        /*
        SequenceNumber latest_snapshot;
        uint32_t seed;
        Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
        return NewDBIterator(this, user_comparator(), iter,
                             (options.snapshot != nullptr
                                  ? static_cast<const SnapshotImpl*>(options.snapshot)
                                        ->sequence_number()
                                  : latest_snapshot),
                             seed);
        */
    }
}
1399
impl Get for DBImpl {

    /// Look up `key_` at the requested (or latest) snapshot: first in
    /// the memtable, then the immutable memtable, then the current
    /// version's sstables, possibly scheduling a compaction based on
    /// read stats. C++ reference body below; Rust port pending —
    /// currently panics via `todo!()`.
    fn get(&mut self,
        options: &ReadOptions,
        key_: &Slice,
        value: *mut String) -> crate::Status {

        todo!();
        /*
        Status s;
        MutexLock l(&mutex_);
        SequenceNumber snapshot;
        if (options.snapshot != nullptr) {
          snapshot =
              static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
        } else {
          snapshot = versions_->LastSequence();
        }

        MemTable* mem = mem_;
        MemTable* imm = imm_;
        Version* current = versions_->current();
        mem->Ref();
        if (imm != nullptr) imm->Ref();
        current->Ref();

        bool have_stat_update = false;
        Version::GetStats stats;

        // Unlock while reading from files and memtables
        {
          mutex_.Unlock();
          // First look in the memtable, then in the immutable memtable (if any).
          LookupKey lkey(key, snapshot);
          if (mem->Get(lkey, value, &s)) {
            // Done
          } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
            // Done
          } else {
            s = current->Get(options, lkey, value, &stats);
            have_stat_update = true;
          }
          mutex_.Lock();
        }

        if (have_stat_update && current->UpdateStats(stats)) {
          MaybeScheduleCompaction();
        }
        mem->Unref();
        if (imm != nullptr) imm->Unref();
        current->Unref();
        return s;
        */
    }
}
1455
impl ReleaseSnapshot for DBImpl {

    /// Remove a previously acquired snapshot from the snapshot list
    /// (C++ reference body below; Rust port pending — currently panics
    /// via `todo!()`).
    fn release_snapshot(&mut self, snapshot: Box<dyn Snapshot>) {

        todo!();
        /*
        MutexLock l(&mutex_);
        snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
        */
    }
}
1467
impl GetProperty for DBImpl {

    /// Answer a "leveldb."-prefixed introspection property query
    /// (num-files-at-level*, stats, sstables, approximate-memory-usage),
    /// writing the answer into `value` and returning whether the
    /// property was recognized. C++ reference body below; Rust port
    /// pending — currently panics via `todo!()`.
    fn get_property(&mut self,
        property: &str,
        value: *mut String) -> bool {

        todo!();
        /*
        value->clear();

        MutexLock l(&mutex_);
        Slice in = property;
        Slice prefix("leveldb.");
        if (!in.starts_with(prefix)) return false;
        in.remove_prefix(prefix.size());

        if (in.starts_with("num-files-at-level")) {
          in.remove_prefix(strlen("num-files-at-level"));
          uint64_t level;
          bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
          if (!ok || level >= config::kNumLevels) {
            return false;
          } else {
            char buf[100];
            snprintf(buf, sizeof(buf), "%d",
                     versions_->NumLevelFiles(static_cast<int>(level)));
            *value = buf;
            return true;
          }
        } else if (in == "stats") {
          char buf[200];
          snprintf(buf, sizeof(buf),
                   "                               Compactions\n"
                   "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
                   "--------------------------------------------------\n");
          value->append(buf);
          for (int level = 0; level < config::kNumLevels; level++) {
            int files = versions_->NumLevelFiles(level);
            if (stats_[level].micros > 0 || files > 0) {
              snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
                       files, versions_->NumLevelBytes(level) / 1048576.0,
                       stats_[level].micros / 1e6,
                       stats_[level].bytes_read / 1048576.0,
                       stats_[level].bytes_written / 1048576.0);
              value->append(buf);
            }
          }
          return true;
        } else if (in == "sstables") {
          *value = versions_->current()->DebugString();
          return true;
        } else if (in == "approximate-memory-usage") {
          size_t total_usage = options_.block_cache->TotalCharge();
          if (mem_) {
            total_usage += mem_->ApproximateMemoryUsage();
          }
          if (imm_) {
            total_usage += imm_->ApproximateMemoryUsage();
          }
          char buf[50];
          snprintf(buf, sizeof(buf), "%llu",
                   static_cast<unsigned long long>(total_usage));
          value->append(buf);
          return true;
        }

        return false;
        */
    }
}
1538
impl db::Write for DBImpl {

    /// Queue this write behind in-flight writers, group it with
    /// compatible pending batches, append the group to the log (syncing
    /// if requested) and apply it to the memtable, then wake the
    /// satisfied writers. In the reference implementation a null
    /// `updates` batch is used by compactions to force room-making.
    /// C++ reference body below; Rust port pending — currently panics
    /// via `todo!()`.
    fn write(&mut self,
        options: &WriteOptions,
        updates: *mut WriteBatch) -> crate::Status {

        todo!();
        /*
        Writer w(&mutex_);
        w.batch = updates;
        w.sync = options.sync;
        w.done = false;

        MutexLock l(&mutex_);
        writers_.push_back(&w);
        while (!w.done && &w != writers_.front()) {
          w.cv.Wait();
        }
        if (w.done) {
          return w.status;
        }

        // May temporarily unlock and wait.
        Status status = MakeRoomForWrite(updates == nullptr);
        uint64_t last_sequence = versions_->LastSequence();
        Writer* last_writer = &w;
        if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
          WriteBatch* write_batch = BuildBatchGroup(&last_writer);
          WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
          last_sequence += WriteBatchInternal::Count(write_batch);

          // Add to log and apply to memtable.  We can release the lock
          // during this phase since &w is currently responsible for logging
          // and protects against concurrent loggers and concurrent writes
          // into mem_.
          {
            mutex_.Unlock();
            status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
            bool sync_error = false;
            if (status.ok() && options.sync) {
              status = logfile_->Sync();
              if (!status.ok()) {
                sync_error = true;
              }
            }
            if (status.ok()) {
              status = WriteBatchInternal::InsertInto(write_batch, mem_);
            }
            mutex_.Lock();
            if (sync_error) {
              // The state of the log file is indeterminate: the log record we
              // just added may or may not show up when the DB is re-opened.
              // So we force the DB into a mode where all future writes fail.
              RecordBackgroundError(status);
            }
          }
          if (write_batch == tmp_batch_) tmp_batch_->Clear();

          versions_->SetLastSequence(last_sequence);
        }

        while (true) {
          Writer* ready = writers_.front();
          writers_.pop_front();
          if (ready != &w) {
            ready->status = status;
            ready->done = true;
            ready->cv.Signal();
          }
          if (ready == last_writer) break;
        }

        // Notify new head of write queue
        if (!writers_.empty()) {
          writers_.front()->cv.Signal();
        }

        return status;
        */
    }
}
1620
impl Put for DBImpl {

    /**
      | Convenience methods
      |
      */
    /// Thin wrapper that delegates to the default `DB::Put`, which
    /// builds a one-entry batch and calls `write` (C++ reference body
    /// below; Rust port pending — currently panics via `todo!()`).
    fn put(&mut self,
        o: &WriteOptions,
        key_: &Slice,
        val: &Slice) -> crate::Status {

        todo!();
        /*
        return DB::Put(o, key, val);
        */
    }
}
1638
impl Delete for DBImpl {

    /// Thin wrapper that delegates to the default `DB::Delete`, which
    /// builds a one-entry deletion batch and calls `write` (C++
    /// reference body below; Rust port pending — currently panics via
    /// `todo!()`).
    fn delete(&mut self,
        options: &WriteOptions,
        key_: &Slice) -> crate::Status {

        todo!();
        /*
        return DB::Delete(options, key);
        */
    }
}
1651
impl DBImpl {

    /// Build a merged internal iterator over the memtable, the immutable
    /// memtable (if any) and the current version's sstables, pinning all
    /// of them and registering `cleanup_iterator_state` to unpin on
    /// destruction. C++ reference body below; Rust port pending —
    /// currently panics via `todo!()`.
    pub fn new_internal_iterator(&mut self,
        options: &ReadOptions,
        latest_snapshot: *mut SequenceNumber,
        seed: *mut u32) -> *mut LevelDBIterator {

        todo!();
        /*
        mutex_.Lock();
        *latest_snapshot = versions_->LastSequence();

        // Collect together all needed child iterators
        std::vector<Iterator*> list;
        list.push_back(mem_->NewIterator());
        mem_->Ref();
        if (imm_ != nullptr) {
          list.push_back(imm_->NewIterator());
          imm_->Ref();
        }
        versions_->current()->AddIterators(options, &list);
        Iterator* internal_iter =
            NewMergingIterator(&internal_comparator_, &list[0], list.size());
        versions_->current()->Ref();

        IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
        internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

        *seed = ++seed_;
        mutex_.Unlock();
        return internal_iter;
        */
    }

    /**
      | Return an internal iterator over the current
      | state of the database.
      |
      | The keys of this iterator are internal keys
      | (see format.h).
      |
      | The returned iterator should be deleted when
      | no longer needed.
      */
    pub fn test_new_internal_iterator(&mut self) -> *mut LevelDBIterator {

        todo!();
        /*
        SequenceNumber ignored;
        uint32_t ignored_seed;
        return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
        */
    }

    /**
      | Return the maximum overlapping data
      | (in bytes) at next level for any file
      | at a level >= 1.
      |
      */
    pub fn test_max_next_level_overlapping_bytes(&mut self) -> i64 {

        todo!();
        /*
        MutexLock l(&mutex_);
        return versions_->MaxNextLevelOverlappingBytes();
        */
    }

    /**
      | Record a sample of bytes read at the
      | specified internal key.
      |
      | Samples are taken approximately once every
      | config::kReadBytesPeriod bytes.
      */
    pub fn record_read_sample(&mut self, key_: Slice) {

        todo!();
        /*
        MutexLock l(&mutex_);
        if (versions_->current()->RecordReadSample(key)) {
          MaybeScheduleCompaction();
        }
        */
    }

    /**
      | REQUIRES: Writer list must be non-empty
      |
      | REQUIRES: First writer must have a non-null
      | batch
      */
    /// Merge the front writer's batch with as many following compatible
    /// batches as fit under the group-size limit, recording the last
    /// writer included via `last_writer` (C++ reference body below;
    /// Rust port pending — currently panics via `todo!()`).
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn build_batch_group(&mut self, last_writer: *mut *mut DBImplWriter) -> *mut WriteBatch {

        todo!();
        /*
        mutex_.AssertHeld();
        assert(!writers_.empty());
        Writer* first = writers_.front();
        WriteBatch* result = first->batch;
        assert(result != nullptr);

        size_t size = WriteBatchInternal::ByteSize(first->batch);

        // Allow the group to grow up to a maximum size, but if the
        // original write is small, limit the growth so we do not slow
        // down the small write too much.
        size_t max_size = 1 << 20;
        if (size <= (128 << 10)) {
          max_size = size + (128 << 10);
        }

        *last_writer = first;
        std::deque<Writer*>::iterator iter = writers_.begin();
        ++iter;  // Advance past "first"
        for (; iter != writers_.end(); ++iter) {
          Writer* w = *iter;
          if (w->sync && !first->sync) {
            // Do not include a sync write into a batch handled by a non-sync write.
            break;
          }

          if (w->batch != nullptr) {
            size += WriteBatchInternal::ByteSize(w->batch);
            if (size > max_size) {
              // Do not make batch too big
              break;
            }

            // Append to *result
            if (result == first->batch) {
              // Switch to temporary batch instead of disturbing caller's batch
              result = tmp_batch_;
              assert(WriteBatchInternal::Count(result) == 0);
              WriteBatchInternal::Append(result, first->batch);
            }
            WriteBatchInternal::Append(result, w->batch);
          }
          *last_writer = w;
        }
        return result;
        */
    }

    /**
      | REQUIRES: mutex_ is held
      |
      | REQUIRES: this thread is currently at the front
      | of the writer queue
      |
      | force - compact even if there is room?
      |
      */
    /// Block (possibly slowing down or waiting on compactions) until the
    /// memtable has room for a write, switching to a fresh memtable/log
    /// when the current one is full (C++ reference body below; Rust
    /// port pending — currently panics via `todo!()`).
    #[EXCLUSIVE_LOCKS_REQUIRED(mutex_)]
    pub fn make_room_for_write(&mut self, force: bool) -> crate::Status {

        todo!();
        /*
        mutex_.AssertHeld();
        assert(!writers_.empty());
        bool allow_delay = !force;
        Status s;
        while (true) {
          if (!bg_error_.ok()) {
            // Yield previous error
            s = bg_error_;
            break;
          } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                        config::kL0_SlowdownWritesTrigger) {
            // We are getting close to hitting a hard limit on the number of
            // L0 files.  Rather than delaying a single write by several
            // seconds when we hit the hard limit, start delaying each
            // individual write by 1ms to reduce latency variance.  Also,
            // this delay hands over some CPU to the compaction thread in
            // case it is sharing the same core as the writer.
            mutex_.Unlock();
            env_->SleepForMicroseconds(1000);
            allow_delay = false;  // Do not delay a single write more than once
            mutex_.Lock();
          } else if (!force &&
                     (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
            // There is room in current memtable
            break;
          } else if (imm_ != nullptr) {
            // We have filled up the current memtable, but the previous
            // one is still being compacted, so we wait.
            Log(options_.info_log, "Current memtable full; waiting...\n");
            background_work_finished_signal_.Wait();
          } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
            // There are too many level-0 files.
            Log(options_.info_log, "Too many L0 files; waiting...\n");
            background_work_finished_signal_.Wait();
          } else {
            // Attempt to switch to a new memtable and trigger compaction of old
            assert(versions_->PrevLogNumber() == 0);
            uint64_t new_log_number = versions_->NewFileNumber();
            WritableFile* lfile = nullptr;
            s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
            if (!s.ok()) {
              // Avoid chewing through file number space in a tight loop.
              versions_->ReuseFileNumber(new_log_number);
              break;
            }
            delete log_;
            delete logfile_;
            logfile_ = lfile;
            logfile_number_ = new_log_number;
            log_ = new LogWriter(lfile);
            imm_ = mem_;
            has_imm_.store(true, std::memory_order_release);
            mem_ = new MemTable(internal_comparator_);
            mem_->Ref();
            force = false;  // Do not force another compaction if have room
            MaybeScheduleCompaction();
          }
        }
        return s;
        */
    }
}
1874
/// Destroy the contents of the database at `dbname`: lock it, delete
/// every parseable DB file except the lock file, then release and
/// remove the lock and the directory. Be very careful using this
/// method. C++ reference body below; Rust port pending — currently
/// panics via `todo!()`.
pub fn destroydb(
    dbname: &String,
    options: &Options) -> crate::Status {

    todo!();
    /*
    Env* env = options.env;
    std::vector<std::string> filenames;
    Status result = env->GetChildren(dbname, &filenames);
    if (!result.ok()) {
      // Ignore error in case directory does not exist
      return Status::OK();
    }

    FileLock* lock;
    const std::string lockname = LockFileName(dbname);
    result = env->LockFile(lockname, &lock);
    if (result.ok()) {
      uint64_t number;
      FileType type;
      for (size_t i = 0; i < filenames.size(); i++) {
        if (ParseFileName(filenames[i], &number, &type) &&
            type != kDBLockFile) {  // Lock file will be deleted at end
          Status del = env->DeleteFile(dbname + "/" + filenames[i]);
          if (result.ok() && !del.ok()) {
            result = del;
          }
        }
      }
      env->UnlockFile(lock);  // Ignore error since state is already gone
      env->DeleteFile(lockname);
      env->DeleteDir(dbname);  // Ignore error in case dir contains other files
    }
    return result;
    */
}
1911
/// Iterator cleanup callback: under the DB mutex, unref the memtables
/// and version pinned by an `IterState` and free the state itself.
/// `arg1` is the `IterState*`; `arg2` is unused. C++ reference body
/// below; Rust port pending — currently panics via `todo!()`.
pub fn cleanup_iterator_state(
    arg1: *mut c_void,
    arg2: *mut c_void) {

    todo!();
    /*
    IterState* state = reinterpret_cast<IterState*>(arg1);
    state->mu->Lock();
    state->mem->Unref();
    if (state->imm != nullptr) state->imm->Unref();
    state->version->Unref();
    state->mu->Unlock();
    delete state;
    */
}