bitcasky_database/data_storage/mod.rs

1pub mod mmap_data_storage;
2
3use log::{debug, error};
4use std::{
5    fs::{File, Metadata},
6    ops::Deref,
7    path::{Path, PathBuf},
8    sync::Arc,
9};
10use thiserror::Error;
11
12use bitcasky_common::{
13    create_file,
14    formatter::{
15        self, get_formatter_from_file, BitcaskyFormatter, FormatterError, RowToWrite,
16        FILE_HEADER_SIZE,
17    },
18    fs::{self, FileType},
19    options::BitcaskyOptions,
20    storage_id::StorageId,
21};
22
23use self::mmap_data_storage::MmapDataStorage;
24
25use super::{common::RowToRead, RowLocation, TimedValue};
26
27#[derive(Error, Debug)]
28#[error("{}")]
29pub enum DataStorageError {
30    #[error("Write data file with id: {0} failed. error: {1}")]
31    WriteRowFailed(StorageId, String),
32    #[error("Read data file with id: {0} failed. error: {1}")]
33    ReadRowFailed(StorageId, String),
34    #[error("Flush writing storage with id: {0} failed. error: {1}")]
35    FlushStorageFailed(StorageId, String),
36    #[error("Rewind storage with id: {0} failed. error: {1}")]
37    RewindFailed(StorageId, String),
38    #[error("Storage with id: {0} overflow, need replace with a new one")]
39    StorageOverflow(StorageId),
40    #[error("No permission to write storage with id: {0}")]
41    PermissionDenied(StorageId),
42    #[error("Got IO Error: {0}")]
43    IoError(#[from] std::io::Error),
44    #[error("Got IO Error: {0}")]
45    DataStorageFormatter(#[from] FormatterError),
46    #[error("Failed to read file header for storage with id: {1}")]
47    ReadFileHeaderError(#[source] FormatterError, StorageId),
48    #[error("Read end of file")]
49    EofError(),
50}
51
52pub type Result<T> = std::result::Result<T, DataStorageError>;
53
54pub trait DataStorageWriter {
55    fn write_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
56        &mut self,
57        row: &RowToWrite<K, V>,
58    ) -> Result<RowLocation>;
59
60    fn rewind(&mut self) -> Result<()>;
61
62    fn flush(&mut self) -> Result<()>;
63}
64
65pub trait DataStorageReader {
66    /// Read value from this storage at row_offset
67    fn read_value(&mut self, row_offset: usize) -> Result<Option<TimedValue<Vec<u8>>>>;
68
69    /// Read next value from this storage
70    fn read_next_row(&mut self) -> Result<Option<RowToRead>>;
71
72    fn seek_to_end(&mut self) -> Result<()>;
73
74    fn offset(&self) -> usize;
75}
76
77#[derive(Debug)]
78enum DataStorageImpl {
79    MmapStorage(MmapDataStorage),
80}
81
82#[derive(Debug)]
83pub struct DataStorageTelemetry {
84    pub storage_id: StorageId,
85    pub formatter_version: u8,
86    pub capacity: usize,
87    pub offset: usize,
88    pub usage: f64,
89    pub read_value_times: u64,
90    pub write_times: u64,
91}
92
93#[derive(Debug)]
94pub struct DataStorage {
95    database_dir: PathBuf,
96    storage_id: StorageId,
97    storage_impl: DataStorageImpl,
98    options: Arc<BitcaskyOptions>,
99    formatter: Arc<BitcaskyFormatter>,
100    dirty: bool,
101}
102
103impl DataStorage {
104    pub fn new<P: AsRef<Path>>(
105        database_dir: P,
106        storage_id: StorageId,
107        formatter: Arc<BitcaskyFormatter>,
108        options: Arc<BitcaskyOptions>,
109    ) -> Result<Self> {
110        let path = database_dir.as_ref().to_path_buf();
111        let data_file = create_file(
112            &path,
113            FileType::DataFile,
114            Some(storage_id),
115            &formatter,
116            options.database.storage.init_data_file_capacity,
117        )?;
118
119        debug!(
120            "Create storage under path: {:?} with storage id: {}",
121            &path, storage_id
122        );
123        let meta = data_file.metadata()?;
124
125        DataStorage::open_by_file(
126            &path,
127            storage_id,
128            data_file,
129            meta,
130            FILE_HEADER_SIZE,
131            formatter,
132            options,
133        )
134    }
135
136    pub fn open<P: AsRef<Path>>(
137        database_dir: P,
138        storage_id: StorageId,
139        options: Arc<BitcaskyOptions>,
140    ) -> Result<Self> {
141        let path = database_dir.as_ref().to_path_buf();
142        let mut data_file = fs::open_file(&path, FileType::DataFile, Some(storage_id))?;
143        debug!(
144            "Open storage under path: {:?} with storage id: {}",
145            &path, storage_id
146        );
147        let meta = data_file.file.metadata()?;
148        let formatter = Arc::new(get_formatter_from_file(&mut data_file.file)?);
149
150        DataStorage::open_by_file(
151            &path,
152            storage_id,
153            data_file.file,
154            meta,
155            FILE_HEADER_SIZE,
156            formatter,
157            options,
158        )
159    }
160
161    pub fn storage_id(&self) -> StorageId {
162        self.storage_id
163    }
164
165    pub fn is_dirty(&mut self) -> bool {
166        self.dirty
167    }
168
169    pub fn iter(&self) -> Result<StorageIter> {
170        let mut data_file = fs::open_file(
171            &self.database_dir,
172            FileType::DataFile,
173            Some(self.storage_id),
174        )?;
175        debug!(
176            "Create iterator under path: {:?} with storage id: {}",
177            &self.database_dir, self.storage_id
178        );
179        let formatter = Arc::new(
180            formatter::get_formatter_from_file(&mut data_file.file)
181                .map_err(|e| DataStorageError::ReadFileHeaderError(e, self.storage_id))?,
182        );
183        let meta = data_file.file.metadata()?;
184        Ok(StorageIter {
185            storage: DataStorage::open_by_file(
186                &self.database_dir,
187                self.storage_id,
188                data_file.file,
189                meta,
190                FILE_HEADER_SIZE,
191                formatter,
192                self.options.clone(),
193            )?,
194        })
195    }
196
197    pub fn get_telemetry_data(&self) -> DataStorageTelemetry {
198        match &self.storage_impl {
199            DataStorageImpl::MmapStorage(s) => DataStorageTelemetry {
200                storage_id: self.storage_id,
201                formatter_version: self.formatter.version(),
202                capacity: s.capacity,
203                offset: s.offset,
204                usage: s.offset as f64 / s.capacity as f64,
205                read_value_times: s.read_value_times,
206                write_times: s.write_times,
207            },
208        }
209    }
210
211    fn open_by_file(
212        database_dir: &Path,
213        storage_id: StorageId,
214        data_file: File,
215        meta: Metadata,
216        write_offset: usize,
217        formatter: Arc<BitcaskyFormatter>,
218        options: Arc<BitcaskyOptions>,
219    ) -> Result<Self> {
220        let capacity = meta.len() as usize;
221        let storage_impl = DataStorageImpl::MmapStorage(MmapDataStorage::new(
222            storage_id,
223            data_file,
224            write_offset,
225            capacity,
226            formatter.clone(),
227            options.clone(),
228        )?);
229        Ok(DataStorage {
230            storage_impl,
231            storage_id,
232            database_dir: database_dir.to_path_buf(),
233            options,
234            formatter,
235            dirty: false,
236        })
237    }
238}
239
240impl DataStorageWriter for DataStorage {
241    fn write_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
242        &mut self,
243        row: &RowToWrite<K, V>,
244    ) -> Result<RowLocation> {
245        let r = match &mut self.storage_impl {
246            DataStorageImpl::MmapStorage(s) => s.write_row(row),
247        }?;
248        self.dirty = true;
249        Ok(r)
250    }
251
252    fn rewind(&mut self) -> Result<()> {
253        match &mut self.storage_impl {
254            DataStorageImpl::MmapStorage(s) => {
255                let storage_id = self.storage_id;
256                s.rewind()
257                    .map_err(|e| DataStorageError::RewindFailed(storage_id, e.to_string()))
258            }
259        }
260    }
261
262    fn flush(&mut self) -> Result<()> {
263        match &mut self.storage_impl {
264            DataStorageImpl::MmapStorage(s) => s
265                .flush()
266                .map_err(|e| DataStorageError::FlushStorageFailed(self.storage_id, e.to_string())),
267        }
268    }
269}
270
271impl DataStorageReader for DataStorage {
272    fn read_value(&mut self, row_offset: usize) -> Result<Option<TimedValue<Vec<u8>>>> {
273        match &mut self.storage_impl {
274            DataStorageImpl::MmapStorage(s) => s
275                .read_value(row_offset)
276                .map_err(|e| DataStorageError::ReadRowFailed(self.storage_id, e.to_string())),
277        }
278    }
279
280    fn read_next_row(&mut self) -> Result<Option<RowToRead>> {
281        match &mut self.storage_impl {
282            DataStorageImpl::MmapStorage(s) => s.read_next_row(),
283        }
284    }
285
286    fn seek_to_end(&mut self) -> Result<()> {
287        match &mut self.storage_impl {
288            DataStorageImpl::MmapStorage(s) => s.seek_to_end(),
289        }
290    }
291
292    fn offset(&self) -> usize {
293        match &self.storage_impl {
294            DataStorageImpl::MmapStorage(s) => s.offset(),
295        }
296    }
297}
298
299#[derive(Debug)]
300pub struct StorageIter {
301    storage: DataStorage,
302}
303
304impl Iterator for StorageIter {
305    type Item = Result<RowToRead>;
306
307    fn next(&mut self) -> Option<Self::Item> {
308        let ret = self.storage.read_next_row();
309        match ret {
310            Ok(o) => o.map(Ok),
311            Err(e) => {
312                error!(target: "Storage", "Data file with file id {} was corrupted. Error: {}", 
313                self.storage.storage_id(), &e);
314                None
315            }
316        }
317    }
318}