// leetup_cache/kvstore.rs

use std::collections::{BTreeMap, HashMap};
use std::ffi::OsStr;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::path::{Path, PathBuf};

use anyhow;
use serde::{Deserialize, Serialize};
use serde_json::{self, Deserializer};

11pub type Result<T> = anyhow::Result<T>;
12
13// This constant is used for invoking log compaction
14const COMPACTION_THRESHOLD: u64 = 1024 * 1024;
15
16/// The `KvStore` stores string key/value pairs.
17///
18/// Key/value pairs are persisted to disk in log files. Log files are named after
19/// monotonically increasing generation numbers with a `log` extension name.
20/// A `BTreeMap` in memory stores the keys and the value locations for fast query.
21///
22/// ```rust
23/// # use yakv::{KvStore, Result};
24/// # fn try_main() -> Result<()> {
25/// use std::env::current_dir;
26/// let mut store = KvStore::open(current_dir()?)?;
27/// store.set("key".to_owned(), "value".to_owned())?;
28/// let val = store.get("key".to_owne())?;
29/// assert_eq!(val, Some("value".to_owned()));
30/// # Ok(())
31/// # }
32/// ```
33pub struct KvStore {
34    path: PathBuf,
35    current_id: u64,
36    writer: BufWriterWithPos<File>,
37    readers: HashMap<u64, BufReaderWithPos<File>>,
38    index: BTreeMap<String, CommandPos>,
39    stale_data: u64,
40}
41
42impl KvStore {
43    /// Opens a KvStore with the given path.
44    pub fn open<T: Into<PathBuf>>(path: T) -> Result<Self> {
45        // try to load all log files in the given path
46        // if it failed then create a log file with an id suffix-ed to the file
47        // e.g. key-1.log, key-2.log, key-3.log, etc
48        // after loading all the logs, build the index in-memory
49        let path = path.into();
50        fs::create_dir_all(&path)?;
51
52        let mut readers = HashMap::new();
53        let mut index = BTreeMap::new();
54        let mut stale_data = 0;
55
56        let ids = sorted_ids(&path)?;
57        // println!("IDS: {:?}", ids);
58        for &id in &ids {
59            let mut reader = BufReaderWithPos::new(File::open(log_path(&path, id))?)?;
60            stale_data += load_log(id, &mut reader, &mut index)?;
61            readers.insert(id, reader);
62        }
63
64        let current_id = ids.last().unwrap_or(&0) + 1;
65        let writer = create_log_file(current_id, &path, &mut readers)?;
66
67        Ok(KvStore {
68            path,
69            current_id,
70            writer,
71            readers,
72            index,
73            stale_data,
74        })
75    }
76
77    /// Sets the value of s string key to a string.
78    pub fn set(&mut self, key: String, value: String) -> Result<()> {
79        let cmd = Command::set(key, value);
80        let pos = self.writer.pos;
81        serde_json::to_writer(&mut self.writer, &cmd)?;
82        self.writer.flush()?;
83
84        if let Command::Set { key, .. } = cmd {
85            if let Some(old_cmd) = self.index.insert(
86                key,
87                CommandPos::from((self.current_id, pos..self.writer.pos)),
88            ) {
89                self.stale_data += old_cmd.len;
90            }
91        }
92
93        // Handle log compaction
94        if self.stale_data > COMPACTION_THRESHOLD {
95            self.compact()?;
96        }
97
98        Ok(())
99    }
100
101    /// Gets the string value for a given key.
102    pub fn get(&mut self, key: String) -> Result<Option<String>> {
103        // println!("{:?}", self.index);
104        if let Some(cmd_pos) = self.index.get(&key) {
105            let reader = self
106                .readers
107                .get_mut(&cmd_pos.id)
108                .expect("Cannot find reader");
109
110            reader.seek(SeekFrom::Start(cmd_pos.pos))?;
111            let cmd_reader = reader.take(cmd_pos.len);
112            if let Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? {
113                return Ok(Some(value));
114            } else {
115                return Err(anyhow::Error::msg("Unexpected command"));
116            }
117        }
118        Ok(None)
119    }
120
121    /// Check if key exists in the cache
122    pub fn has_key(&self, key: String) -> bool {
123        self.index.contains_key(&key)
124    }
125
126    /// Removes the given key.
127    pub fn remove(&mut self, key: String) -> Result<()> {
128        // check if key exist in index and delete if from the log file
129        if self.index.contains_key(&key) {
130            let cmd = Command::remove(key.to_owned());
131            serde_json::to_writer(&mut self.writer, &cmd)?;
132            self.writer.flush()?;
133            let old_cmd = self.index.remove(&key).expect("Key not found");
134            self.stale_data += old_cmd.len;
135            Ok(())
136        } else {
137            Err(anyhow::Error::msg("Key not found"))
138        }
139    }
140
141    fn compact(&mut self) -> Result<()> {
142        // increment id by 1
143        // this will be used by compaction writer
144        let compaction_id = self.current_id + 1;
145        self.current_id += 2;
146        self.writer = create_log_file(self.current_id, &self.path, &mut self.readers)?;
147        let mut compaction_writer = create_log_file(compaction_id, &self.path, &mut self.readers)?;
148
149        let mut new_pos = 0;
150        for cmd_pos in &mut self.index.values_mut() {
151            let cmd_reader = self.readers.get_mut(&cmd_pos.id).expect("reader not found");
152            if cmd_reader.pos != cmd_pos.pos {
153                cmd_reader.seek(SeekFrom::Start(cmd_pos.pos))?;
154            }
155
156            let mut cmd_reader = cmd_reader.take(cmd_pos.len);
157            let len = io::copy(&mut cmd_reader, &mut compaction_writer)?;
158            *cmd_pos = CommandPos::from((compaction_id, new_pos..new_pos + len));
159            new_pos += len;
160        }
161        compaction_writer.flush()?;
162
163        let stale_ids: Vec<_> = self
164            .readers
165            .keys()
166            .filter(|id| id < &&compaction_id)
167            .cloned()
168            .collect();
169
170        for stale_id in stale_ids {
171            self.readers.remove(&stale_id);
172            fs::remove_file(log_path(&self.path, stale_id))?;
173        }
174        self.stale_data = 0;
175
176        Ok(())
177    }
178}
179
/// Builds the path of the log file for generation `id` inside directory `path`,
/// i.e. `<path>/<id>.log`.
fn log_path<T: AsRef<Path>>(path: T, id: u64) -> PathBuf {
    let mut log_file = path.as_ref().to_path_buf();
    log_file.push(format!("{}.log", id));
    log_file
}

