rustcask/
lib.rs

//! `Rustcask` is a fast and efficient key-value storage engine implemented in Rust.
//! It's based on [Bitcask,
//! "A Log-Structured Hash Table for Fast Key/Value Data"](https://riak.com/assets/bitcask-intro.pdf).
//!
//! For more details on the design of Rustcask, see [the README on GitHub](https://github.com/RyanStan/rustcask).
//!
//! # Example
//! ```
//! # use rustcask::Rustcask;
//! # use tempfile::TempDir;
//! # let temp_dir = TempDir::new().expect("unable to create temporary working directory");
//! # let rustcask_dir = temp_dir.path();
//! let mut store = Rustcask::builder().open(rustcask_dir).unwrap();
//!
//! let key = "leader-node".as_bytes().to_vec();
//! let value = "instance-a".as_bytes().to_vec();
//!
//! store.set(key.clone(), value).unwrap();
//! store.get(&key);
//! ```
21
22use error::{
23    GetError, MergeError, MergeErrorKind, OpenError, OpenErrorKind, RemoveError,
24    SetError,
25};
26use keydir::KeyDir;
27use logfile::LogFileEntry;
28use readers::Readers;
29
30use log::{info, trace};
31use writer::Writer;
32
33use std::sync::{Arc, Mutex, RwLock};
34use std::{
35    io::{Seek, SeekFrom},
36    path::{Path, PathBuf},
37};
38
39use crate::error::GetErrorKind;
40
41/// Rustcask error types.
42pub mod error;
43
44mod bufio;
45mod keydir;
46mod logfile;
47mod readers;
48mod utils;
49mod writer;
50
/// Generation number of a data file: `3.rustcask.data` belongs to generation 3.
type GenerationNumber = u64;

/// Default size limit, in bytes, of the active data file before it is rotated (2 GiB).
const MAX_DATA_FILE_SIZE: u64 = 1 << 31;
54
/// A handle to interact with a Rustcask storage engine.
///
/// `Rustcask` is `Clone`: the writer, key directory and directory path are held in
/// `Arc`s, so every clone operates on the same underlying store (for example, a value
/// written through one clone is returned by `get` on another).
#[derive(Clone, Debug)]
pub struct Rustcask {
    // Writes to active data file. Performs data file rotation as needed.
    // The mutex serializes `set`, `remove` and `merge` across all clones.
    writer: Arc<Mutex<Writer>>,

    // Data file readers, looked up by generation number in `get`.
    // NOTE(review): whether clones share the underlying file handles depends on
    // `Readers`' `Clone` impl, which is not visible in this file.
    readers: Readers,

    // In-memory index mapping each key to the data file generation and offset of its
    // latest entry. `get` holds the read lock for the duration of its disk read.
    pub(crate) keydir: Arc<RwLock<KeyDir>>,

    // Whether writes are fsync'ed before returning (see `RustcaskBuilder::set_sync_mode`).
    // NOTE(review): not read anywhere in this file; the writer receives its own copy at open.
    sync_mode: bool,

