1use std::{
2 cell::Cell,
3 mem,
4 path::{Path, PathBuf},
5 sync::Arc,
6 thread::{self, JoinHandle},
7 time::{Duration, Instant},
8};
9
10use crossbeam_channel::{select, Receiver, Sender};
11use dashmap::{mapref::one::RefMut, DashMap};
12use parking_lot::{Mutex, MutexGuard};
13
14use bitcasky_common::{
15 clock::Clock,
16 formatter::{BitcaskyFormatter, RowToWrite},
17 fs::{self as SelfFs, FileType},
18 options::BitcaskyOptions,
19 storage_id::{StorageId, StorageIdGenerator},
20};
21
22use crate::{
23 common::{DatabaseError, DatabaseResult},
24 data_storage::DataStorageTelemetry,
25 hint::{self, HintWriter},
26};
27
28use log::{debug, error, info, trace, warn};
29
30use super::{
31 common::{RecoveredRow, TimedValue},
32 data_storage::{DataStorage, DataStorageReader, DataStorageWriter, StorageIter},
33 DataStorageError,
34};
35use super::{
36 common::{RowLocation, RowToRead},
37 hint::HintFile,
38};
/// Aggregated telemetry snapshot for a database: the active writing storage,
/// every sealed (stable) storage, and the background hint-file writer.
#[derive(Debug)]
pub struct DatabaseTelemetry {
    // Telemetry of the storage currently accepting writes.
    pub writing_storage: DataStorageTelemetry,
    // One entry per sealed data storage.
    pub stable_storages: Vec<DataStorageTelemetry>,
    // Default-valued when the database has no hint writer.
    pub hint_file_writer: hint::HintWriterTelemetry,
}
49
/// Snapshot of the storage ids known to a database at one point in time.
#[derive(Debug)]
pub struct StorageIds {
    // Ids of the sealed, read-only storages.
    pub stable_storage_ids: Vec<StorageId>,
    // Id of the storage currently accepting writes.
    pub writing_storage_id: StorageId,
}
55
/// A bitcask-style database: one mutable "writing" storage plus a set of
/// sealed, read-only "stable" storages keyed by storage id.
#[derive(Debug)]
pub struct Database {
    pub database_dir: PathBuf,
    // Source of fresh storage ids for newly created data files.
    storage_id_generator: Arc<StorageIdGenerator>,
    // The storage currently accepting writes; shared with the sync worker.
    writing_storage: Arc<Mutex<DataStorage>>,
    // Sealed storages; each behind its own mutex for concurrent reads.
    stable_storages: DashMap<StorageId, Mutex<DataStorage>>,
    options: Arc<BitcaskyOptions>,
    // Background hint-file builder; taken (set to None) during Drop.
    hint_file_writer: Option<HintWriter>,
    // Periodic flusher; present only when sync_interval_sec > 0.
    sync_worker: Option<SyncWorker>,
    formatter: Arc<BitcaskyFormatter>,
    // Set by `mark_db_error`; consulted by `check_db_error`.
    is_error: Mutex<Option<String>>,
}
69
70impl Database {
    /// Open (or create) a database under `directory`.
    ///
    /// Steps, in order:
    /// 1. remove temporary hint files left by an interrupted hint build,
    /// 2. bump the id generator past the largest data-file id on disk,
    /// 3. start the background hint writer,
    /// 4. open existing data storages — the newest becomes the writing
    ///    storage, the rest become stable storages,
    /// 5. start the periodic sync worker if a sync interval is configured.
    pub fn open(
        directory: &Path,
        storage_id_generator: Arc<StorageIdGenerator>,
        options: Arc<BitcaskyOptions>,
    ) -> DatabaseResult<Database> {
        let database_dir: PathBuf = directory.into();

        debug!(target: "Database", "opening database at directory {:?}", directory);

        hint::clear_temp_hint_file_directory(&database_dir);

        let data_storage_ids = SelfFs::get_storage_ids_in_dir(&database_dir, FileType::DataFile);
        // Ensure newly generated ids never collide with ids already on disk.
        if let Some(id) = data_storage_ids.iter().max() {
            storage_id_generator.update_id(*id);
        }

        let hint_file_writer = Some(HintWriter::start(&database_dir, options.clone()));

        let formatter = Arc::new(BitcaskyFormatter::default());
        let (writing_storage, storages) = prepare_db_storages(
            &database_dir,
            &data_storage_ids,
            &storage_id_generator,
            formatter.clone(),
            options.clone(),
        )?;

        // Index the stable storages by id.
        let stable_storages = storages.into_iter().fold(DashMap::new(), |m, s| {
            m.insert(s.storage_id(), Mutex::new(s));
            m
        });

        let writing_storage = Arc::new(Mutex::new(writing_storage));
        let mut db = Database {
            writing_storage,
            storage_id_generator,
            database_dir,
            stable_storages,
            options: options.clone(),
            hint_file_writer,
            sync_worker: None,
            formatter,
            is_error: Mutex::new(None),
        };

        // A zero interval disables background syncing entirely.
        if options.database.sync_interval_sec > 0 {
            db.sync_worker = Some(SyncWorker::start_sync_worker(
                db.writing_storage.clone(),
                options.database.sync_interval_sec,
            ));
        }

        info!(target: "Database", "database opened at directory: {:?}, with {} data files", directory, data_storage_ids.len());
        Ok(db)
    }
126
127 pub fn get_database_dir(&self) -> &Path {
128 &self.database_dir
129 }
130
131 pub fn get_max_storage_id(&self) -> StorageId {
132 let writing_file_ref = self.writing_storage.lock();
133 writing_file_ref.storage_id()
134 }
135
    /// Append a key/value row to the writing storage and return its location.
    ///
    /// On `StorageOverflow` the full writing file is sealed into the stable
    /// set and the row is retried once on the freshly created file.
    pub fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &self,
        key: K,
        value: TimedValue<V>,
    ) -> DatabaseResult<RowLocation> {
        let ts = value.expire_timestamp;
        let row: RowToWrite<K, TimedValue<V>> = RowToWrite::new_with_timestamp(key, value, ts);
        // Lock held for the whole write (including a possible rotation + retry).
        let mut writing_file_ref = self.writing_storage.lock();

        match writing_file_ref.write_row(&row) {
            Err(DataStorageError::StorageOverflow(id)) => {
                debug!("Flush writing storage with id: {} on overflow", id);
                // Rotate to a new writing file, then retry the same row once.
                self.do_flush_writing_file(&mut writing_file_ref)?;
                Ok(writing_file_ref.write_row(&row)?)
            }
            r => Ok(r?),
        }
    }
