graus_db/
graus_db.rs

1use crate::db_command::{CommandOwned, CommandPos};
2use crate::io_types::BufReaderWithPos;
3use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
4use crate::log_storage::log_reader::LogReader;
5use crate::log_storage::log_writer::LogWriter;
6use crate::{GrausError, Result};
7use crossbeam_skiplist::SkipMap;
8use std::cell::RefCell;
9use std::fs::{self, File};
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12use std::{collections::HashMap, path::PathBuf};
13
14/// The `GrausDb` stores string key/value pairs.
15///
16/// Key/value pairs are persisted to disk in log files. Log files are named after
17/// monotonically increasing generation numbers with a `log` extension name.
18/// A `SkipMap` in memory stores the keys and the value locations for fast query.
19///
20/// GrausDb is thead-safe. It can be cloned to use it on new threads.
21///
22/// ```rust
23/// # use graus_db::{GrausDb, Result};
24/// # fn try_main() -> Result<()> {
25/// use std::env::current_dir;
26///
27/// let store = GrausDb::open(current_dir()?)?;
28/// store.set(b"key".to_vec(), b"value")?;
29/// let val = store.get(b"key")?;
30/// assert_eq!(val, Some(b"value".to_vec()));
31/// # Ok(())
32/// # }
33/// ```
34#[derive(Clone)]
35pub struct GrausDb {
36    // Index that maps every Key to a position in a log file.
37    index: Arc<SkipMap<Vec<u8>, CommandPos>>,
38    // Writes new data into the file system logs. Protected by a mutex.
39    writer: Arc<Mutex<LogWriter>>,
40    // Reads data from the file system logs.
41    reader: LogReader,
42}
43
44impl GrausDb {
45    /// Opens a `GrausDb` with the given path.
46    ///
47    /// This will create a new directory if the given one does not exist.
48    ///
49    /// # Errors
50    ///
51    /// It propagates I/O or deserialization errors during the log replay.
52    pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
53        let path = Arc::new(path.into());
54        fs::create_dir_all(&*path)?;
55
56        let mut readers = HashMap::new();
57        let index = Arc::new(SkipMap::new());
58
59        let log_ids = get_log_ids(&path)?;
60        let mut uncompacted = 0;
61
62        for &log_id in &log_ids {
63            let log_path = log_path(&path, log_id);
64            let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
65            uncompacted += load_log(log_id, &mut reader, &index)?;
66            readers.insert(log_id, reader);
67        }
68
69        let new_log_id = log_ids.last().unwrap_or(&0) + 1;
70        let writer = new_log_file(&path, new_log_id)?;
71        let safe_point = Arc::new(AtomicU64::new(0));
72
73        let reader = LogReader {
74            path: Arc::clone(&path),
75            safe_point,
76            readers: RefCell::new(readers),
77        };
78
79        let writer = LogWriter {
80            writer,
81            index: Arc::clone(&index),
82            reader: reader.clone(),
83            current_log_id: new_log_id,
84            uncompacted,
85            path: Arc::clone(&path),
86        };
87
88        Ok(GrausDb {
89            reader,
90            index,
91            writer: Arc::new(Mutex::new(writer)),
92        })
93    }
94
95    /// Sets the value of a string key to a string.
96    ///
97    /// If the key already exists, the previous value will be overwritten.
98    pub fn set(&self, key: Vec<u8>, value: &[u8]) -> Result<()> {
99        self.writer.lock().unwrap().set(key, value)
100    }
101
102    /// Gets the string value of a given string key.
103    ///
104    /// Returns `None` if the given key does not exist.
105    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
106        if let Some(cmd_pos) = self.index.get(key) {
107            if let CommandOwned::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
108                Ok(Some(value))
109            } else {
110                Err(GrausError::UnexpectedCommandType)
111            }
112        } else {
113            Ok(None)
114        }
115    }
116
117    /// Removes a given key.
118    ///
119    /// Returns GrausError::KeyNotFound if the key does not exist.
120    pub fn remove(&self, key: &[u8]) -> Result<()> {
121        self.writer.lock().unwrap().remove(key)
122    }
123
124    /// Updates atomically an existing value.
125    ///
126    /// If predicate_key and predicate are provided, it won´t update the value if the predicate
127    /// is not satisfied for predicate_key.
128    pub fn update_if<F, P>(
129        &self,
130        key: Vec<u8>,
131        update_fn: F,
132        predicate_key: Option<&[u8]>,
133        predicate: Option<P>,
134    ) -> Result<()>
135    where
136        F: FnOnce(&mut Vec<u8>),
137        P: FnOnce(&[u8]) -> bool,
138    {
139        let mut writer = self.writer.lock().unwrap();
140        let current_value = self.get(&key)?;
141        let Some(current_value) = current_value else {
142            return Err(GrausError::KeyNotFound);
143        };
144
145        if let (Some(predicate_key), Some(predicate)) = (predicate_key, predicate) {
146            let current_predicate_key_value = self.get(predicate_key)?;
147            let Some(current_predicate_key_value) = current_predicate_key_value else {
148                return Err(GrausError::KeyNotFound);
149            };
150            if !predicate(&current_predicate_key_value) {
151                return Err(GrausError::PredicateNotSatisfied);
152            }
153        }
154
155        let mut current_value_mut = current_value;
156        update_fn(&mut current_value_mut);
157        writer.set(key, &current_value_mut)
158    }
159}