bitcasky_database/
core.rs

use std::{
    cell::Cell,
    mem,
    path::{Path, PathBuf},
    sync::Arc,
    thread::{self, JoinHandle},
    time::{Duration, Instant},
};

use crossbeam_channel::{select, Receiver, Sender};
use dashmap::{mapref::one::RefMut, DashMap};
use parking_lot::{Mutex, MutexGuard};

use bitcasky_common::{
    clock::Clock,
    formatter::{BitcaskyFormatter, RowToWrite},
    fs::{self as SelfFs, FileType},
    options::BitcaskyOptions,
    storage_id::{StorageId, StorageIdGenerator},
};

use crate::{
    common::{DatabaseError, DatabaseResult},
    data_storage::DataStorageTelemetry,
    hint::{self, HintWriter},
};

use log::{debug, error, info, trace, warn};

use super::{
    common::{RecoveredRow, TimedValue},
    data_storage::{DataStorage, DataStorageReader, DataStorageWriter, StorageIter},
    DataStorageError,
};
use super::{
    common::{RowLocation, RowToRead},
    hint::HintFile,
};

/**
 * Statistics of a Database.
 * Some of the metrics may not be accurate due to concurrent access.
 */
#[derive(Debug)]
pub struct DatabaseTelemetry {
    pub writing_storage: DataStorageTelemetry,
    pub stable_storages: Vec<DataStorageTelemetry>,
    pub hint_file_writer: hint::HintWriterTelemetry,
}

#[derive(Debug)]
pub struct StorageIds {
    pub stable_storage_ids: Vec<StorageId>,
    pub writing_storage_id: StorageId,
}

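/// The core storage engine: a single mutable "writing" storage that all new
/// rows are appended to, plus a set of sealed, immutable "stable" storages
/// keyed by storage id. When the writing storage overflows, it is sealed,
/// moved into `stable_storages`, and a hint file is generated for it
/// asynchronously.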
#[derive(Debug)]
pub struct Database {
    pub database_dir: PathBuf,
    storage_id_generator: Arc<StorageIdGenerator>,
    writing_storage: Arc<Mutex<DataStorage>>,
    stable_storages: DashMap<StorageId, Mutex<DataStorage>>,
    options: Arc<BitcaskyOptions>,
    hint_file_writer: Option<HintWriter>,
    /// Background worker that periodically flushes the writing storage
    sync_worker: Option<SyncWorker>,
    formatter: Arc<BitcaskyFormatter>,
    is_error: Mutex<Option<String>>,
}

impl Database {
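    /// Open (or create) a database under `directory`: clear leftover temporary
    /// hint files, advance the id generator past the largest data file id found
    /// on disk, prepare the writing/stable storages, and start the periodic
    /// sync worker when `sync_interval_sec > 0`.
    ///
    /// A minimal usage sketch (illustrative only, not a doctest; the option
    /// values are made up, and a `0` expire timestamp is used for a
    /// non-expiring value, mirroring the tests below):
    ///
    /// ```ignore
    /// let db = Database::open(
    ///     Path::new("/tmp/bitcasky-demo"),
    ///     Arc::new(StorageIdGenerator::default()),
    ///     Arc::new(BitcaskyOptions::default().max_data_file_size(1024)),
    /// )?;
    /// let location = db.write("key", TimedValue::expirable_value("value", 0))?;
    /// assert!(db.read_value(&location)?.is_some());
    /// ```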
    pub fn open(
        directory: &Path,
        storage_id_generator: Arc<StorageIdGenerator>,
        options: Arc<BitcaskyOptions>,
    ) -> DatabaseResult<Database> {
        let database_dir: PathBuf = directory.into();

        debug!(target: "Database", "opening database at directory {:?}", directory);

        hint::clear_temp_hint_file_directory(&database_dir);

        let data_storage_ids = SelfFs::get_storage_ids_in_dir(&database_dir, FileType::DataFile);
        if let Some(id) = data_storage_ids.iter().max() {
            storage_id_generator.update_id(*id);
        }

        let hint_file_writer = Some(HintWriter::start(&database_dir, options.clone()));

        let formatter = Arc::new(BitcaskyFormatter::default());
        let (writing_storage, storages) = prepare_db_storages(
            &database_dir,
            &data_storage_ids,
            &storage_id_generator,
            formatter.clone(),
            options.clone(),
        )?;

        let stable_storages = storages.into_iter().fold(DashMap::new(), |m, s| {
            m.insert(s.storage_id(), Mutex::new(s));
            m
        });

        let writing_storage = Arc::new(Mutex::new(writing_storage));
        let mut db = Database {
            writing_storage,
            storage_id_generator,
            database_dir,
            stable_storages,
            options: options.clone(),
            hint_file_writer,
            sync_worker: None,
            formatter,
            is_error: Mutex::new(None),
        };

        if options.database.sync_interval_sec > 0 {
            db.sync_worker = Some(SyncWorker::start_sync_worker(
                db.writing_storage.clone(),
                options.database.sync_interval_sec,
            ));
        }

        info!(target: "Database", "database opened at directory: {:?}, with {} data files", directory, data_storage_ids.len());
        Ok(db)
    }

    pub fn get_database_dir(&self) -> &Path {
        &self.database_dir
    }

    pub fn get_max_storage_id(&self) -> StorageId {
        let writing_file_ref = self.writing_storage.lock();
        writing_file_ref.storage_id()
    }

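    /// Append a key/value row to the writing storage and return its location.
    /// On `StorageOverflow` the writing storage is rotated into the stable set
    /// and the row is retried once against the fresh writing storage.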
    pub fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &self,
        key: K,
        value: TimedValue<V>,
    ) -> DatabaseResult<RowLocation> {
        let ts = value.expire_timestamp;
        let row: RowToWrite<K, TimedValue<V>> = RowToWrite::new_with_timestamp(key, value, ts);
        let mut writing_file_ref = self.writing_storage.lock();

        match writing_file_ref.write_row(&row) {
            Err(DataStorageError::StorageOverflow(id)) => {
                debug!("Flush writing storage with id: {} on overflow", id);
                self.do_flush_writing_file(&mut writing_file_ref)?;
                Ok(writing_file_ref.write_row(&row)?)
            }
            r => Ok(r?),
        }
    }

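    /// Manually rotate the writing storage: seal the current one into the
    /// stable set (if it is dirty) and start a new writing storage.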
    pub fn flush_writing_file(&self) -> DatabaseResult<()> {
        let mut writing_file_ref = self.writing_storage.lock();
        debug!(
            "Flush writing file with id: {}",
            writing_file_ref.storage_id()
        );
        // flush file only when we actually wrote something
        self.do_flush_writing_file(&mut writing_file_ref)?;

        Ok(())
    }

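    /// Iterate all rows for startup recovery, newest storage first (ids sorted
    /// in descending order), preferring hint files where they exist.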
    pub fn recovery_iter(&self) -> DatabaseResult<DatabaseRecoverIter> {
        let mut storage_ids: Vec<StorageId>;
        {
            let writing_storage = self.writing_storage.lock();
            let writing_storage_id = writing_storage.storage_id();

            storage_ids = self
                .stable_storages
                .iter()
                .map(|f| f.lock().storage_id())
                .collect::<Vec<StorageId>>();
            storage_ids.push(writing_storage_id);
            storage_ids.sort();
            storage_ids.reverse();
        }
        DatabaseRecoverIter::new(self.database_dir.clone(), storage_ids, self.options.clone())
    }

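    /// Iterate every row in the database, oldest storage first. Fresh read
    /// handles are opened for each storage, including the writing one.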
    pub fn iter(&self) -> DatabaseResult<DatabaseIter> {
        let mut storage_ids: Vec<StorageId>;
        {
            let writing_storage = self.writing_storage.lock();
            let writing_storage_id = writing_storage.storage_id();

            storage_ids = self
                .stable_storages
                .iter()
                .map(|f| f.lock().storage_id())
                .collect::<Vec<StorageId>>();
            storage_ids.push(writing_storage_id);
        }

        let files: DatabaseResult<Vec<DataStorage>> = storage_ids
            .iter()
            .map(|f| {
                DataStorage::open(&self.database_dir, *f, self.options.clone())
                    .map_err(DatabaseError::StorageError)
            })
            .collect();

        let mut opened_stable_files = files?;
        opened_stable_files.sort_by_key(|e| e.storage_id());
        let iters: crate::data_storage::Result<Vec<StorageIter>> =
            opened_stable_files.iter().rev().map(|f| f.iter()).collect();

        Ok(DatabaseIter::new(iters?))
    }

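    /// Read the value stored at `row_location`, checking the writing storage
    /// first and falling back to the matching stable storage.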
    pub fn read_value(
        &self,
        row_location: &RowLocation,
    ) -> DatabaseResult<Option<TimedValue<Vec<u8>>>> {
        {
            let mut writing_file_ref = self.writing_storage.lock();
            if row_location.storage_id == writing_file_ref.storage_id() {
                return Ok(writing_file_ref.read_value(row_location.row_offset)?);
            }
        }

        let l = self.get_file_to_read(row_location.storage_id)?;
        let mut f = l.lock();
        let ret = f.read_value(row_location.row_offset)?;
        Ok(ret)
    }

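    /// Re-open the given data files and swap them in as this database's
    /// writing/stable storages; intended for when the set of files on disk
    /// has changed.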
    pub fn reload_data_files(&self, data_storage_ids: Vec<StorageId>) -> DatabaseResult<()> {
        let (writing, stables) = prepare_db_storages(
            &self.database_dir,
            &data_storage_ids,
            &self.storage_id_generator,
            self.formatter.clone(),
            self.options.clone(),
        )?;

        {
            let mut writing_storage_ref = self.writing_storage.lock();
            debug!(
                "reload writing storage with id: {}",
                writing_storage_ref.storage_id()
            );
            let _ = mem::replace(&mut *writing_storage_ref, writing);
        }

        self.stable_storages.clear();

        for s in stables {
            if self.stable_storages.contains_key(&s.storage_id()) {
                core::panic!("file id: {} already loaded in database", s.storage_id());
            }
            debug!("reload stable file with id: {}", s.storage_id());
            self.stable_storages.insert(s.storage_id(), Mutex::new(s));
        }
        Ok(())
    }

    pub fn get_storage_ids(&self) -> StorageIds {
        let writing_file_ref = self.writing_storage.lock();
        let writing_storage_id = writing_file_ref.storage_id();
        let stable_storage_ids: Vec<StorageId> = self
            .stable_storages
            .iter()
            .map(|f| f.value().lock().storage_id())
            .collect();
        StorageIds {
            stable_storage_ids,
            writing_storage_id,
        }
    }

    pub fn get_telemetry_data(&self) -> DatabaseTelemetry {
        let writing_storage = { self.writing_storage.lock().get_telemetry_data() };
        let stable_storages: Vec<DataStorageTelemetry> = {
            self.stable_storages
                .iter()
                .map(|s| s.lock().get_telemetry_data())
                .collect()
        };

        DatabaseTelemetry {
            hint_file_writer: self
                .hint_file_writer
                .as_ref()
                .map(|h| h.get_telemetry_data())
                .unwrap_or_default(),
            writing_storage,
            stable_storages,
        }
    }

    /// Clear this database completely: rotate the writing storage, then
    /// physically delete every stable data file and its hint file.
    pub fn drop(&self) -> DatabaseResult<()> {
        debug!("Drop database called");

        {
            let mut writing_file_ref = self.writing_storage.lock();
            debug!(
                "Flush writing file with id: {} on drop database",
                writing_file_ref.storage_id()
            );
            // flush file only when we actually wrote something
            self.do_flush_writing_file(&mut writing_file_ref)?;
        }
        for storage_id in self.stable_storages.iter().map(|v| v.lock().storage_id()) {
            SelfFs::delete_file(&self.database_dir, FileType::DataFile, Some(storage_id))?;
            SelfFs::delete_file(&self.database_dir, FileType::HintFile, Some(storage_id))?;
        }
        self.stable_storages.clear();
        Ok(())
    }

    pub fn sync(&self) -> DatabaseResult<()> {
        let mut f = self.writing_storage.lock();
        f.flush()?;
        Ok(())
    }

    pub fn mark_db_error(&self, error_string: String) {
        let mut err = self.is_error.lock();
        *err = Some(error_string);
    }

    pub fn check_db_error(&self) -> Result<(), DatabaseError> {
        let err = self.is_error.lock();
        if let Some(e) = err.as_ref() {
            return Err(DatabaseError::DatabaseBroken(e.clone()));
        }
        Ok(())
    }

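    /// Rotation workhorse: skip if nothing was written, otherwise flush the
    /// current writing storage, move it into the stable set, schedule
    /// asynchronous hint-file generation for it, and install a fresh writing
    /// storage under the held lock.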
    fn do_flush_writing_file(
        &self,
        writing_file_ref: &mut MutexGuard<DataStorage>,
    ) -> DatabaseResult<()> {
        if !writing_file_ref.is_dirty() {
            debug!(
                "Skip flushing empty writing file with id: {}",
                writing_file_ref.storage_id()
            );
            return Ok(());
        }
        let next_storage_id = self.storage_id_generator.generate_next_id();
        let next_writing_file = DataStorage::new(
            &self.database_dir,
            next_storage_id,
            self.formatter.clone(),
            self.options.clone(),
        )?;
        let mut old_storage = mem::replace(&mut **writing_file_ref, next_writing_file);
        old_storage.flush()?;
        let storage_id = old_storage.storage_id();
        self.stable_storages
            .insert(storage_id, Mutex::new(old_storage));
        if let Some(w) = self.hint_file_writer.as_ref() {
            w.async_write_hint_file(storage_id);
        }
        debug!(target: "Database", "writing file with id: {} flushed, new writing file with id: {} created", storage_id, next_storage_id);
        Ok(())
    }

    fn get_file_to_read(
        &self,
        storage_id: StorageId,
    ) -> DatabaseResult<RefMut<StorageId, Mutex<DataStorage>>> {
        self.stable_storages
            .get_mut(&storage_id)
            .ok_or(DatabaseError::TargetFileIdNotFound(storage_id))
    }
}

impl Drop for Database {
    fn drop(&mut self) {
        let mut writing_file_ref = self.writing_storage.lock();
        if let Err(e) = writing_file_ref.flush() {
            warn!(target: "Database", "sync database failed: {}", e)
        }

        if let Some(worker) = self.sync_worker.take() {
            drop(worker);
        }

        if let Some(hint_w) = self.hint_file_writer.take() {
            drop(hint_w);
        }

        info!(target: "Database", "database on directory: {:?} closed", self.database_dir)
    }
}

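/// Background thread that flushes the writing storage roughly every
/// `sync_interval_sec` seconds until it receives the stop signal sent when
/// the worker is dropped.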
#[derive(Debug)]
struct SyncWorker {
    stop_sender: Sender<()>,
    handle: Option<JoinHandle<()>>,
}

impl SyncWorker {
    fn start_sync_worker(
        datastorage: Arc<Mutex<DataStorage>>,
        sync_interval_sec: u64,
    ) -> SyncWorker {
        let (stop_sender, stop_receiver): (Sender<()>, Receiver<()>) =
            crossbeam_channel::bounded(1);

        let sync_duration = Duration::from_secs(sync_interval_sec);
        let receiver = crossbeam_channel::tick(sync_duration);
        let handle = thread::spawn(move || {
            let mut last_sync = Instant::now();
            loop {
                select! {
                    recv(stop_receiver) -> _ => {
                        info!(target: "Database", "stopping sync worker");
                        return
                    }

                    recv(receiver) -> _ => {
                        if last_sync.elapsed() < sync_duration {
                            continue;
                        }

                        trace!("Attempting sync");
                        let mut f = datastorage.lock();
                        if let Err(e) = f.flush() {
                            error!(target: "Database", "flush database failed: {}", e);
                        }
                        last_sync = Instant::now();
                    },
                }
            }
        });
        SyncWorker {
            stop_sender,
            handle: Some(handle),
        }
    }
}

impl Drop for SyncWorker {
    fn drop(&mut self) {
        if self.stop_sender.send(()).is_err() {
            warn!("Failed to stop sync worker.");
        }

        if let Some(handle) = self.handle.take() {
            if handle.join().is_err() {
                warn!(target: "Database", "failed to join sync worker thread");
            }
        }
    }
}

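/// Iterator over all rows of a database, draining one `StorageIter` at a time
/// and pulling the next from `remain_iters` once the current one is exhausted.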
pub struct DatabaseIter {
    current_iter: Cell<Option<StorageIter>>,
    remain_iters: Vec<StorageIter>,
}

impl DatabaseIter {
    fn new(mut iters: Vec<StorageIter>) -> Self {
        let current_iter = iters.pop();
        DatabaseIter {
            remain_iters: iters,
            current_iter: Cell::new(current_iter),
        }
    }
}

impl Iterator for DatabaseIter {
    type Item = DatabaseResult<RowToRead>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.current_iter.get_mut() {
                None => break,
                Some(iter) => match iter.next() {
                    None => {
                        self.current_iter.replace(self.remain_iters.pop());
                    }
                    other => return other.map(|r| r.map_err(DatabaseError::StorageError)),
                },
            }
        }
        None
    }
}

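/// Build the recovery stream for a single storage: prefer the hint file when
/// one exists; otherwise scan the data file, marking rows whose value has
/// expired as invalid.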
fn recovered_iter(
    database_dir: &Path,
    storage_id: StorageId,
    options: Arc<BitcaskyOptions>,
) -> DatabaseResult<Box<dyn Iterator<Item = DatabaseResult<RecoveredRow>>>> {
    if FileType::HintFile
        .get_path(database_dir, Some(storage_id))
        .exists()
    {
        debug!(target: "Database", "recover from hint file with id: {}", storage_id);
        Ok(Box::new(HintFile::open_iterator(database_dir, storage_id)?))
    } else {
        debug!(target: "Database", "recover from data file with id: {}", storage_id);
        let stable_file = DataStorage::open(database_dir, storage_id, options.clone())?;
        let i = stable_file.iter().map(move |iter| {
            iter.map(move |row| {
                row.map(|r| RecoveredRow {
                    row_location: r.row_location,
                    key: r.key,
                    invalid: !r.value.is_valid(options.clock.now()),
                })
                .map_err(DatabaseError::StorageError)
            })
        })?;
        Ok(Box::new(i))
    }
}

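/// Iterator yielding `RecoveredRow`s across multiple storages, lazily opening
/// each storage's recovery stream as the previous one is exhausted.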
pub struct DatabaseRecoverIter {
    current_iter: Cell<Option<Box<dyn Iterator<Item = DatabaseResult<RecoveredRow>>>>>,
    data_storage_ids: Vec<StorageId>,
    database_dir: PathBuf,
    options: Arc<BitcaskyOptions>,
}

impl DatabaseRecoverIter {
    fn new(
        database_dir: PathBuf,
        mut storage_ids: Vec<StorageId>,
        options: Arc<BitcaskyOptions>,
    ) -> DatabaseResult<Self> {
        let current_iter = storage_ids
            .pop()
            .map(|id| recovered_iter(&database_dir, id, options.clone()))
            .transpose()?;
        Ok(DatabaseRecoverIter {
            database_dir,
            data_storage_ids: storage_ids,
            current_iter: Cell::new(current_iter),
            options,
        })
    }
}

impl Iterator for DatabaseRecoverIter {
    type Item = DatabaseResult<RecoveredRow>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.current_iter.get_mut() {
                None => break,
                Some(iter) => match iter.next() {
                    None => {
                        if let Some(id) = self.data_storage_ids.pop() {
                            match recovered_iter(&self.database_dir, id, self.options.clone()) {
                                Ok(iter) => {
                                    self.current_iter.replace(Some(iter));
                                }
                                Err(e) => return Some(Err(e)),
                            }
                        } else {
                            break;
                        }
                    }
                    other => return other,
                },
            }
        }
        None
    }
}

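/// Open data storages for the given ids, in ascending id order.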
fn open_storages<P: AsRef<Path>>(
    database_dir: P,
    data_storage_ids: &[u32],
    options: Arc<BitcaskyOptions>,
) -> DatabaseResult<Vec<DataStorage>> {
    let mut storage_ids = data_storage_ids.to_owned();
    storage_ids.sort();

    Ok(storage_ids
        .iter()
        .map(|id| DataStorage::open(&database_dir, *id, options.clone()))
        .collect::<crate::data_storage::Result<Vec<DataStorage>>>()?)
}

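/// Open all existing data storages and pick the one with the largest id as the
/// writing storage, creating a brand-new one when none exist. A truncated tail
/// in the reused writing file (EOF or formatter errors from `seek_to_end`) is
/// only logged, so opening can proceed past a partially written row; any other
/// error aborts.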
fn prepare_db_storages<P: AsRef<Path>>(
    database_dir: P,
    data_storage_ids: &[u32],
    storage_id_generator: &StorageIdGenerator,
    formatter: Arc<BitcaskyFormatter>,
    options: Arc<BitcaskyOptions>,
) -> DatabaseResult<(DataStorage, Vec<DataStorage>)> {
    let mut storages = open_storages(&database_dir, data_storage_ids, options.clone())?;

    let writing_storage = match storages.pop() {
        None => {
            let writing_storage_id = storage_id_generator.generate_next_id();
            let storage = DataStorage::new(&database_dir, writing_storage_id, formatter, options)?;
            debug!(target: "Database", "create writing file with id: {}", writing_storage_id);
            storage
        }
        Some(mut writing_storage) => {
            if let Err(e) = writing_storage.seek_to_end() {
                match e {
                    DataStorageError::EofError() => {
                        warn!(target: "Database", "got EOF in writing file with id: {}", writing_storage.storage_id());
                    }
                    DataStorageError::DataStorageFormatter(e) => {
                        warn!(target: "Database", "found invalid data in writing file with id: {}, reason: {}", writing_storage.storage_id(), e);
                    }
                    _ => return Err(DatabaseError::StorageError(e)),
                }
            }
            debug!(target: "Database", "reuse writing file with id: {}", writing_storage.storage_id());
            writing_storage
        }
    };

    Ok((writing_storage, storages))
}

#[cfg(test)]
pub mod database_tests {
    use std::{
        io::{Seek, Write},
        sync::Arc,
        time::Duration,
    };

    use bitcasky_common::{
        clock::DebugClock, fs, fs::FileType, options::BitcaskyOptions,
        storage_id::StorageIdGenerator,
    };
    use utilities::common::{get_temporary_directory_path, TestingKV};

    use test_log::test;

    use crate::{data_storage::DataStorageReader, RowLocation, TimedValue};

    use super::Database;

    #[derive(Debug)]
    pub struct TestingRow {
        pub kv: TestingKV,
        pub pos: RowLocation,
    }

    impl TestingRow {
        pub fn new(kv: TestingKV, pos: RowLocation) -> Self {
            TestingRow { kv, pos }
        }
    }

    fn get_database_options() -> BitcaskyOptions {
        BitcaskyOptions::default()
            .max_data_file_size(1024)
            .init_data_file_capacity(100)
            .sync_interval(Duration::from_secs(60))
            .init_hint_file_capacity(1024)
    }

    pub fn assert_rows_value(db: &Database, expect: &[TestingRow]) {
        for row in expect {
            assert_row_value(db, row);
        }
    }

    pub fn assert_row_value(db: &Database, expect: &TestingRow) {
        let actual = db.read_value(&expect.pos).unwrap();
        if expect.kv.expire_timestamp() > 0 {
            assert!(actual.is_none());
        } else {
            assert_eq!(*expect.kv.value(), *actual.unwrap().value);
        }
    }

    pub fn assert_database_rows(db: &Database, expect_rows: &[TestingRow]) {
        let mut i = 0;
        for actual_row in db.iter().unwrap().map(|r| r.unwrap()) {
            let expect_row = expect_rows.get(i).unwrap();
            assert_eq!(expect_row.kv.key(), actual_row.key);
            assert_eq!(
                expect_row.kv.expire_timestamp(),
                actual_row.value.expire_timestamp
            );
            if expect_row.kv.expire_timestamp() > 0 {
                assert!(actual_row.value.value.is_empty());
            } else {
                assert_eq!(expect_row.kv.value(), actual_row.value.value);
            }

            assert_eq!(expect_row.pos, actual_row.row_location);
            i += 1;
        }
        assert_eq!(expect_rows.len(), i);
    }

    pub fn write_kvs_to_db(db: &Database, kvs: Vec<TestingKV>) -> Vec<TestingRow> {
        kvs.into_iter()
            .map(|kv| write_kv_to_db(db, kv))
            .collect::<Vec<TestingRow>>()
    }

    pub fn write_kv_to_db(db: &Database, kv: TestingKV) -> TestingRow {
        let pos = db
            .write(
                kv.key(),
                TimedValue::expirable_value(kv.value(), kv.expire_timestamp()),
            )
            .unwrap();
        TestingRow::new(kv, pos)
    }

    #[test]
    fn test_read_write_writing_file() {
        let dir = get_temporary_directory_path();
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let db =
            Database::open(&dir, storage_id_generator, Arc::new(get_database_options())).unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1"),
            TestingKV::new("k2", "value2"),
            TestingKV::new("k3", "value3"),
            TestingKV::new("k1", "value4"),
        ];
        let rows = write_kvs_to_db(&db, kvs);
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_read_write_expirable_value_in_writing_file() {
        let dir = get_temporary_directory_path();
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let clock = Arc::new(DebugClock::new(1000));
        let db = Database::open(
            &dir,
            storage_id_generator,
            Arc::new(get_database_options().debug_clock(clock)),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new_expirable("k1", "value1", 100),
            TestingKV::new("k2", "value2"),
            TestingKV::new_expirable("k3", "value3", 100),
            TestingKV::new("k1", "value4"),
        ];
        let rows = write_kvs_to_db(&db, kvs);
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_read_write_with_stable_files() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1"),
            TestingKV::new("k2", "value2"),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        let kvs = vec![
            TestingKV::new("k3", "hello world"),
            TestingKV::new("k1", "value4"),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        assert_eq!(3, storage_id_generator.get_id());
        assert_eq!(2, db.stable_storages.len());
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_read_write_expirable_value_in_stable_files() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1"),
            TestingKV::new_expirable("k2", "value2", 100),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        let kvs = vec![
            TestingKV::new_expirable("k3", "hello world", 100),
            TestingKV::new("k1", "value4"),
        ];
        rows.append(&mut write_kvs_to_db(&db, kvs));
        db.flush_writing_file().unwrap();

        assert_eq!(3, storage_id_generator.get_id());
        assert_eq!(2, db.stable_storages.len());
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_recovery() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            let kvs = vec![
                TestingKV::new("k1", "value1"),
                TestingKV::new_expirable("k2", "value2", 100),
            ];
            rows.append(&mut write_kvs_to_db(&db, kvs));
            assert_rows_value(&db, &rows);
        }
        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            let kvs = vec![
                TestingKV::new("k3", "hello world"),
                TestingKV::new_expirable("k1", "value4", 100),
            ];
            rows.append(&mut write_kvs_to_db(&db, kvs));
            assert_rows_value(&db, &rows);
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        assert_eq!(1, storage_id_generator.get_id());
        assert_eq!(0, db.stable_storages.len());
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_recovery_from_key_value_not_fully_written() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();

            rows.push(write_kv_to_db(&db, TestingKV::new("k1", "value1")));
            write_kv_to_db(&db, TestingKV::new_expirable("k2", "value2", 100));

            let storage_id = db.writing_storage.lock().storage_id();
            let offset = db.writing_storage.lock().offset();
            let f = fs::open_file(&dir, FileType::DataFile, Some(storage_id))
                .unwrap()
                .file;

            // data file broken, key value not fully written
            f.set_len(offset as u64 - 1).unwrap();
        }
        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            // can only recover one value
            assert_rows_value(&db, &rows);
            assert_database_rows(&db, &rows);
            // overwrite broken value
            rows.push(write_kv_to_db(&db, TestingKV::new("k3", "hello")));
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_recovery_from_header_not_fully_written() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();

            rows.push(write_kv_to_db(&db, TestingKV::new("k1", "value1")));
            let pos = write_kv_to_db(&db, TestingKV::new_expirable("k2", "value2", 100)).pos;

            let storage_id = db.writing_storage.lock().storage_id();
            let f = fs::open_file(&dir, FileType::DataFile, Some(storage_id))
                .unwrap()
                .file;

            // data file broken, header not fully written
            f.set_len((pos.row_offset + 1) as u64).unwrap();
        }

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            // can only recover one value
            assert_rows_value(&db, &rows);
            assert_database_rows(&db, &rows);
            // overwrite broken value
            rows.push(write_kv_to_db(&db, TestingKV::new("k3", "hello")));
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_recovery_from_crc_failed() {
        let dir = get_temporary_directory_path();
        let mut rows: Vec<TestingRow> = vec![];
        let storage_id_generator = Arc::new(StorageIdGenerator::default());

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();

            rows.push(write_kv_to_db(&db, TestingKV::new("k1", "value1")));
            write_kv_to_db(&db, TestingKV::new_expirable("k2", "value2", 100));

            let storage_id = db.writing_storage.lock().storage_id();
            let offset = db.writing_storage.lock().offset();
            let mut f = fs::open_file(&dir, FileType::DataFile, Some(storage_id))
                .unwrap()
                .file;

            // data file broken, change last byte to break crc check
            f.set_len(offset as u64 - 1).unwrap();
            f.seek(std::io::SeekFrom::End(0)).unwrap();
            f.write_all(&[1_u8]).unwrap();
        }

        {
            let db = Database::open(
                &dir,
                storage_id_generator.clone(),
                Arc::new(get_database_options()),
            )
            .unwrap();
            // can only recover one value
            assert_rows_value(&db, &rows);
            assert_database_rows(&db, &rows);
            // overwrite broken value
            rows.push(write_kv_to_db(&db, TestingKV::new("k3", "hello")));
        }

        let db = Database::open(
            &dir,
            storage_id_generator.clone(),
            Arc::new(get_database_options()),
        )
        .unwrap();
        assert_rows_value(&db, &rows);
        assert_database_rows(&db, &rows);
    }

    #[test]
    fn test_wrap_file() {
        let storage_id_generator = Arc::new(StorageIdGenerator::default());
        let dir = get_temporary_directory_path();
        let db = Database::open(
            &dir,
            storage_id_generator,
            Arc::new(
                BitcaskyOptions::default()
                    .max_data_file_size(120)
                    .init_data_file_capacity(100),
            ),
        )
        .unwrap();
        let kvs = vec![
            TestingKV::new("k1", "value1_value1_value1"),
            TestingKV::new("k2", "value2_value2_value2"),
            TestingKV::new("k3", "value3_value3_value3"),
            TestingKV::new("k1", "value4_value4_value4"),
        ];
        assert_eq!(0, db.stable_storages.len());
        let rows = write_kvs_to_db(&db, kvs);
        assert_rows_value(&db, &rows);
        assert_eq!(1, db.stable_storages.len());
        assert_database_rows(&db, &rows);
    }
}