154
    /// Seal the current writing file and start a new one.
    ///
    /// A no-op when the writing file has no unflushed writes (see
    /// `do_flush_writing_file`).
    pub fn flush_writing_file(&self) -> DatabaseResult<()> {
        let mut writing_file_ref = self.writing_storage.lock();
        debug!(
            "Flush writing file with id: {}",
            writing_file_ref.storage_id()
        );
        self.do_flush_writing_file(&mut writing_file_ref)?;

        Ok(())
    }
166
167 pub fn recovery_iter(&self) -> DatabaseResult<DatabaseRecoverIter> {
168 let mut storage_ids: Vec<StorageId>;
169 {
170 let writing_storage = self.writing_storage.lock();
171 let writing_storage_id = writing_storage.storage_id();
172
173 storage_ids = self
174 .stable_storages
175 .iter()
176 .map(|f| f.lock().storage_id())
177 .collect::<Vec<StorageId>>();
178 storage_ids.push(writing_storage_id);
179 storage_ids.sort();
180 storage_ids.reverse();
181 }
182 DatabaseRecoverIter::new(self.database_dir.clone(), storage_ids, self.options.clone())
183 }
184
    /// Iterate over every row in the database, including the writing file.
    ///
    /// Each storage is re-opened independently, so the returned iterator does
    /// not hold any of the database's own locks while it runs.
    pub fn iter(&self) -> DatabaseResult<DatabaseIter> {
        let mut storage_ids: Vec<StorageId>;
        {
            // Snapshot ids under lock, then release before opening files.
            let writing_storage = self.writing_storage.lock();
            let writing_storage_id = writing_storage.storage_id();

            storage_ids = self
                .stable_storages
                .iter()
                .map(|f| f.lock().storage_id())
                .collect::<Vec<StorageId>>();
            storage_ids.push(writing_storage_id);
        }

        let files: DatabaseResult<Vec<DataStorage>> = storage_ids
            .iter()
            .map(|f| {
                DataStorage::open(&self.database_dir, *f, self.options.clone())
                    .map_err(DatabaseError::StorageError)
            })
            .collect();

        let mut opened_stable_files = files?;
        opened_stable_files.sort_by_key(|e| e.storage_id());
        // Reversed here; DatabaseIter pops iterators from the back, so
        // storages end up visited in ascending id order overall.
        let iters: crate::data_storage::Result<Vec<StorageIter>> =
            opened_stable_files.iter().rev().map(|f| f.iter()).collect();

        Ok(DatabaseIter::new(iters?))
    }
