1use crate::db_command::{Command, CommandPos};
2use crate::io_types::BufReaderWithPos;
3use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
4use crate::log_storage::log_reader::LogReader;
5use crate::log_storage::log_writer::LogWriter;
6use crate::{GrausError, Result};
7use crossbeam_skiplist::SkipMap;
8use std::cell::RefCell;
9use std::fs::{self, File};
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12use std::{collections::HashMap, path::PathBuf};
13
14#[derive(Clone)]
35pub struct GrausDb {
36 index: Arc<SkipMap<Vec<u8>, CommandPos>>,
38 writer: Arc<Mutex<LogWriter>>,
40 reader: LogReader,
42}
43
44impl GrausDb {
49 pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
57 let path = Arc::new(path.into());
58 fs::create_dir_all(&*path)?;
59
60 let mut readers = HashMap::new();
61 let index = Arc::new(SkipMap::new());
62
63 let log_ids = get_log_ids(&path)?;
64 let mut uncompacted = 0;
65
66 for &log_id in &log_ids {
67 let log_path = log_path(&path, log_id);
68 let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
69 uncompacted += load_log(log_id, &mut reader, &index)?;
70 readers.insert(log_id, reader);
71 }
72
73 let new_log_id = log_ids.last().unwrap_or(&0) + 1;
74 let writer = new_log_file(&path, new_log_id)?;
75 let safe_point = Arc::new(AtomicU64::new(0));
76
77 let reader = LogReader {
78 path: Arc::clone(&path),
79 safe_point,
80 readers: RefCell::new(readers),
81 };
82
83 let writer = LogWriter {
84 writer,
85 index: Arc::clone(&index),
86 reader: reader.clone(),
87 current_log_id: new_log_id,
88 uncompacted,
89 path: Arc::clone(&path),
90 };
91
92 Ok(GrausDb {
93 reader,
94 index,
95 writer: Arc::new(Mutex::new(writer)),
96 })
97 }
98
99 pub fn set(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
103 self.writer.lock().unwrap().set(key, value)
104 }
105
106 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
110 if let Some(cmd_pos) = self.index.get(key) {
111 if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
112 Ok(Some(value))
113 } else {
114 Err(GrausError::UnexpectedCommandType)
115 }
116 } else {
117 Ok(None)
118 }
119 }
120
121 pub fn remove(&self, key: Vec<u8>) -> Result<()> {
125 self.writer.lock().unwrap().remove(key)
126 }
127
128 pub fn update_if<F, P>(
133 &self,
134 key: Vec<u8>,
135 update_fn: F,
136 predicate_key: Option<&[u8]>,
137 predicate: Option<P>,
138 ) -> Result<()>
139 where
140 F: FnOnce(&mut Vec<u8>),
141 P: FnOnce(&[u8]) -> bool,
142 {
143 let mut writer = self.writer.lock().unwrap();
144 let current_value = self.get(&key)?;
145 let Some(current_value) = current_value else {
146 return Err(GrausError::KeyNotFound);
147 };
148
149 if let (Some(predicate_key), Some(predicate)) = (predicate_key, predicate) {
150 let current_predicate_key_value = self.get(predicate_key)?;
151 let Some(current_predicate_key_value) = current_predicate_key_value else {
152 return Err(GrausError::KeyNotFound);
153 };
154 if !predicate(¤t_predicate_key_value) {
155 return Err(GrausError::PredicateNotSatisfied);
156 }
157 }
158
159 let mut current_value_mut = current_value;
160 update_fn(&mut current_value_mut);
161 writer.set(key, current_value_mut)
162 }
163}