1use crate::db_command::{CommandOwned, CommandPos};
2use crate::io_types::BufReaderWithPos;
3use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
4use crate::log_storage::log_reader::LogReader;
5use crate::log_storage::log_writer::LogWriter;
6use crate::{GrausError, Result};
7use crossbeam_skiplist::SkipMap;
8use std::cell::RefCell;
9use std::fs::{self, File};
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12use std::{collections::HashMap, path::PathBuf};
13
14#[derive(Clone)]
35pub struct GrausDb {
36 index: Arc<SkipMap<Vec<u8>, CommandPos>>,
38 writer: Arc<Mutex<LogWriter>>,
40 reader: LogReader,
42}
43
44impl GrausDb {
45 pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
53 let path = Arc::new(path.into());
54 fs::create_dir_all(&*path)?;
55
56 let mut readers = HashMap::new();
57 let index = Arc::new(SkipMap::new());
58
59 let log_ids = get_log_ids(&path)?;
60 let mut uncompacted = 0;
61
62 for &log_id in &log_ids {
63 let log_path = log_path(&path, log_id);
64 let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
65 uncompacted += load_log(log_id, &mut reader, &index)?;
66 readers.insert(log_id, reader);
67 }
68
69 let new_log_id = log_ids.last().unwrap_or(&0) + 1;
70 let writer = new_log_file(&path, new_log_id)?;
71 let safe_point = Arc::new(AtomicU64::new(0));
72
73 let reader = LogReader {
74 path: Arc::clone(&path),
75 safe_point,
76 readers: RefCell::new(readers),
77 };
78
79 let writer = LogWriter {
80 writer,
81 index: Arc::clone(&index),
82 reader: reader.clone(),
83 current_log_id: new_log_id,
84 uncompacted,
85 path: Arc::clone(&path),
86 };
87
88 Ok(GrausDb {
89 reader,
90 index,
91 writer: Arc::new(Mutex::new(writer)),
92 })
93 }
94
95 pub fn set(&self, key: Vec<u8>, value: &[u8]) -> Result<()> {
99 self.writer.lock().unwrap().set(key, value)
100 }
101
102 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
106 if let Some(cmd_pos) = self.index.get(key) {
107 if let CommandOwned::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
108 Ok(Some(value))
109 } else {
110 Err(GrausError::UnexpectedCommandType)
111 }
112 } else {
113 Ok(None)
114 }
115 }
116
117 pub fn remove(&self, key: &[u8]) -> Result<()> {
121 self.writer.lock().unwrap().remove(key)
122 }
123
124 pub fn update_if<F, P>(
129 &self,
130 key: Vec<u8>,
131 update_fn: F,
132 predicate_key: Option<&[u8]>,
133 predicate: Option<P>,
134 ) -> Result<()>
135 where
136 F: FnOnce(&mut Vec<u8>),
137 P: FnOnce(&[u8]) -> bool,
138 {
139 let mut writer = self.writer.lock().unwrap();
140 let current_value = self.get(&key)?;
141 let Some(current_value) = current_value else {
142 return Err(GrausError::KeyNotFound);
143 };
144
145 if let (Some(predicate_key), Some(predicate)) = (predicate_key, predicate) {
146 let current_predicate_key_value = self.get(predicate_key)?;
147 let Some(current_predicate_key_value) = current_predicate_key_value else {
148 return Err(GrausError::KeyNotFound);
149 };
150 if !predicate(¤t_predicate_key_value) {
151 return Err(GrausError::PredicateNotSatisfied);
152 }
153 }
154
155 let mut current_value_mut = current_value;
156 update_fn(&mut current_value_mut);
157 writer.set(key, ¤t_value_mut)
158 }
159}