graus_db/
graus_db.rs

1use crate::db_command::{Command, CommandPos};
2use crate::io_types::BufReaderWithPos;
3use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
4use crate::log_storage::log_reader::LogReader;
5use crate::log_storage::log_writer::LogWriter;
6use crate::{GrausError, Result};
7use crossbeam_skiplist::SkipMap;
8use std::cell::RefCell;
9use std::fs::{self, File};
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12use std::{collections::HashMap, path::PathBuf};
13
14/// The `GrausDb` stores string key/value pairs.
15///
16/// Key/value pairs are persisted to disk in log files. Log files are named after
17/// monotonically increasing generation numbers with a `log` extension name.
18/// A `SkipMap` in memory stores the keys and the value locations for fast query.
19///
20/// GrausDb is thead-safe. It can be cloned to use it on new threads.
21///
22/// ```rust
23/// # use graus_db::{GrausDb, Result};
24/// # fn try_main() -> Result<()> {
25/// use std::env::current_dir;
26/// let store = GrausDb::open(current_dir()?)?;
27/// store.set("key".to_owned(), "value".to_owned())?;
28/// let val = store.get("key".to_owned())?;
29/// assert_eq!(val, Some("value".to_owned()));
30/// # Ok(())
31/// # }
32/// ```
33#[derive(Clone)]
34pub struct GrausDb {
35    // Index that maps every Key to a position in a log file.
36    index: Arc<SkipMap<String, CommandPos>>,
37    // Writes new data into the file system logs. Protected by a mutex.
38    writer: Arc<Mutex<LogWriter>>,
39    // Reads data from the file system logs.
40    reader: LogReader,
41}
42
43impl GrausDb {
44    /// Opens a `GrausDb` with the given path.
45    ///
46    /// This will create a new directory if the given one does not exist.
47    ///
48    /// # Errors
49    ///
50    /// It propagates I/O or deserialization errors during the log replay.
51    pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
52        let path = Arc::new(path.into());
53        fs::create_dir_all(&*path)?;
54
55        let mut readers = HashMap::new();
56        let index = Arc::new(SkipMap::new());
57
58        let log_ids = get_log_ids(&path)?;
59        let mut uncompacted = 0;
60
61        for &log_id in &log_ids {
62            let log_path = log_path(&path, log_id);
63            let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
64            uncompacted += load_log(log_id, &mut reader, &*index)?;
65            readers.insert(log_id, reader);
66        }
67
68        let new_log_id = log_ids.last().unwrap_or(&0) + 1;
69        let writer = new_log_file(&path, new_log_id)?;
70        let safe_point = Arc::new(AtomicU64::new(0));
71
72        let reader = LogReader {
73            path: Arc::clone(&path),
74            safe_point,
75            readers: RefCell::new(readers),
76        };
77
78        let writer = LogWriter {
79            writer,
80            index: Arc::clone(&index),
81            reader: reader.clone(),
82            current_log_id: new_log_id,
83            uncompacted,
84            path: Arc::clone(&path),
85        };
86
87        Ok(GrausDb {
88            reader,
89            index,
90            writer: Arc::new(Mutex::new(writer)),
91        })
92    }
93
94    /// Sets the value of a string key to a string.
95    ///
96    /// If the key already exists, the previous value will be overwritten.
97    pub fn set(&self, key: String, value: String) -> Result<()> {
98        self.writer.lock().unwrap().set(key, value)
99    }
100
101    /// Gets the string value of a given string key.
102    ///
103    /// Returns `None` if the given key does not exist.
104    pub fn get(&self, key: String) -> Result<Option<String>> {
105        if let Some(cmd_pos) = self.index.get(&key) {
106            if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
107                Ok(Some(value))
108            } else {
109                Err(GrausError::UnexpectedCommandType)
110            }
111        } else {
112            Ok(None)
113        }
114    }
115
116    /// Removes a given key.
117    ///
118    /// Returns GrausError::KeyNotFound if the key does not exist.
119    pub fn remove(&self, key: String) -> Result<()> {
120        self.writer.lock().unwrap().remove(key)
121    }
122
123    /// Updates atomically an existing value.
124    ///
125    /// If predicate_key and predicate are provided, it won´t update the value if the predicate
126    /// is not satisfied for predicate_key.
127    pub fn update_if<F, P>(
128        &self,
129        key: String,
130        update_fn: F,
131        predicate_key: Option<String>,
132        predicate: Option<P>,
133    ) -> Result<()>
134    where
135        F: FnOnce(String) -> String,
136        P: FnOnce(String) -> bool,
137    {
138        let mut writer = self.writer.lock().unwrap();
139        let current_value = self.get(key.to_owned())?;
140        let Some(current_value) = current_value else {
141            return Err(GrausError::KeyNotFound);
142        };
143
144        if let (Some(predicate_key), Some(predicate)) = (predicate_key, predicate) {
145            let current_predicate_key_value = self.get(predicate_key)?;
146            let Some(current_predicate_key_value) = current_predicate_key_value else {
147                return Err(GrausError::KeyNotFound);
148            };
149            if !predicate(current_predicate_key_value) {
150                return Err(GrausError::PredicateNotSatisfied);
151            }
152        }
153
154        let updated_value = update_fn(current_value);
155        writer.set(key, updated_value)
156    }
157}