1use crate::db_command::{Command, CommandPos};
2use crate::io_types::BufReaderWithPos;
3use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
4use crate::log_storage::log_reader::LogReader;
5use crate::log_storage::log_writer::LogWriter;
6use crate::{GrausError, Result};
7use crossbeam_skiplist::SkipMap;
8use std::cell::RefCell;
9use std::fs::{self, File};
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12use std::{collections::HashMap, path::PathBuf};
13
14#[derive(Clone)]
34pub struct GrausDb {
35 index: Arc<SkipMap<String, CommandPos>>,
37 writer: Arc<Mutex<LogWriter>>,
39 reader: LogReader,
41}
42
43impl GrausDb {
44 pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
52 let path = Arc::new(path.into());
53 fs::create_dir_all(&*path)?;
54
55 let mut readers = HashMap::new();
56 let index = Arc::new(SkipMap::new());
57
58 let log_ids = get_log_ids(&path)?;
59 let mut uncompacted = 0;
60
61 for &log_id in &log_ids {
62 let log_path = log_path(&path, log_id);
63 let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
64 uncompacted += load_log(log_id, &mut reader, &*index)?;
65 readers.insert(log_id, reader);
66 }
67
68 let new_log_id = log_ids.last().unwrap_or(&0) + 1;
69 let writer = new_log_file(&path, new_log_id)?;
70 let safe_point = Arc::new(AtomicU64::new(0));
71
72 let reader = LogReader {
73 path: Arc::clone(&path),
74 safe_point,
75 readers: RefCell::new(readers),
76 };
77
78 let writer = LogWriter {
79 writer,
80 index: Arc::clone(&index),
81 reader: reader.clone(),
82 current_log_id: new_log_id,
83 uncompacted,
84 path: Arc::clone(&path),
85 };
86
87 Ok(GrausDb {
88 reader,
89 index,
90 writer: Arc::new(Mutex::new(writer)),
91 })
92 }
93
94 pub fn set(&self, key: String, value: String) -> Result<()> {
98 self.writer.lock().unwrap().set(key, value)
99 }
100
101 pub fn get(&self, key: String) -> Result<Option<String>> {
105 if let Some(cmd_pos) = self.index.get(&key) {
106 if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
107 Ok(Some(value))
108 } else {
109 Err(GrausError::UnexpectedCommandType)
110 }
111 } else {
112 Ok(None)
113 }
114 }
115
116 pub fn remove(&self, key: String) -> Result<()> {
120 self.writer.lock().unwrap().remove(key)
121 }
122
123 pub fn update_if<F, P>(
128 &self,
129 key: String,
130 update_fn: F,
131 predicate_key: Option<String>,
132 predicate: Option<P>,
133 ) -> Result<()>
134 where
135 F: FnOnce(String) -> String,
136 P: FnOnce(String) -> bool,
137 {
138 let mut writer = self.writer.lock().unwrap();
139 let current_value = self.get(key.to_owned())?;
140 let Some(current_value) = current_value else {
141 return Err(GrausError::KeyNotFound);
142 };
143
144 if let (Some(predicate_key), Some(predicate)) = (predicate_key, predicate) {
145 let current_predicate_key_value = self.get(predicate_key)?;
146 let Some(current_predicate_key_value) = current_predicate_key_value else {
147 return Err(GrausError::KeyNotFound);
148 };
149 if !predicate(current_predicate_key_value) {
150 return Err(GrausError::PredicateNotSatisfied);
151 }
152 }
153
154 let updated_value = update_fn(current_value);
155 writer.set(key, updated_value)
156 }
157}