graus_db/
graus_db.rs

1use crate::db_command::{Command, CommandPos};
2use crate::io_types::BufReaderWithPos;
3use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
4use crate::log_storage::log_reader::LogReader;
5use crate::log_storage::log_writer::LogWriter;
6use crate::{GrausError, Result};
7use crossbeam_skiplist::SkipMap;
8use std::cell::RefCell;
9use std::fs::{self, File};
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12use std::{collections::HashMap, path::PathBuf};
13
14/// The `GrausDb` stores string key/value pairs.
15///
16/// Key/value pairs are persisted to disk in log files. Log files are named after
17/// monotonically increasing generation numbers with a `log` extension name.
18/// A `SkipMap` in memory stores the keys and the value locations for fast query.
19///
20/// GrausDb is thead-safe. It can be cloned to use it on new threads.
21///
22/// ```rust
23/// # use graus_db::{GrausDb, Result};
24/// # fn try_main() -> Result<()> {
25/// use std::env::current_dir;
26///
27/// let store = GrausDb::open(current_dir()?)?;
28/// store.set(b"key".to_vec(), b"value".to_vec())?;
29/// let val = store.get(b"key")?;
30/// assert_eq!(val, Some(b"value".to_vec()));
31/// # Ok(())
32/// # }
33/// ```
34#[derive(Clone)]
35pub struct GrausDb {
36    // Index that maps every Key to a position in a log file.
37    index: Arc<SkipMap<Vec<u8>, CommandPos>>,
38    // Writes new data into the file system logs. Protected by a mutex.
39    writer: Arc<Mutex<LogWriter>>,
40    // Reads data from the file system logs.
41    reader: LogReader,
42}
43
44// TODO Ricardo update DOCS as now we don't use Strings
45
46// TODO Ricardo add clippy as linter
47
48impl GrausDb {
49    /// Opens a `GrausDb` with the given path.
50    ///
51    /// This will create a new directory if the given one does not exist.
52    ///
53    /// # Errors
54    ///
55    /// It propagates I/O or deserialization errors during the log replay.
56    pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
57        let path = Arc::new(path.into());
58        fs::create_dir_all(&*path)?;
59
60        let mut readers = HashMap::new();
61        let index = Arc::new(SkipMap::new());
62
63        let log_ids = get_log_ids(&path)?;
64        let mut uncompacted = 0;
65
66        for &log_id in &log_ids {
67            let log_path = log_path(&path, log_id);
68            let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
69            uncompacted += load_log(log_id, &mut reader, &index)?;
70            readers.insert(log_id, reader);
71        }
72
73        let new_log_id = log_ids.last().unwrap_or(&0) + 1;
74        let writer = new_log_file(&path, new_log_id)?;
75        let safe_point = Arc::new(AtomicU64::new(0));
76
77        let reader = LogReader {
78            path: Arc::clone(&path),
79            safe_point,
80            readers: RefCell::new(readers),
81        };
82
83        let writer = LogWriter {
84            writer,
85            index: Arc::clone(&index),
86            reader: reader.clone(),
87            current_log_id: new_log_id,
88            uncompacted,
89            path: Arc::clone(&path),
90        };
91
92        Ok(GrausDb {
93            reader,
94            index,
95            writer: Arc::new(Mutex::new(writer)),
96        })
97    }
98
99    /// Sets the value of a string key to a string.
100    ///
101    /// If the key already exists, the previous value will be overwritten.
102    pub fn set(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
103        self.writer.lock().unwrap().set(key, value)
104    }
105
106    /// Gets the string value of a given string key.
107    ///
108    /// Returns `None` if the given key does not exist.
109    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
110        if let Some(cmd_pos) = self.index.get(key) {
111            if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
112                Ok(Some(value))
113            } else {
114                Err(GrausError::UnexpectedCommandType)
115            }
116        } else {
117            Ok(None)
118        }
119    }
120
121    /// Removes a given key.
122    ///
123    /// Returns GrausError::KeyNotFound if the key does not exist.
124    pub fn remove(&self, key: Vec<u8>) -> Result<()> {
125        self.writer.lock().unwrap().remove(key)
126    }
127
128    /// Updates atomically an existing value.
129    ///
130    /// If predicate_key and predicate are provided, it won´t update the value if the predicate
131    /// is not satisfied for predicate_key.
132    pub fn update_if<F, P>(
133        &self,
134        key: Vec<u8>,
135        update_fn: F,
136        predicate_key: Option<&[u8]>,
137        predicate: Option<P>,
138    ) -> Result<()>
139    where
140        F: FnOnce(&mut Vec<u8>),
141        P: FnOnce(&[u8]) -> bool,
142    {
143        let mut writer = self.writer.lock().unwrap();
144        let current_value = self.get(&key)?;
145        let Some(current_value) = current_value else {
146            return Err(GrausError::KeyNotFound);
147        };
148
149        if let (Some(predicate_key), Some(predicate)) = (predicate_key, predicate) {
150            let current_predicate_key_value = self.get(predicate_key)?;
151            let Some(current_predicate_key_value) = current_predicate_key_value else {
152                return Err(GrausError::KeyNotFound);
153            };
154            if !predicate(&current_predicate_key_value) {
155                return Err(GrausError::PredicateNotSatisfied);
156            }
157        }
158
159        let mut current_value_mut = current_value;
160        update_fn(&mut current_value_mut);
161        writer.set(key, current_value_mut)
162    }
163}