214
    /// Read the value stored at `row_location`.
    ///
    /// Checks the current writing file first, then falls back to the stable
    /// storage with the matching id; errors with `TargetFileIdNotFound` when
    /// no such storage exists.
    pub fn read_value(
        &self,
        row_location: &RowLocation,
    ) -> DatabaseResult<Option<TimedValue<Vec<u8>>>> {
        {
            // Fast path: the row may live in the current writing file.
            let mut writing_file_ref = self.writing_storage.lock();
            if row_location.storage_id == writing_file_ref.storage_id() {
                return Ok(writing_file_ref.read_value(row_location.row_offset)?);
            }
        }

        let l = self.get_file_to_read(row_location.storage_id)?;
        let mut f = l.lock();
        let ret = f.read_value(row_location.row_offset)?;
        Ok(ret)
    }
231
    /// Replace all in-memory storage handles with freshly opened ones for
    /// `data_storage_ids` (presumably after an external merge/compaction —
    /// callers elsewhere decide when this is safe).
    ///
    /// Panics if the same storage id appears twice in the reloaded set.
    pub fn reload_data_files(&self, data_storage_ids: Vec<StorageId>) -> DatabaseResult<()> {
        let (writing, stables) = prepare_db_storages(
            &self.database_dir,
            &data_storage_ids,
            &self.storage_id_generator,
            self.formatter.clone(),
            self.options.clone(),
        )?;

        {
            let mut writing_storage_ref = self.writing_storage.lock();
            debug!(
                "reload writing storage with id: {}",
                writing_storage_ref.storage_id()
            );
            // Swap in the new writing storage; the old handle is dropped here.
            let _ = mem::replace(&mut *writing_storage_ref, writing);
        }

        self.stable_storages.clear();

        for s in stables {
            // Duplicate ids indicate a corrupted reload set — fail loudly.
            if self.stable_storages.contains_key(&s.storage_id()) {
                core::panic!("file id: {} already loaded in database", s.storage_id());
            }
            debug!("reload stable file with id: {}", s.storage_id());
            self.stable_storages.insert(s.storage_id(), Mutex::new(s));
        }
        Ok(())
    }
261
262 pub fn get_storage_ids(&self) -> StorageIds {
263 let writing_file_ref = self.writing_storage.lock();
264 let writing_storage_id = writing_file_ref.storage_id();
265 let stable_storage_ids: Vec<StorageId> = self
266 .stable_storages
267 .iter()
268 .map(|f| f.value().lock().storage_id())
269 .collect();
270 StorageIds {
271 stable_storage_ids,
272 writing_storage_id,
273 }
274 }
275
    /// Collect telemetry from the writing storage, every stable storage, and
    /// the hint writer (default telemetry when the writer is absent).
    pub fn get_telemetry_data(&self) -> DatabaseTelemetry {
        // Block scopes keep each lock held only while its data is copied out.
        let writing_storage = { self.writing_storage.lock().get_telemetry_data() };
        let stable_storages: Vec<DataStorageTelemetry> = {
            self.stable_storages
                .iter()
                .map(|s| s.lock().get_telemetry_data())
                .collect()
        };

        DatabaseTelemetry {
            hint_file_writer: self
                .hint_file_writer
                .as_ref()
                .map(|h| h.get_telemetry_data())
                .unwrap_or_default(),
            writing_storage,
            stable_storages,
        }
    }
295
    /// Destroy the database's on-disk state: seal the writing file, delete
    /// every stable data file and hint file, and clear the stable set.
    ///
    /// NOTE(review): despite the name, this is an ordinary method that must be
    /// called explicitly; it is unrelated to the `Drop` destructor below and
    /// the similar name is easy to confuse.
    pub fn drop(&self) -> DatabaseResult<()> {
        debug!("Drop database called");

        {
            let mut writing_file_ref = self.writing_storage.lock();
            debug!(
                "Flush writing file with id: {} on drop database",
                writing_file_ref.storage_id()
            );
            // Seal the writing file so it lands in stable_storages and is
            // deleted along with the rest below.
            self.do_flush_writing_file(&mut writing_file_ref)?;
        }
        for storage_id in self.stable_storages.iter().map(|v| v.lock().storage_id()) {
            SelfFs::delete_file(&self.database_dir, FileType::DataFile, Some(storage_id))?;
            SelfFs::delete_file(&self.database_dir, FileType::HintFile, Some(storage_id))?;
        }
        self.stable_storages.clear();
        Ok(())
    }
