1use anyhow;
2use serde::{Deserialize, Serialize};
3use serde_json::{self, Deserializer};
4use std::collections::{BTreeMap, HashMap};
5use std::ffi::OsStr;
6use std::fs::{self, File, OpenOptions};
7use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
8use std::ops::Range;
9use std::path::{Path, PathBuf};
10
11pub type Result<T> = anyhow::Result<T>;
12
13const COMPACTION_THRESHOLD: u64 = 1024 * 1024;
15
16pub struct KvStore {
34 path: PathBuf,
35 current_id: u64,
36 writer: BufWriterWithPos<File>,
37 readers: HashMap<u64, BufReaderWithPos<File>>,
38 index: BTreeMap<String, CommandPos>,
39 stale_data: u64,
40}
41
42impl KvStore {
43 pub fn open<T: Into<PathBuf>>(path: T) -> Result<Self> {
45 let path = path.into();
50 fs::create_dir_all(&path)?;
51
52 let mut readers = HashMap::new();
53 let mut index = BTreeMap::new();
54 let mut stale_data = 0;
55
56 let ids = sorted_ids(&path)?;
57 for &id in &ids {
59 let mut reader = BufReaderWithPos::new(File::open(log_path(&path, id))?)?;
60 stale_data += load_log(id, &mut reader, &mut index)?;
61 readers.insert(id, reader);
62 }
63
64 let current_id = ids.last().unwrap_or(&0) + 1;
65 let writer = create_log_file(current_id, &path, &mut readers)?;
66
67 Ok(KvStore {
68 path,
69 current_id,
70 writer,
71 readers,
72 index,
73 stale_data,
74 })
75 }
76
77 pub fn set(&mut self, key: String, value: String) -> Result<()> {
79 let cmd = Command::set(key, value);
80 let pos = self.writer.pos;
81 serde_json::to_writer(&mut self.writer, &cmd)?;
82 self.writer.flush()?;
83
84 if let Command::Set { key, .. } = cmd {
85 if let Some(old_cmd) = self.index.insert(
86 key,
87 CommandPos::from((self.current_id, pos..self.writer.pos)),
88 ) {
89 self.stale_data += old_cmd.len;
90 }
91 }
92
93 if self.stale_data > COMPACTION_THRESHOLD {
95 self.compact()?;
96 }
97
98 Ok(())
99 }
100
101 pub fn get(&mut self, key: String) -> Result<Option<String>> {
103 if let Some(cmd_pos) = self.index.get(&key) {
105 let reader = self
106 .readers
107 .get_mut(&cmd_pos.id)
108 .expect("Cannot find reader");
109
110 reader.seek(SeekFrom::Start(cmd_pos.pos))?;
111 let cmd_reader = reader.take(cmd_pos.len);
112 if let Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? {
113 return Ok(Some(value));
114 } else {
115 return Err(anyhow::Error::msg("Unexpected command"));
116 }
117 }
118 Ok(None)
119 }
120
121 pub fn has_key(&self, key: String) -> bool {
123 self.index.contains_key(&key)
124 }
125
126 pub fn remove(&mut self, key: String) -> Result<()> {
128 if self.index.contains_key(&key) {
130 let cmd = Command::remove(key.to_owned());
131 serde_json::to_writer(&mut self.writer, &cmd)?;
132 self.writer.flush()?;
133 let old_cmd = self.index.remove(&key).expect("Key not found");
134 self.stale_data += old_cmd.len;
135 Ok(())
136 } else {
137 Err(anyhow::Error::msg("Key not found"))
138 }
139 }
140
141 fn compact(&mut self) -> Result<()> {
142 let compaction_id = self.current_id + 1;
145 self.current_id += 2;
146 self.writer = create_log_file(self.current_id, &self.path, &mut self.readers)?;
147 let mut compaction_writer = create_log_file(compaction_id, &self.path, &mut self.readers)?;
148
149 let mut new_pos = 0;
150 for cmd_pos in &mut self.index.values_mut() {
151 let cmd_reader = self.readers.get_mut(&cmd_pos.id).expect("reader not found");
152 if cmd_reader.pos != cmd_pos.pos {
153 cmd_reader.seek(SeekFrom::Start(cmd_pos.pos))?;
154 }
155
156 let mut cmd_reader = cmd_reader.take(cmd_pos.len);
157 let len = io::copy(&mut cmd_reader, &mut compaction_writer)?;
158 *cmd_pos = CommandPos::from((compaction_id, new_pos..new_pos + len));
159 new_pos += len;
160 }
161 compaction_writer.flush()?;
162
163 let stale_ids: Vec<_> = self
164 .readers
165 .keys()
166 .filter(|id| id < &&compaction_id)
167 .cloned()
168 .collect();
169
170 for stale_id in stale_ids {
171 self.readers.remove(&stale_id);
172 fs::remove_file(log_path(&self.path, stale_id))?;
173 }
174 self.stale_data = 0;
175
176 Ok(())
177 }
178}
179
/// Returns the location of the log file with the given `id` inside the
/// directory `path`, i.e. `<path>/<id>.log`.
fn log_path<T: AsRef<Path>>(path: T, id: u64) -> PathBuf {
    let mut full_path = path.as_ref().to_path_buf();
    full_path.push(format!("{}.log", id));
    full_path
}
183
184fn create_log_file(
185 id: u64,
186 path: &Path,
187 readers: &mut HashMap<u64, BufReaderWithPos<File>>,
188) -> Result<BufWriterWithPos<File>> {
189 let path = log_path(&path, id);
190 let writer = BufWriterWithPos::new(OpenOptions::new().create(true).append(true).open(&path)?)?;
191 readers.insert(id, BufReaderWithPos::new(File::open(&path)?)?);
192 Ok(writer)
193}
194
195fn load_log(
197 id: u64,
198 reader: &mut BufReaderWithPos<File>,
199 index: &mut BTreeMap<String, CommandPos>,
200) -> Result<u64> {
201 let mut pos = reader.seek(SeekFrom::Start(0))?;
202 let mut stream = Deserializer::from_reader(reader).into_iter::<Command>();
203 let mut stale_data = 0;
204 while let Some(cmd) = stream.next() {
206 let new_pos = stream.byte_offset() as u64;
207 match cmd? {
208 Command::Set { key, .. } => {
209 if let Some(old_cmd) = index.insert(key, CommandPos::from((id, pos..new_pos))) {
210 stale_data += old_cmd.len;
211 }
212 }
213 Command::Remove { key } => {
214 if let Some(old_cmd) = index.remove(&key) {
215 stale_data += old_cmd.len;
216 }
217
218 stale_data += new_pos - pos;
219 }
220 }
221 pos = new_pos;
222 }
223 Ok(stale_data)
224}
225
226fn sorted_ids(path: &Path) -> Result<Vec<u64>> {
230 let mut ids: Vec<u64> = fs::read_dir(&path)?
231 .flat_map(|dir_entry| -> Result<_> { Ok(dir_entry?.path()) })
232 .filter(|path| path.is_file() && path.extension() == Some("log".as_ref()))
233 .filter_map(|path| {
234 path.file_name()
235 .and_then(OsStr::to_str)
236 .map(|s| s.trim_end_matches(".log"))
237 .map(str::parse::<u64>)
238 })
239 .flatten()
240 .collect();
241 ids.sort();
242 Ok(ids)
243}
244
245pub struct BufReaderWithPos<T: Read + Seek> {
246 reader: BufReader<T>,
247 pos: u64,
248}
249
250impl<T: Read + Seek> BufReaderWithPos<T> {
251 fn new(mut file: T) -> Result<Self> {
252 let pos = file.seek(SeekFrom::Current(0))?;
253 Ok(BufReaderWithPos {
254 reader: BufReader::new(file),
255 pos,
256 })
257 }
258}
259
260impl<T: Read + Seek> Read for BufReaderWithPos<T> {
261 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
262 let len = self.reader.read(buf)?;
263 self.pos += len as u64;
264 Ok(len)
265 }
266}
267
268impl<T: Read + Seek> Seek for BufReaderWithPos<T> {
269 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
270 self.pos = self.reader.seek(pos)?;
271 Ok(self.pos)
272 }
273}
274
275struct BufWriterWithPos<T: Write + Seek> {
276 writer: BufWriter<T>,
277 pos: u64,
278}
279
280impl<T: Write + Seek> BufWriterWithPos<T> {
281 fn new(mut file: T) -> Result<Self> {
282 let pos = file.seek(SeekFrom::Current(0))?;
283 Ok(BufWriterWithPos {
284 writer: BufWriter::new(file),
285 pos,
286 })
287 }
288}
289
290impl<T: Write + Seek> Write for BufWriterWithPos<T> {
291 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
292 let len = self.writer.write(buf)?;
293 self.pos += len as u64;
294 Ok(len)
295 }
296
297 fn flush(&mut self) -> io::Result<()> {
298 self.writer.flush()
299 }
300}
301
302impl<T: Write + Seek> Seek for BufWriterWithPos<T> {
303 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
304 self.pos = self.writer.seek(pos)?;
305 Ok(self.pos)
306 }
307}
308
309#[derive(Serialize, Deserialize, Debug)]
311enum Command {
312 Set { key: String, value: String },
313 Remove { key: String },
314}
315
316impl Command {
317 fn set(key: String, value: String) -> Self {
318 Command::Set { key, value }
319 }
320
321 fn remove(key: String) -> Self {
322 Command::Remove { key }
323 }
324}
325
/// Location of one serialized command: which log file it lives in, where it
/// starts, and how many bytes it occupies.
#[derive(Debug)]
struct CommandPos {
    /// Id of the log file that contains the command.
    id: u64,
    /// Byte offset of the command's first byte within that file.
    pos: u64,
    /// Length of the command in bytes.
    len: u64,
}

impl From<(u64, Range<u64>)> for CommandPos {
    /// Converts a `(log id, byte range)` pair into a `CommandPos`.
    ///
    /// The range's start becomes `pos` and its width becomes `len`.
    fn from((id, range): (u64, Range<u64>)) -> Self {
        let Range { start, end } = range;
        CommandPos {
            id,
            pos: start,
            len: end - start,
        }
    }
}