184fn create_log_file(
185    id: u64,
186    path: &Path,
187    readers: &mut HashMap<u64, BufReaderWithPos<File>>,
188) -> Result<BufWriterWithPos<File>> {
189    let path = log_path(&path, id);
190    let writer = BufWriterWithPos::new(OpenOptions::new().create(true).append(true).open(&path)?)?;
191    readers.insert(id, BufReaderWithPos::new(File::open(&path)?)?);
192    Ok(writer)
193}
194
195// load a log and build index
196fn load_log(
197    id: u64,
198    reader: &mut BufReaderWithPos<File>,
199    index: &mut BTreeMap<String, CommandPos>,
200) -> Result<u64> {
201    let mut pos = reader.seek(SeekFrom::Start(0))?;
202    let mut stream = Deserializer::from_reader(reader).into_iter::<Command>();
203    let mut stale_data = 0;
204    // println!("ID: {}", id);
205    while let Some(cmd) = stream.next() {
206        let new_pos = stream.byte_offset() as u64;
207        match cmd? {
208            Command::Set { key, .. } => {
209                if let Some(old_cmd) = index.insert(key, CommandPos::from((id, pos..new_pos))) {
210                    stale_data += old_cmd.len;
211                }
212            }
213            Command::Remove { key } => {
214                if let Some(old_cmd) = index.remove(&key) {
215                    stale_data += old_cmd.len;
216                }
217
218                stale_data += new_pos - pos;
219            }
220        }
221        pos = new_pos;
222    }
223    Ok(stale_data)
224}
225
226// get all ids from the log files in a given path
227//
228// Returns sorted id numbers
229fn sorted_ids(path: &Path) -> Result<Vec<u64>> {
230    let mut ids: Vec<u64> = fs::read_dir(&path)?
231        .flat_map(|dir_entry| -> Result<_> { Ok(dir_entry?.path()) })
232        .filter(|path| path.is_file() && path.extension() == Some("log".as_ref()))
233        .filter_map(|path| {
234            path.file_name()
235                .and_then(OsStr::to_str)
236                .map(|s| s.trim_end_matches(".log"))
237                .map(str::parse::<u64>)
238        })
239        .flatten()
240        .collect();
241    ids.sort();
242    Ok(ids)
243}
244
245pub struct BufReaderWithPos<T: Read + Seek> {
246    reader: BufReader<T>,
247    pos: u64,
248}
249
250impl<T: Read + Seek> BufReaderWithPos<T> {
251    fn new(mut file: T) -> Result<Self> {
252        let pos = file.seek(SeekFrom::Current(0))?;
253        Ok(BufReaderWithPos {
254            reader: BufReader::new(file),
255            pos,
256        })
257    }
258}
259
260impl<T: Read + Seek> Read for BufReaderWithPos<T> {
261    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
262        let len = self.reader.read(buf)?;
263        self.pos += len as u64;
264        Ok(len)
265    }
266}
267
268impl<T: Read + Seek> Seek for BufReaderWithPos<T> {
269    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
270        self.pos = self.reader.seek(pos)?;
271        Ok(self.pos)
272    }
273}
274
275struct BufWriterWithPos<T: Write + Seek> {
276    writer: BufWriter<T>,
277    pos: u64,
278}
279
280impl<T: Write + Seek> BufWriterWithPos<T> {
281    fn new(mut file: T) -> Result<Self> {
282        let pos = file.seek(SeekFrom::Current(0))?;
283        Ok(BufWriterWithPos {
284            writer: BufWriter::new(file),
285            pos,
286        })
287    }
288}
289
290impl<T: Write + Seek> Write for BufWriterWithPos<T> {
291    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
292        let len = self.writer.write(buf)?;
293        self.pos += len as u64;
294        Ok(len)
295    }
296
297    fn flush(&mut self) -> io::Result<()> {
298        self.writer.flush()
299    }
300}
301
302impl<T: Write + Seek> Seek for BufWriterWithPos<T> {
303    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
304        self.pos = self.writer.seek(pos)?;
305        Ok(self.pos)
306    }
307}
308
309/// Represent KV store commands
310#[derive(Serialize, Deserialize, Debug)]
311enum Command {
312    Set { key: String, value: String },
313    Remove { key: String },
314}
315
316impl Command {
317    fn set(key: String, value: String) -> Self {
318        Command::Set { key, value }
319    }
320
321    fn remove(key: String) -> Self {
322        Command::Remove { key }
323    }
324}
325
/// Location of one serialized command inside a log file.
///
/// - `id`: the log file's generation.
/// - `pos`: the byte offset where the command starts.
/// - `len`: the command's length in bytes.
#[derive(Debug)]
struct CommandPos {
    id: u64,
    pos: u64,
    len: u64,
}

/// Builds a `CommandPos` from a log generation id and the command's byte range
/// `start..end` within that log.
impl From<(u64, Range<u64>)> for CommandPos {
    fn from((id, Range { start, end }): (u64, Range<u64>)) -> Self {
        Self {
            id,
            pos: start,
            len: end - start,
        }
    }
}