316
317 pub fn sync(&self) -> DatabaseResult<()> {
318 let mut f = self.writing_storage.lock();
319 f.flush()?;
320 Ok(())
321 }
322
323 pub fn mark_db_error(&self, error_string: String) {
324 let mut err = self.is_error.lock();
325 *err = Some(error_string)
326 }
327
328 pub fn check_db_error(&self) -> Result<(), DatabaseError> {
329 let err = self.is_error.lock();
330 if err.is_some() {
331 return Err(DatabaseError::DatabaseBroken(err.as_ref().unwrap().clone()));
332 }
333 Ok(())
334 }
335
336 fn do_flush_writing_file(
337 &self,
338 writing_file_ref: &mut MutexGuard<DataStorage>,
339 ) -> DatabaseResult<()> {
340 if !writing_file_ref.is_dirty() {
341 debug!(
342 "Skip flush empty wirting file with id: {}",
343 writing_file_ref.storage_id()
344 );
345 return Ok(());
346 }
347 let next_storage_id = self.storage_id_generator.generate_next_id();
348 let next_writing_file = DataStorage::new(
349 &self.database_dir,
350 next_storage_id,
351 self.formatter.clone(),
352 self.options.clone(),
353 )?;
354 let mut old_storage = mem::replace(&mut **writing_file_ref, next_writing_file);
355 old_storage.flush()?;
356 let storage_id = old_storage.storage_id();
357 self.stable_storages
358 .insert(storage_id, Mutex::new(old_storage));
359 if let Some(w) = self.hint_file_writer.as_ref() {
360 w.async_write_hint_file(storage_id);
361 }
362 debug!(target: "Database", "writing file with id: {} flushed, new writing file with id: {} created", storage_id, next_storage_id);
363 Ok(())
364 }
365
366 fn get_file_to_read(
367 &self,
368 storage_id: StorageId,
369 ) -> DatabaseResult<RefMut<StorageId, Mutex<DataStorage>>> {
370 self.stable_storages
371 .get_mut(&storage_id)
372 .ok_or(DatabaseError::TargetFileIdNotFound(storage_id))
373 }
374}
375
376impl Drop for Database {
377 fn drop(&mut self) {
378 let mut writing_file_ref = self.writing_storage.lock();
379 if let Err(e) = writing_file_ref.flush() {
380 warn!(target: "Database", "sync database failed: {}", e)
381 }
382
383 if let Some(worker) = self.sync_worker.take() {
384 drop(worker);
385 }
386
387 if let Some(hint_w) = self.hint_file_writer.take() {
388 drop(hint_w);
389 }
390
391 info!(target: "Database", "database on directory: {:?} closed", self.database_dir)
392 }
393}
394
/// Background thread that periodically flushes the writing storage.
/// Dropping it signals the thread to stop and joins it.
#[derive(Debug)]
struct SyncWorker {
    // Carries the one-shot stop signal to the worker loop.
    stop_sender: Sender<()>,
    // Join handle; taken (set to None) on drop so it is joined at most once.
    handle: Option<JoinHandle<()>>,
}
400
401impl SyncWorker {
402 fn start_sync_worker(
403 datastorage: Arc<Mutex<DataStorage>>,
404 sync_interval_sec: u64,
405 ) -> SyncWorker {
406 let channel = crossbeam_channel::bounded(1);
407 let stop_sender = channel.0;
408 let stop_receiver: Receiver<()> = channel.1;
409
410 let sync_duration = Duration::from_secs(sync_interval_sec);
411 let receiver = crossbeam_channel::tick(sync_duration);
412 let handle = thread::spawn(move || {
413 let mut last_sync = Instant::now();
414 loop {
415 select! {
416 recv(stop_receiver) -> _ => {
417 info!(target: "Database", "stopping sync worker");
418 return
419 }
420
421 recv(receiver) -> _ => {
422 if last_sync.elapsed() < sync_duration {
423 continue;
424 }
425
426 trace!("Attempting syncing");
427 let mut f = datastorage.lock();
428 if let Err(e) = f.flush() {
429 error!(target: "Database", "flush database failed: {}", e);
430 }
431 last_sync = Instant::now();
432 },
433 }
434 }
435 });
436 SyncWorker {
437 stop_sender,
438 handle: Some(handle),
439 }
440 }
441}
442
443impl Drop for SyncWorker {
444 fn drop(&mut self) {
445 if self.stop_sender.send(()).is_err() {
446 warn!("Failed to stop sync worker.");
447 }
448
449 if let Some(handle) = self.handle.take() {
450 if handle.join().is_err() {
451 warn!(target: "Database", "wait sync worker done failed");
452 }
453 }
454 }
455}
456
/// Iterator over rows across multiple storages: drains one `StorageIter` at
/// a time, popping replacements from the back of `remain_iters`.
pub struct DatabaseIter {
    // The iterator currently being drained; None once everything is exhausted.
    current_iter: Cell<Option<StorageIter>>,
    remain_iters: Vec<StorageIter>,
}
461
462impl DatabaseIter {
463 fn new(mut iters: Vec<StorageIter>) -> Self {
464 if iters.is_empty() {
465 DatabaseIter {
466 remain_iters: iters,
467 current_iter: Cell::new(None),
468 }
469 } else {
470 let current_iter = iters.pop();
471 DatabaseIter {
472 remain_iters: iters,
473 current_iter: Cell::new(current_iter),
474 }
475 }
476 }
477}
478
impl Iterator for DatabaseIter {
    type Item = DatabaseResult<RowToRead>;

