bitcasky_database/data_storage/mod.rs
pub mod mmap_data_storage;

use log::{debug, error};
use std::{
    fs::{File, Metadata},
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};
use thiserror::Error;

use bitcasky_common::{
    create_file,
    formatter::{
        self, get_formatter_from_file, BitcaskyFormatter, FormatterError, RowToWrite,
        FILE_HEADER_SIZE,
    },
    fs::{self, FileType},
    options::BitcaskyOptions,
    storage_id::StorageId,
};

use self::mmap_data_storage::MmapDataStorage;

use super::{common::RowToRead, RowLocation, TimedValue};

#[derive(Error, Debug)]
pub enum DataStorageError {
    #[error("Write data file with id: {0} failed. error: {1}")]
    WriteRowFailed(StorageId, String),
    #[error("Read data file with id: {0} failed. error: {1}")]
    ReadRowFailed(StorageId, String),
    #[error("Flush writing storage with id: {0} failed. error: {1}")]
    FlushStorageFailed(StorageId, String),
    #[error("Rewind storage with id: {0} failed. error: {1}")]
    RewindFailed(StorageId, String),
    #[error("Storage with id: {0} overflowed, needs to be replaced with a new one")]
    StorageOverflow(StorageId),
    #[error("No permission to write storage with id: {0}")]
    PermissionDenied(StorageId),
    #[error("Got IO Error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("Got formatter error: {0}")]
    DataStorageFormatter(#[from] FormatterError),
    #[error("Failed to read file header for storage with id: {1}")]
    ReadFileHeaderError(#[source] FormatterError, StorageId),
    #[error("Read end of file")]
    EofError(),
}

pub type Result<T> = std::result::Result<T, DataStorageError>;

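/// Write-side interface of a data file: append encoded rows, rewind the
/// write position, and flush pending writes to the underlying file.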
pub trait DataStorageWriter {
    fn write_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
        &mut self,
        row: &RowToWrite<K, V>,
    ) -> Result<RowLocation>;

    fn rewind(&mut self) -> Result<()>;

    fn flush(&mut self) -> Result<()>;
}

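/// Read-side interface of a data file: fetch a value at a known row offset,
/// scan rows sequentially, and query or move the current read position.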
pub trait DataStorageReader {
    fn read_value(&mut self, row_offset: usize) -> Result<Option<TimedValue<Vec<u8>>>>;

    fn read_next_row(&mut self) -> Result<Option<RowToRead>>;

    fn seek_to_end(&mut self) -> Result<()>;

    fn offset(&self) -> usize;
}

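/// Dispatch enum over the concrete storage backends; only the mmap-based
/// implementation exists at the moment.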
#[derive(Debug)]
enum DataStorageImpl {
    MmapStorage(MmapDataStorage),
}

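/// Point-in-time statistics for a single data file, as reported by
/// `DataStorage::get_telemetry_data`.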
#[derive(Debug)]
pub struct DataStorageTelemetry {
    pub storage_id: StorageId,
    pub formatter_version: u8,
    pub capacity: usize,
    pub offset: usize,
    pub usage: f64,
    pub read_value_times: u64,
    pub write_times: u64,
}

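/// A single data file on disk, identified by its `StorageId`. All reads and
/// writes are delegated to the backend held in `storage_impl`.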
#[derive(Debug)]
pub struct DataStorage {
    database_dir: PathBuf,
    storage_id: StorageId,
    storage_impl: DataStorageImpl,
    options: Arc<BitcaskyOptions>,
    formatter: Arc<BitcaskyFormatter>,
    dirty: bool,
}

impl DataStorage {
    pub fn new<P: AsRef<Path>>(
        database_dir: P,
        storage_id: StorageId,
        formatter: Arc<BitcaskyFormatter>,
        options: Arc<BitcaskyOptions>,
    ) -> Result<Self> {
        let path = database_dir.as_ref().to_path_buf();
        let data_file = create_file(
            &path,
            FileType::DataFile,
            Some(storage_id),
            &formatter,
            options.database.storage.init_data_file_capacity,
        )?;

        debug!(
            "Create storage under path: {:?} with storage id: {}",
            &path, storage_id
        );
        let meta = data_file.metadata()?;

        DataStorage::open_by_file(
            &path,
            storage_id,
            data_file,
            meta,
            FILE_HEADER_SIZE,
            formatter,
            options,
        )
    }

    pub fn open<P: AsRef<Path>>(
        database_dir: P,
        storage_id: StorageId,
        options: Arc<BitcaskyOptions>,
    ) -> Result<Self> {
        let path = database_dir.as_ref().to_path_buf();
        let mut data_file = fs::open_file(&path, FileType::DataFile, Some(storage_id))?;
        debug!(
            "Open storage under path: {:?} with storage id: {}",
            &path, storage_id
        );
        let meta = data_file.file.metadata()?;
        let formatter = Arc::new(get_formatter_from_file(&mut data_file.file)?);

        DataStorage::open_by_file(
            &path,
            storage_id,
            data_file.file,
            meta,
            FILE_HEADER_SIZE,
            formatter,
            options,
        )
    }

    pub fn storage_id(&self) -> StorageId {
        self.storage_id
    }

    pub fn is_dirty(&mut self) -> bool {
        self.dirty
    }

    pub fn iter(&self) -> Result<StorageIter> {
        let mut data_file = fs::open_file(
            &self.database_dir,
            FileType::DataFile,
            Some(self.storage_id),
        )?;
        debug!(
            "Create iterator under path: {:?} with storage id: {}",
            &self.database_dir, self.storage_id
        );
        let formatter = Arc::new(
            formatter::get_formatter_from_file(&mut data_file.file)
                .map_err(|e| DataStorageError::ReadFileHeaderError(e, self.storage_id))?,
        );
        let meta = data_file.file.metadata()?;
        Ok(StorageIter {
            storage: DataStorage::open_by_file(
                &self.database_dir,
                self.storage_id,
                data_file.file,
                meta,
                FILE_HEADER_SIZE,
                formatter,
                self.options.clone(),
            )?,
        })
    }

    pub fn get_telemetry_data(&self) -> DataStorageTelemetry {
        match &self.storage_impl {
            DataStorageImpl::MmapStorage(s) => DataStorageTelemetry {
                storage_id: self.storage_id,
                formatter_version: self.formatter.version(),
                capacity: s.capacity,
                offset: s.offset,
                usage: s.offset as f64 / s.capacity as f64,
                read_value_times: s.read_value_times,
                write_times: s.write_times,
            },
        }
    }

    fn open_by_file(
        database_dir: &Path,
        storage_id: StorageId,
        data_file: File,
        meta: Metadata,
        write_offset: usize,
        formatter: Arc<BitcaskyFormatter>,
        options: Arc<BitcaskyOptions>,
    ) -> Result<Self> {
        let capacity = meta.len() as usize;
        let storage_impl = DataStorageImpl::MmapStorage(MmapDataStorage::new(
            storage_id,
            data_file,
            write_offset,
            capacity,
            formatter.clone(),
            options.clone(),
        )?);
        Ok(DataStorage {
            storage_impl,
            storage_id,
            database_dir: database_dir.to_path_buf(),
            options,
            formatter,
            dirty: false,
        })
    }
}

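// Hypothetical usage sketch (not part of the original module): choose between
// creating a brand-new data file and re-opening an existing one. `options` and
// `formatter` come from `bitcasky_common`; their construction is not shown in
// this file, so the caller supplies them.
#[allow(dead_code)]
fn create_or_open_example(
    database_dir: &Path,
    storage_id: StorageId,
    create_new: bool,
    formatter: Arc<BitcaskyFormatter>,
    options: Arc<BitcaskyOptions>,
) -> Result<DataStorage> {
    if create_new {
        // `new` creates the file and writes a fresh header using `formatter`.
        DataStorage::new(database_dir, storage_id, formatter, options)
    } else {
        // `open` reads the formatter version back from the existing file header,
        // so it only needs the options.
        DataStorage::open(database_dir, storage_id, options)
    }
}
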
impl DataStorageWriter for DataStorage {
    fn write_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
        &mut self,
        row: &RowToWrite<K, V>,
    ) -> Result<RowLocation> {
        let r = match &mut self.storage_impl {
            DataStorageImpl::MmapStorage(s) => s.write_row(row),
        }?;
        self.dirty = true;
        Ok(r)
    }

    fn rewind(&mut self) -> Result<()> {
        match &mut self.storage_impl {
            DataStorageImpl::MmapStorage(s) => {
                let storage_id = self.storage_id;
                s.rewind()
                    .map_err(|e| DataStorageError::RewindFailed(storage_id, e.to_string()))
            }
        }
    }

    fn flush(&mut self) -> Result<()> {
        match &mut self.storage_impl {
            DataStorageImpl::MmapStorage(s) => s
                .flush()
                .map_err(|e| DataStorageError::FlushStorageFailed(self.storage_id, e.to_string())),
        }
    }
}

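// Hypothetical usage sketch (not part of the original module): append one row
// through the `DataStorageWriter` trait and flush it to disk. The caller builds
// the `RowToWrite` via `bitcasky_common::formatter`, which is not shown here.
#[allow(dead_code)]
fn append_row_example<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
    storage: &mut DataStorage,
    row: &RowToWrite<K, V>,
) -> Result<RowLocation> {
    // `write_row` appends the encoded row and marks the storage dirty.
    let location = storage.write_row(row)?;
    // `flush` persists the pending writes, mapping backend errors to
    // `DataStorageError::FlushStorageFailed`.
    storage.flush()?;
    Ok(location)
}
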
impl DataStorageReader for DataStorage {
    fn read_value(&mut self, row_offset: usize) -> Result<Option<TimedValue<Vec<u8>>>> {
        match &mut self.storage_impl {
            DataStorageImpl::MmapStorage(s) => s
                .read_value(row_offset)
                .map_err(|e| DataStorageError::ReadRowFailed(self.storage_id, e.to_string())),
        }
    }

    fn read_next_row(&mut self) -> Result<Option<RowToRead>> {
        match &mut self.storage_impl {
            DataStorageImpl::MmapStorage(s) => s.read_next_row(),
        }
    }

    fn seek_to_end(&mut self) -> Result<()> {
        match &mut self.storage_impl {
            DataStorageImpl::MmapStorage(s) => s.seek_to_end(),
        }
    }

    fn offset(&self) -> usize {
        match &self.storage_impl {
            DataStorageImpl::MmapStorage(s) => s.offset(),
        }
    }
}

#[derive(Debug)]
pub struct StorageIter {
    storage: DataStorage,
}

impl Iterator for StorageIter {
    type Item = Result<RowToRead>;

    fn next(&mut self) -> Option<Self::Item> {
        let ret = self.storage.read_next_row();
        match ret {
            Ok(o) => o.map(Ok),
            Err(e) => {
                error!(
                    target: "Storage",
                    "Data file with file id {} was corrupted. Error: {}",
                    self.storage.storage_id(),
                    &e
                );
                None
            }
        }
    }
}
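
// Hypothetical usage sketch (not part of the original module): scan every row in
// a data file via `DataStorage::iter`. Each item yielded by `StorageIter` is a
// `Result<RowToRead>`; iteration stops at the first corrupted row (see `next`
// above), so a short count can indicate a truncated or damaged file.
#[allow(dead_code)]
fn count_rows_example(storage: &DataStorage) -> Result<usize> {
    let mut rows = 0usize;
    for row in storage.iter()? {
        let _row: RowToRead = row?;
        rows += 1;
    }
    Ok(rows)
}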