    // The Rustcask directory holding the data files.
    pub(crate) directory: Arc<PathBuf>,
}
70
71impl Rustcask {
72    /// Returns a Rustcask builder with default configuration values.
73    pub fn builder() -> RustcaskBuilder {
74        RustcaskBuilder::default()
75    }
76
77    /// Inserts a key-value pair into Rustcask.
78    ///
79    ///  # Arguments
80    ///
81    /// * `key` - The key to insert, as a `Vec<u8>`.
82    /// * `value` - The value to associate with the key, as a `Vec<u8>`.
83    ///
84    /// # Returns
85    ///
86    /// * `Ok(())` if the key-value pair was successfully inserted.
87    /// * `Err(SetError)` if there was an error serializing the entry or writing to the data file.
88    ///
89    /// # Errors
90    ///
91    /// This function may return a `SetError` if:
92    ///
93    /// * The `LogFileEntry` could not be serialized (`SetErrorKind::Serialize`).
94    /// * There was an error writing to the active data file.
95    ///
96    /// # Panics
97    ///
98    /// This function will panic if another thread crashed while holding the lock on the key directory.
99    pub fn set(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), SetError> {
100        trace!(
101            "Set called with key (as UTF 8) {}",
102            String::from_utf8_lossy(&key)
103        );
104
105        let mut writer = self
106            .writer
107            .lock()
108            .expect("Another thread crashed while holding the writer lock. Panicking.");
109
110        writer.set(key, value)
111    }
112
113    /// Returns a reference to the value corresponding to the key.
114    ///
115    /// # Arguments
116    ///
117    /// * `key` - A reference to the `Vec<u8>` representing the key to look up.
118    ///
119    /// # Returns
120    ///
121    /// * `Ok(Some(value))` - If the key is found in the data store, returns the corresponding value as a `Vec<u8>`.
122    /// * `Ok(None)` - If the key is not found in the data store.
123    /// * `Err(GetError)` - If an error occurs while reading or deserializing the data from the data store.
124    ///
125    /// # Errors
126    ///
127    /// This function may return a `GetError` with the following variants:
128    ///
129    /// * `GetErrorKind::Io(err)` - An I/O error occurred while reading the data file.
130    /// * `GetErrorKind::Deserialize(err)` - An error occurred while deserializing the data from the data file.
131    pub fn get<'a>(&'a mut self, key: &'a Vec<u8>) -> Result<Option<Vec<u8>>, GetError<'a>> {
132        trace!(
133            "Get called with key (as UTF 8) {}",
134            String::from_utf8_lossy(key)
135        );
136        let keydir = self
137            .keydir
138            .read()
139            .expect("Another thread panicked while holding the keydir lock. Panicking.");
140        let keydir_entry = keydir.get(key);
141        if keydir_entry.is_none() {
142            return Ok(None);
143        }
144        let keydir_entry = keydir_entry.unwrap();
145
146        let reader = self
147            .readers
148            .get_data_file_reader(keydir_entry.data_file_gen);
149
150        // TODO [RyanStan 3-25-24] This code is duplicated in remove. Extract it into a separate function.
151        let log_index = &keydir_entry.index;
152        reader
153            .seek(SeekFrom::Start(log_index.offset))
154            .map_err(|err| GetError {
155                kind: GetErrorKind::Io(err),
156                key,
157            })?;
158
159        let data_file_entry: LogFileEntry =
160            bincode::deserialize_from(reader).map_err(|err| GetError {
161                kind: GetErrorKind::Deserialize(err),
162                key,
163            })?;
164
165        assert_eq!(
166            &data_file_entry.key, key,
167            "The deserialized entries key does not match the key passed to get. The data store could corrupted."
168        );
169
170        Ok(Some(data_file_entry.value.expect(
171            "We returned a tombstone value from get. We should have instead returned None. 
172            The data store may not be corrupted - this indicates a programming bug.",
173        )))
174    }
175
176    /// Removes a key-value pair from the database.
177    ///
178    /// This function takes a `key` as input and removes the corresponding key-value pair from the
179    /// database. If the key exists, it returns the previously associated value. If the key does not
180    /// exist, it returns `None`.
181    ///
182    /// # Arguments
183    ///
184    /// * `key` - The key to remove, as a `Vec<u8>`.
185    ///
186    /// # Returns
187    ///
188    /// * `Ok(Some(value))` if the key existed and was removed, containing the previously associated
189    ///   value.
190    /// * `Ok(None)` if the key did not exist in the database.
191    /// * `Err(RemoveError)` if there was an error removing the key.
192    ///
193    /// # Errors
194    ///
195    /// This function may return a `RemoveError` if:
196    ///
197    /// * There was an I/O error seeking or reading from the data file (`RemoveErrorKind::Io`).
198    /// * There was an error deserializing the log entry from the data file (`RemoveErrorKind::Deserialize`).
199    ///
200    /// # Panics
201    ///
202    /// This function will panic if another thread crashed while holding the lock on the key directory.
203    pub fn remove(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, RemoveError> {
204        trace!(
205            "Remove called with key (as UTF 8) {}",
206            String::from_utf8_lossy(&key)
207        );
208        let mut writer = self
209            .writer
210            .lock()
211            .expect("Another thread crashed while holding the writer lock. Panicking.");
212
213        writer.remove(key)
214    }
215
216    /// Compacts the rustcask directory be writing active key-value pairs
217    /// to a new set of data files, and removes old data files which may have contained
218    /// dead values.
219    /// 
220    /// # Errors
221    ///
222    /// This function may return a `MergeError` with the following variants:
223    ///
224    /// * `MergeErrorKind::OutsideMergeWindow` - The merge operation was attempted outside of the allowed merge window.
225    ///   The `merge_generation` field in this case indicates the next generation number when a merge will be allowed.
226    /// * `MergeErrorKind::Io(err)` - An I/O error occurred while reading or writing data files during the merge operation.
227    /// 
228    /// Reads can be performed concurrently with merges. However, writes will be blocked
229    /// until the merge is complete.
230    pub fn merge(&mut self) -> Result<(), MergeError> {
231        // TODO [RyanStan 07/08/24] Instead of relying on the user to call merge,
232        //   the open function should spawn a background thread that performs merging based on
233        //   a configured interval.
234
235        // Locking the writer prevents concurrent writes
236        let mut writer = self
237            .writer
238            .lock()
239            .expect("Another thread crashed while holding the writer lock. Panicking.");
240
241        if !writer.can_merge() {
242            return Err(MergeError {
243                kind: MergeErrorKind::OutsideMergeWindow,
244                merge_generation: writer.get_active_generation() + 1,
245            });
246        }
247
248        writer.merge()?;
249
250        // TODO [RyanStan 07/17/24] Output stats about the number of bytes saved.
251        info!("Merged data files.");
252
253        Ok(())
254    }
255
256    // Get active generation and get active data file size are for testing
257    fn get_active_generation(&self) -> GenerationNumber {
258        let writer = self.writer.lock().expect(
259            "Another thread crashed while holding the writer lock. \
260                Panicking because the write lock is required to get the active generation.",
261        );
262        writer.get_active_generation()
263    }
264
265    fn get_active_data_file_size(&self) -> u64 {
266        let writer = self.writer.lock().expect(
267            "Another thread crashed while holding the writer lock. \
268                Panicking because the write lock is required to get the active data file size.",
269        );
270        writer.get_active_data_file_size()
271    }
272}
273
/// Simplifies configuration and creation of Rustcask instances.
///
/// # Example
/// ```
/// # use rustcask::Rustcask;
/// # use tempfile::TempDir;
/// # let temp_dir = TempDir::new().unwrap();
/// # let rustcask_dir = temp_dir.path();
/// let store = Rustcask::builder()
///     .set_sync_mode(true)
///     .open(rustcask_dir)
///     .expect("failed to open the Rustcask directory");
/// ```
#[derive(Clone, Debug)]
#[must_use = "a builder does nothing until `open` is called"]
pub struct RustcaskBuilder {
    /// Size limit, in bytes, past which the active data file is marked read-only
    /// and a new active data file is created.
    max_data_file_size: u64,

    /// When sync mode is true, writes to the data file
    /// are fsync'ed before returning to the user.
    /// This guarantees that data is durable and persisted to disk immediately,
    /// at the expense of reduced performance.
    sync_mode: bool,
}
295
296impl Default for RustcaskBuilder {
297    fn default() -> Self {
298        Self {
299            max_data_file_size: MAX_DATA_FILE_SIZE,
300            sync_mode: false,
301        }
302    }
303}
304
305impl RustcaskBuilder {
306    /// Sets the maximum data file size. When the active data file
307    /// surpasses this size, it will be marked read-only and a new active data file
308    /// will be created.
309    pub fn set_max_data_file_size(mut self, max_size: u64) -> Self {
310        self.max_data_file_size = max_size;
311        self
312    }
313
314    /// When sync mode is set to true, writes to the data file
315    /// are fsync'ed before returning to the user.
316    /// This guarantees that data is durable and persisted to disk immediately,
317    /// at the expense of reduced performance
318    pub fn set_sync_mode(mut self, sync_mode: bool) -> Self {
319        self.sync_mode = sync_mode;
320        self
321    }
322
323    /// Generates a Rustcask instance.
324    pub fn open(self, rustcask_dir: &Path) -> Result<Rustcask, OpenError> {
325        trace!(
326            "Open called on directory {}",
327            rustcask_dir.to_string_lossy().to_string()
328        );
329        let rustcask_dir = Arc::new(PathBuf::from(rustcask_dir));
330
331        if !rustcask_dir.is_dir() {
332            return Err(OpenError {
333                kind: OpenErrorKind::BadDirectory,
334                rustcask_dir: rustcask_dir.to_string_lossy().to_string(),
335            });
336        }
337
338        let data_file_readers = Readers::new(rustcask_dir.clone()).map_err(|err| OpenError {
339            kind: OpenErrorKind::Io(err),
340            rustcask_dir: rustcask_dir.to_string_lossy().to_string(),
341        })?;
342
343        let keydir = Arc::new(RwLock::new(KeyDir::new(&rustcask_dir)?));
344
345        let writer = Arc::new(Mutex::new(Writer::new(
346            self.sync_mode,
347            self.max_data_file_size,
348            rustcask_dir.clone(),
349            keydir.clone(),
350            data_file_readers.clone(),
351        )?));
352
353        info!(
354            "Opened Rustcask directory {}. Max data file size: {}. Number of existing data files: {}. Active generation: {}. Sync mode: {}.",
355            rustcask_dir.to_string_lossy().to_string(),
356            self.max_data_file_size,
357            data_file_readers.data_file_readers.len(),
358            writer.lock().unwrap().get_active_generation(),
359            self.sync_mode
360        );
361
362        Ok(Rustcask {
363            readers: data_file_readers,
364            directory: rustcask_dir,
365            keydir,
366            sync_mode: self.sync_mode,
367            writer,
368        })
369    }
370}
371
#[cfg(test)]
mod tests {
    use std::fs::File;

    use super::*;
    use logfile::LogFileIterator;
    use tempfile::{tempdir, TempDir};
    use utils::{
        list_generations,
        tests::{file_names, get_keys, get_keys_values},
    };

    /// Opening a directory that already holds data files for generations 1..=5
    /// resumes with the highest generation as the active one.
    #[test]
    fn test_open() {
        let dir = tempdir().unwrap();

        for number in 1..=5 {
            File::create(dir.path().join(format!("{}.rustcask.data", number))).unwrap();
            File::create(dir.path().join(format!("{}.rustcask.hint", number))).unwrap();
        }

        let rustcask = Rustcask::builder().open(dir.path()).unwrap();

        assert_eq!(rustcask.get_active_generation(), 5);
    }

    /// An empty directory starts at generation 0.
    #[test]
    fn test_open_on_empty_dir() {
        let dir = tempdir().unwrap();
        let rustcask = Rustcask::builder().open(dir.path()).unwrap();
        assert_eq!(rustcask.get_active_generation(), 0);
    }

    /// Opening a path that does not exist fails with `OpenErrorKind::BadDirectory`.
    #[test]
    fn test_open_non_existent_dir() {
        let dir = tempdir().unwrap();
        let invalid_dir = dir.path().join("invalid-dir");
        let rustcask = Rustcask::builder().open(&invalid_dir);
        assert!(matches!(
            rustcask,
            Err(OpenError {
                kind: OpenErrorKind::BadDirectory,
                ..
            })
        ));
    }

    /// With a one-byte size limit, the first write fills generation 0 and rotates
    /// to a new, empty generation 1; the written value remains readable.
    #[test]
    fn test_data_file_rotation() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        // Force log file rotation by setting the max data file size to one byte
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();

        let keys = ["key1".as_bytes().to_vec(), "key2".as_bytes().to_vec()];
        let values = ["value1".as_bytes().to_vec(), "value2".as_bytes().to_vec()];

        assert_eq!(store.get_active_generation(), 0);
        assert_eq!(store.get_active_data_file_size(), 0);

        store.set(keys[0].clone(), values[0].clone()).unwrap();

        assert_eq!(store.get_active_generation(), 1);
        assert_eq!(store.get_active_data_file_size(), 0);
        assert_eq!(store.get(&keys[0]).unwrap(), Some(values[0].clone()));

        let data_files = file_names(temp_dir_path);
        assert!(
            data_files.contains(&String::from("0.rustcask.data"))
                && data_files.contains(&String::from("1.rustcask.data"))
        );
    }

    /// Merging keeps only the latest value of each key, written to a new generation,
    /// and removes the old data file.
    #[test]
    fn test_merge_internal() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder().open(temp_dir_path).unwrap();

        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-a".as_bytes().to_vec(),
            )
            .unwrap();
        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-b".as_bytes().to_vec(),
            )
            .unwrap();

        let expected_data_files = vec!["0.rustcask.data"];
        let data_files = file_names(temp_dir_path);
        assert_eq!(data_files, expected_data_files);

        let log_file_keys = get_keys(temp_dir_path, &data_files[0]);
        assert_eq!(log_file_keys.len(), 2);
        assert_eq!(
            log_file_keys,
            vec!["leader".as_bytes().to_vec(), "leader".as_bytes().to_vec()]
        );

        store.merge().unwrap();

        let expected_data_files = vec!["1.rustcask.data"];
        let data_files = file_names(temp_dir_path);
        assert_eq!(data_files, expected_data_files);

        let log_file_iter = LogFileIterator::new(temp_dir_path.join("1.rustcask.data")).unwrap();

        let log_file_entries: Vec<(Vec<u8>, Vec<u8>)> = log_file_iter
            .map(|x| (x.0.key, x.0.value.unwrap()))
            .collect();

        assert_eq!(log_file_entries.len(), 1);
        assert_eq!(log_file_entries[0].0, "leader".as_bytes().to_vec());
        assert_eq!(log_file_entries[0].1, "instance-b".as_bytes().to_vec());
    }

    /// Clones share one writer, so a write through a clone lands in the data file
    /// created by the rotation that the original handle's write triggered.
    #[test]
    fn test_data_file_rotation_cloned_stores() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        // Force log file rotation by setting the max data file size to one byte
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();
        let mut store_clone = store.clone();

        store
            .set("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())
            .unwrap();
        store_clone
            .set("key2".as_bytes().to_vec(), "value2".as_bytes().to_vec())
            .unwrap();

        let log_file_keys = get_keys_values(temp_dir_path, &String::from("0.rustcask.data"));
        assert_eq!(log_file_keys.len(), 1);
        assert_eq!(
            log_file_keys,
            vec![("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())]
        );

        let log_file_keys = get_keys_values(temp_dir_path, &String::from("1.rustcask.data"));
        assert_eq!(log_file_keys.len(), 1);
        assert_eq!(
            log_file_keys,
            vec![("key2".as_bytes().to_vec(), "value2".as_bytes().to_vec())]
        );
    }

    /// Merging a store spread across several rotated data files produces new
    /// generations only, and the merged data survives a reopen.
    #[test]
    fn test_merge_with_rotate() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();

        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-a".as_bytes().to_vec(),
            )
            .unwrap();
        store
            .set(
                "last-election-ts".as_bytes().to_vec(),
                "00:00".as_bytes().to_vec(),
            )
            .unwrap();
        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-b".as_bytes().to_vec(),
            )
            .unwrap();

        check_generations(temp_dir_path, &[0, 1, 2, 3]);
        store.merge().unwrap();
        check_generations(temp_dir_path, &[4, 5, 6]);

        drop(store);
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();
        assert_eq!(
            store.get(&"leader".as_bytes().to_vec()).unwrap(),
            Some("instance-b".as_bytes().to_vec())
        );
        assert_eq!(
            store.get(&"last-election-ts".as_bytes().to_vec()).unwrap(),
            Some("00:00".as_bytes().to_vec())
        );
    }

    /// The active generation is shared between clones, so the keydir entry written
    /// by one clone points at the data file the other clone reads from.
    #[test]
    fn test_active_gen_update() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();

        let mut store_b = store.clone();

        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-a".as_bytes().to_vec(),
            )
            .unwrap();
        assert_eq!(store.get_active_generation(), 1); // Both stores share the same Writer, so they should see the same active generation
        assert_eq!(store_b.get_active_generation(), 1);

        // If the active generation is not correctly shared among stores, then
        // this will update the keydir with the incorrect generation. Thus, the following get for the key
        // will read the wrong data file.
        store_b
            .set("key".as_bytes().to_vec(), "value".as_bytes().to_vec())
            .unwrap();
        assert_eq!(
            store.get(&"key".as_bytes().to_vec()).unwrap(),
            Some("value".as_bytes().to_vec()),
        );
    }

    /// Asserts that the data file generations present in `temp_dir_path`, sorted,
    /// equal `expected_generations`.
    fn check_generations(temp_dir_path: &Path, expected_generations: &[GenerationNumber]) {
        let mut generations: Vec<GenerationNumber> = list_generations(temp_dir_path).unwrap();
        generations.sort_unstable();
        assert_eq!(generations, expected_generations);
    }
}
614}