    /// Yield rows from the current storage iterator, switching to the next
    /// queued iterator whenever the current one is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.current_iter.get_mut() {
                None => break,
                Some(iter) => match iter.next() {
                    None => {
                        // Current storage exhausted: pop the next iterator
                        // (None when nothing remains, which ends the loop).
                        self.current_iter.replace(self.remain_iters.pop());
                    }
                    other => return other.map(|r| r.map_err(DatabaseError::StorageError)),
                },
            }
        }
        None
    }
}
497
/// Build a row-recovery iterator for a single storage.
///
/// Prefers the on-disk hint file when one exists for `storage_id`; otherwise
/// scans the data file itself, marking each row invalid when its value fails
/// the validity check against the current clock.
fn recovered_iter(
    database_dir: &Path,
    storage_id: StorageId,
    options: Arc<BitcaskyOptions>,
) -> DatabaseResult<Box<dyn Iterator<Item = DatabaseResult<RecoveredRow>>>> {
    if FileType::HintFile
        .get_path(database_dir, Some(storage_id))
        .exists()
    {
        debug!(target: "Database", "recover from hint file with id: {}", storage_id);
        Ok(Box::new(HintFile::open_iterator(database_dir, storage_id)?))
    } else {
        debug!(target: "Database", "recover from data file with id: {}", storage_id);
        let stable_file = DataStorage::open(database_dir, storage_id, options.clone())?;
        // `options` is moved into the closures so the returned boxed iterator
        // can outlive this function.
        let i = stable_file.iter().map(move |iter| {
            iter.map(move |row| {
                row.map(|r| RecoveredRow {
                    row_location: r.row_location,
                    key: r.key,
                    invalid: !r.value.is_valid(options.clock.now()),
                })
                .map_err(DatabaseError::StorageError)
            })
        })?;
        Ok(Box::new(i))
    }
}
525
/// Iterator over recovered rows across storages, opening each storage's
/// recovery iterator lazily as the previous one is exhausted.
pub struct DatabaseRecoverIter {
    // Recovery iterator for the storage currently being replayed.
    current_iter: Cell<Option<Box<dyn Iterator<Item = DatabaseResult<RecoveredRow>>>>>,
    // Remaining storage ids; consumed from the back via pop().
    data_storage_ids: Vec<StorageId>,
    database_dir: PathBuf,
    options: Arc<BitcaskyOptions>,
}
532
533impl DatabaseRecoverIter {
534 fn new(
535 database_dir: PathBuf,
536 mut iters: Vec<StorageId>,
537 options: Arc<BitcaskyOptions>,
538 ) -> DatabaseResult<Self> {
539 if let Some(id) = iters.pop() {
540 let iter: Box<dyn Iterator<Item = DatabaseResult<RecoveredRow>>> =
541 recovered_iter(&database_dir, id, options.clone())?;
542 Ok(DatabaseRecoverIter {
543 database_dir,
544 data_storage_ids: iters,
545 current_iter: Cell::new(Some(iter)),
546 options,
547 })
548 } else {
549 Ok(DatabaseRecoverIter {
550 database_dir,
551 data_storage_ids: iters,
552 current_iter: Cell::new(None),
553 options,
554 })
555 }
556 }
557}
558
impl Iterator for DatabaseRecoverIter {
    type Item = DatabaseResult<RecoveredRow>;

    /// Yield recovered rows, lazily opening the next storage's recovery
    /// iterator (hint or data file) when the current one runs dry.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.current_iter.get_mut() {
                None => break,
                Some(iter) => match iter.next() {
                    None => {
                        if let Some(id) = self.data_storage_ids.pop() {
                            match recovered_iter(&self.database_dir, id, self.options.clone()) {
                                Ok(iter) => {
                                    self.current_iter.replace(Some(iter));
                                }
                                // Surface storage-open failures to the caller.
                                Err(e) => return Some(Err(e)),
                            }
                        } else {
                            // No more storages to recover.
                            break;
                        }
                    }
                    other => return other,
                },
            }
        }
        None
    }
}
586
587fn open_storages<P: AsRef<Path>>(
588 database_dir: P,
589 data_storage_ids: &[u32],
590 options: Arc<BitcaskyOptions>,
591) -> DatabaseResult<Vec<DataStorage>> {
592 let mut storage_ids = data_storage_ids.to_owned();
593 storage_ids.sort();
594
595 Ok(storage_ids
596 .iter()
597 .map(|id| DataStorage::open(&database_dir, *id, options.clone()))
598 .collect::<crate::data_storage::Result<Vec<DataStorage>>>()?)
599}
600
601fn prepare_db_storages<P: AsRef<Path>>(
602 database_dir: P,
603 data_storage_ids: &[u32],
604 storage_id_generator: &StorageIdGenerator,
605 formatter: Arc<BitcaskyFormatter>,
606 options: Arc<BitcaskyOptions>,
607) -> DatabaseResult<(DataStorage, Vec<DataStorage>)> {
608 let mut storages = open_storages(&database_dir, data_storage_ids, options.clone())?;
609 let mut writing_storage;
610 if storages.is_empty() {
611 let writing_storage_id = storage_id_generator.generate_next_id();
612 let storage = DataStorage::new(&database_dir, writing_storage_id, formatter, options)?;
613 debug!(target: "Database", "create writing file with id: {}", writing_storage_id);
614 writing_storage = storage;
615 } else {
616 writing_storage = storages.pop().unwrap();
617 if let Err(e) = writing_storage.seek_to_end() {
618 match e {
619 DataStorageError::EofError() => {
620 warn!(target: "Database", "got EOF in writing file with id: {}", writing_storage.storage_id());
621 }
622 DataStorageError::DataStorageFormatter(e) => {
623 warn!(target: "Database", "has invalid data in writing file with id: {}, reason: {}", writing_storage.storage_id(), e);
624 }
625 _ => return Err(DatabaseError::StorageError(e)),
626 }
627 }
628 debug!(target: "Database", "reuse writing file with id: {}", writing_storage.storage_id());
629 }
630
631 Ok((writing_storage, storages))
632}
633
#[cfg(test)]
pub mod database_tests {
    //! Tests for `Database`: basic read/write, expirable values, file
    //! rotation, reopen/recovery, and recovery from truncated or corrupted
    //! writing files.

    use std::{
        io::{Seek, Write},
        sync::Arc,
        time::Duration,
    };

    use bitcasky_common::{
        clock::DebugClock, fs, fs::FileType, options::BitcaskyOptions,
        storage_id::StorageIdGenerator,
    };
    use utilities::common::{get_temporary_directory_path, TestingKV};

    use test_log::test;

    use crate::{data_storage::DataStorageReader, RowLocation, TimedValue};

    use super::Database;

    // A KV pair together with the location the database reported for it.
    #[derive(Debug)]
    pub struct TestingRow {
        pub kv: TestingKV,
        pub pos: RowLocation,
    }

    impl TestingRow {
        pub fn new(kv: TestingKV, pos: RowLocation) -> Self {
            TestingRow { kv, pos }
        }
    }

    // Small file-size limits so rotation is easy to trigger in tests.
    fn get_database_options() -> BitcaskyOptions {
        BitcaskyOptions::default()
            .max_data_file_size(1024)
            .init_data_file_capacity(100)
            .sync_interval(Duration::from_secs(60))
            .init_hint_file_capacity(1024)
    }

    // Assert every expected row reads back correctly via `read_value`.
    pub fn assert_rows_value(db: &Database, expect: &Vec<TestingRow>) {
        for row in expect {
            assert_row_value(db, row);
        }
    }

    pub fn assert_row_value(db: &Database, expect: &TestingRow) {
        let actual = db.read_value(&expect.pos).unwrap();
        // Rows written with a non-zero (past) expire timestamp read as absent.
        if expect.kv.expire_timestamp() > 0 {
            assert!(actual.is_none());
        } else {
            assert_eq!(*expect.kv.value(), *actual.unwrap().value);
        }
    }

    // Assert `db.iter()` yields exactly `expect_rows`, in order.
    pub fn assert_database_rows(db: &Database, expect_rows: &Vec<TestingRow>) {
        let mut i = 0;
        for actual_row in db.iter().unwrap().map(|r| r.unwrap()) {
            let expect_row = expect_rows.get(i).unwrap();
            assert_eq!(expect_row.kv.key(), actual_row.key);
            assert_eq!(
                expect_row.kv.expire_timestamp(),
                actual_row.value.expire_timestamp
            );
            // Expired rows come back with an empty value from the iterator.
            if expect_row.kv.expire_timestamp() > 0 {
                assert!(actual_row.value.value.is_empty());
            } else {
                assert_eq!(expect_row.kv.value(), actual_row.value.value);
            }

            assert_eq!(expect_row.pos, actual_row.row_location);
            i += 1;
        }
        assert_eq!(expect_rows.len(), i);
    }

    pub fn write_kvs_to_db(db: &Database, kvs: Vec<TestingKV>) -> Vec<TestingRow> {
        kvs.into_iter()
            .map(|kv| write_kv_to_db(db, kv))
            .collect::<Vec<TestingRow>>()
    }

    // Write one KV and remember the location the database assigned to it.
    pub fn write_kv_to_db(db: &Database, kv: TestingKV) -> TestingRow {
        let pos = db
            .write(
                kv.key(),
                TimedValue::expirable_value(kv.value(), kv.expire_timestamp()),
            )
            .unwrap();
        TestingRow::new(kv, pos)
    }

    // Writes (including an overwrite of k1) land in the writing file and read back.
    #[test]
    fn test_read_write_writing_file() {
        let dir = get_temporary_directory_path();
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let db =
            Database::open(&dir, storage_id_generator, Arc::new(get_database_options())).unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1"),
            TestingKV::new("k2", "value2"),
            TestingKV::new("k3", "value3"),
            TestingKV::new("k1", "value4"),
        ];
        let rows = write_kvs_to_db(&db, kvs);
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Expired values (debug clock past their timestamp) read as absent.
    #[test]
    fn test_read_write_expirable_value_in_writing_file() {
        let dir = get_temporary_directory_path();
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let clock = Arc::new(DebugClock::new(1000));
        let db = Database::open(
            &dir,
            storage_id_generator,
            Arc::new(get_database_options().debug_clock(clock)),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new_expirable("k1", "value1", 100),
            TestingKV::new("k2", "value2"),
            TestingKV::new_expirable("k3", "value3", 100),
            TestingKV::new("k1", "value4"),
        ];
        let rows = write_kvs_to_db(&db, kvs);
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Explicit flushes seal files into the stable set; reads span all files.
    #[test]
    fn test_read_write_with_stable_files() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1"),
            TestingKV::new("k2", "value2"),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        let kvs = vec![
            TestingKV::new("k3", "hello world"),
            TestingKV::new("k1", "value4"),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        assert_eq!(3, storage_id_generator.get_id());
        assert_eq!(2, db.stable_storages.len());
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Same as above, mixing expirable values across stable files.
    #[test]
    fn test_read_write_expirable_value_in_stable_files() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1"),
            TestingKV::new_expirable("k2", "value2", 100),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        let kvs = vec![
            TestingKV::new_expirable("k3", "hello world", 100),
            TestingKV::new("k1", "value4"),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        assert_eq!(3, storage_id_generator.get_id());
        assert_eq!(2, db.stable_storages.len());
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Reopening the same directory twice preserves all previously written rows.
    #[test]
    fn test_recovery() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            let kvs = vec![
                TestingKV::new("k1", "value1"),
                TestingKV::new_expirable("k2", "value2", 100),
            ];
            rows.append(&mut write_kvs_to_db(&db, kvs));
            assert_rows_value(&db, &rows);
        }
        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            let kvs = vec![
                TestingKV::new("k3", "hello world"),
                TestingKV::new_expirable("k1", "value4", 100),
            ];
            rows.append(&mut write_kvs_to_db(&db, kvs));
            assert_rows_value(&db, &rows);
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        // Everything fits in one reused writing file: no new ids, no stable files.
        assert_eq!(1, storage_id_generator.get_id());
        assert_eq!(0, db.stable_storages.len());
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Truncating the tail of the last row simulates a torn write; earlier
    // rows must still be recovered and the file remains writable.
    #[test]
    fn test_recovery_from_key_value_not_fully_written() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();

            rows.push(write_kv_to_db(&db, TestingKV::new("k1", "value1")));
            write_kv_to_db(&db, TestingKV::new_expirable("k2", "value2", 100));

            let storage_id = db.writing_storage.lock().storage_id();
            let offset = db.writing_storage.lock().offset();
            let f = fs::open_file(&dir, FileType::DataFile, Some(storage_id))
                .unwrap()
                .file;

            // Chop one byte off the second row's tail.
            f.set_len(offset as u64 - 1).unwrap();
        }
        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            assert_rows_value(&db, &rows);
            assert_database_rows(&db, &rows);
            rows.push(write_kv_to_db(&db, TestingKV::new("k3", "hello")));
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Truncating right after a row header simulates a torn header write.
    #[test]
    fn test_recovery_from_header_not_fully_written() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();

            rows.push(write_kv_to_db(&db, TestingKV::new("k1", "value1")));
            let pos = write_kv_to_db(&db, TestingKV::new_expirable("k2", "value2", 100)).pos;

            let storage_id = db.writing_storage.lock().storage_id();
            let f = fs::open_file(&dir, FileType::DataFile, Some(storage_id))
                .unwrap()
                .file;

            // Cut the file one byte into the second row.
            f.set_len((pos.row_offset + 1) as u64).unwrap();
        }

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            assert_rows_value(&db, &rows);
            assert_database_rows(&db, &rows);
            rows.push(write_kv_to_db(&db, TestingKV::new("k3", "hello")));
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Corrupting the last row's bytes (truncate + append garbage) must fail
    // its CRC; earlier rows still recover and the file stays usable.
    #[test]
    fn test_recovery_from_crc_failed() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();

            rows.push(write_kv_to_db(&db, TestingKV::new("k1", "value1")));
            write_kv_to_db(&db, TestingKV::new_expirable("k2", "value2", 100));

            let storage_id = db.writing_storage.lock().storage_id();
            let offset = db.writing_storage.lock().offset();
            let mut f = fs::open_file(&dir, FileType::DataFile, Some(storage_id))
                .unwrap()
                .file;

            // Replace the last byte of the second row with garbage.
            f.set_len(offset as u64 - 1).unwrap();
            f.seek(std::io::SeekFrom::End(0)).unwrap();
            f.write_all(&[1_u8]).unwrap();
        }

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            assert_rows_value(&db, &rows);
            assert_database_rows(&db, &rows);
            rows.push(write_kv_to_db(&db, TestingKV::new("k3", "hello")));
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    // Writes exceeding max_data_file_size roll the writing file over
    // automatically (no explicit flush).
    #[test]
    fn test_wrap_file() {
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let dir = get_temporary_directory_path();
        let db = Database::open(
            &dir,
            storage_id_generator,
            Arc::new(
                BitcaskyOptions::default()
                    .max_data_file_size(120)
                    .init_data_file_capacity(100),
            ),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1_value1_value1"),
            TestingKV::new("k2", "value2_value2_value2"),
            TestingKV::new("k3", "value3_value3_value3"),
            TestingKV::new("k1", "value4_value4_value4"),
        ];
        assert_eq!(0, db.stable_storages.len());
        let rows = write_kvs_to_db(&db, kvs);
        assert_rows_value(&db, &rows);
        assert_eq!(1, db.stable_storages.len());
        assert_database_rows(&db, &rows);
    }
}