// bitcasky_database/data_storage/mmap_data_storage.rs

use std::{fs::File, io::Write, mem, ops::Deref, sync::Arc, vec};

use bitcasky_common::{
    clock::Clock,
    formatter::{padding, BitcaskyFormatter, Formatter, RowMeta, RowToWrite, FILE_HEADER_SIZE},
    options::BitcaskyOptions,
    storage_id::StorageId,
};
use log::debug;
use memmap2::{MmapMut, MmapOptions};

use crate::{common::RowToRead, DataStorageError, RowLocation, TimedValue};

use super::{DataStorageReader, DataStorageWriter, Result};

16type MetaAndKeyValue<'a> = (RowMeta, &'a [u8], Option<Vec<u8>>);
17
18#[derive(Debug)]
19pub struct MmapDataStorage {
20    pub offset: usize,
21    pub capacity: usize,
22    pub read_value_times: u64,
23    pub write_times: u64,
24    data_file: File,
25    storage_id: StorageId,
26    options: Arc<BitcaskyOptions>,
27    formatter: Arc<BitcaskyFormatter>,
28    map_view: MmapMut,
29}
30
31impl MmapDataStorage {
32    pub fn new(
33        storage_id: StorageId,
34        data_file: File,
35        write_offset: usize,
36        capacity: usize,
37        formatter: Arc<BitcaskyFormatter>,
38        options: Arc<BitcaskyOptions>,
39    ) -> Result<Self> {
40        let mmap = unsafe {
41            MmapOptions::new()
42                .offset(0)
43                .len(capacity)
44                .map_mut(&data_file)?
45        };
46
47        Ok(MmapDataStorage {
48            data_file,
49            storage_id,
50            offset: write_offset,
51            capacity,
52            options,
53            formatter,
54            map_view: mmap,
55            read_value_times: 0,
56            write_times: 0,
57        })
58    }
59
60    fn ensure_capacity<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
61        &mut self,
62        row: &RowToWrite<K, V>,
63    ) -> Result<()> {
64        let mut row_size = self.formatter.net_row_size(row);
65        row_size += padding(row_size);
66        let required_capacity = row_size + self.offset;
67        if required_capacity > self.options.database.storage.max_data_file_size {
68            return Err(DataStorageError::StorageOverflow(self.storage_id));
69        }
70
71        if required_capacity > self.capacity {
72            let mut new_capacity =
73                std::cmp::max(required_capacity + 8, self.capacity + self.capacity / 3);
74            new_capacity = std::cmp::min(
75                new_capacity,
76                self.options.database.storage.max_data_file_size,
77            );
78
79            self.flush()?;
80
81            new_capacity = bitcasky_common::resize_file(&self.data_file, new_capacity)?;
82            debug!(
83                "data file with storage id: {:?}, require {} bytes, resizing from {} to {} bytes. ",
84                self.storage_id, required_capacity, self.capacity, new_capacity
85            );
86            let mut mmap = unsafe {
87                MmapOptions::new()
88                    .offset(0)
89                    .len(new_capacity)
90                    .map_mut(&self.data_file)?
91            };
92            mem::swap(&mut mmap, &mut self.map_view);
93            self.capacity = new_capacity;
94        }
95        Ok(())
96    }
97
98    fn as_mut_slice(&mut self) -> &mut [u8] {
99        &mut self.map_view[0..self.capacity]
100    }
101
102    fn as_slice(&self) -> &[u8] {
103        &self.map_view[0..self.capacity]
104    }
105
106    fn do_read_row(&mut self, offset: usize) -> Result<Option<MetaAndKeyValue>> {
107        if offset > self.capacity {
108            return Err(DataStorageError::EofError());
109        }
110
111        if offset == self.capacity {
112            return Ok(None);
113        }
114
115        let header_size = self.formatter.row_header_size();
116        if offset + header_size >= self.capacity {
117            return Err(DataStorageError::EofError());
118        }
119
120        let header = self.formatter.decode_row_header(
121            &self.as_slice()[offset..(offset + self.formatter.row_header_size())],
122        );
123        if header.meta.key_size == 0 {
124            return Ok(None);
125        }
126
127        if offset + header_size + header.meta.key_size + header.meta.value_size > self.capacity {
128            return Err(DataStorageError::EofError());
129        }
130
131        let net_size =
132            self.formatter.row_header_size() + header.meta.key_size + header.meta.value_size;
133
134        let kv_bs = &self.as_slice()[offset + self.formatter.row_header_size()..offset + net_size];
135
136        self.formatter.validate_key_value(&header, kv_bs)?;
137
138        let k = &kv_bs[0..header.meta.key_size];
139        if header.meta.expire_timestamp != 0
140            && header.meta.expire_timestamp <= self.options.clock.now()
141        {
142            Ok(Some((header.meta, k, None)))
143        } else {
144            let v = Some(kv_bs[header.meta.key_size..].into());
145            Ok(Some((header.meta, k, v)))
146        }
147    }
148}
149
150impl DataStorageWriter for MmapDataStorage {
151    fn write_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
152        &mut self,
153        row: &RowToWrite<K, V>,
154    ) -> super::Result<crate::RowLocation> {
155        self.ensure_capacity(row)?;
156
157        let value_offset = self.offset;
158        let formatter = self.formatter.clone();
159        let net_size = formatter.encode_row(row, &mut self.as_mut_slice()[value_offset..]);
160        self.offset += net_size + padding(net_size);
161        self.write_times += 1;
162
163        Ok(RowLocation {
164            storage_id: self.storage_id,
165            row_offset: value_offset,
166        })
167    }
168
169    fn rewind(&mut self) -> super::Result<()> {
170        self.data_file.flush()?;
171        self.offset = FILE_HEADER_SIZE;
172        Ok(())
173    }
174
175    fn flush(&mut self) -> super::Result<()> {
176        Ok(self.map_view.flush_range(0, self.capacity)?)
177    }
178}
179
180impl DataStorageReader for MmapDataStorage {
181    fn read_value(&mut self, row_offset: usize) -> super::Result<Option<TimedValue<Vec<u8>>>> {
182        let storage_id = self.storage_id;
183        let row = self
184            .do_read_row(row_offset)
185            .map_err(|e| DataStorageError::ReadRowFailed(storage_id, e.to_string()))?;
186        if row.is_none() {
187            return Err(DataStorageError::ReadRowFailed(
188                self.storage_id,
189                format!("no value found at offset: {}", row_offset),
190            ));
191        }
192
193        let ret = {
194            let (meta, _, v_op) = row.unwrap();
195            if let Some(v) = v_op {
196                Ok(TimedValue {
197                    value: v,
198                    expire_timestamp: meta.expire_timestamp,
199                }
200                .validate())
201            } else {
202                Ok(None)
203            }
204        };
205        self.read_value_times += 1;
206        ret
207    }
208
209    fn read_next_row(&mut self) -> super::Result<Option<RowToRead>> {
210        let row_offset = self.offset;
211        let row = self.do_read_row(row_offset)?;
212        if row.is_none() {
213            return Ok(None);
214        }
215
216        let (meta, key, v) = row.unwrap();
217        let row_to_read = RowToRead {
218            key: key.into(),
219            value: TimedValue::expirable_value(v.unwrap_or(vec![]), meta.expire_timestamp),
220            row_location: RowLocation {
221                storage_id: self.storage_id,
222                row_offset,
223            },
224        };
225
226        let net_size: usize = self.formatter.row_header_size() + meta.key_size + meta.value_size;
227        self.offset += net_size + padding(net_size);
228
229        Ok(Some(row_to_read))
230    }
231
232    fn seek_to_end(&mut self) -> Result<()> {
233        loop {
234            if self.read_next_row()?.is_none() {
235                break;
236            }
237        }
238        Ok(())
239    }
240
241    fn offset(&self) -> usize {
242        self.offset
243    }
244}
245
#[cfg(test)]
mod tests {
    use bitcasky_common::{
        clock::DebugClock, create_file, formatter::FILE_HEADER_SIZE, fs::FileType,
        options::DataSotrageType,
    };

    use super::*;

    use test_log::test;
    use utilities::common::get_temporary_directory_path;

    /// Mmap-storage options whose maximum and initial data file sizes are both `max_size`.
    fn get_options(max_size: usize) -> BitcaskyOptions {
        BitcaskyOptions::default()
            .max_data_file_size(max_size)
            .init_data_file_capacity(max_size)
            .storage_type(DataSotrageType::Mmap)
    }

    /// Creates a fresh data file in a temporary directory and opens it as an
    /// `MmapDataStorage` positioned just after the file header.
    fn get_file_storage(options: BitcaskyOptions) -> MmapDataStorage {
        let dir = get_temporary_directory_path();
        let storage_id = 1;
        let formatter = Arc::new(BitcaskyFormatter::default());
        let file = create_file(dir, FileType::DataFile, Some(storage_id), &formatter, 512).unwrap();
        let meta = file.metadata().unwrap();
        MmapDataStorage::new(
            storage_id,
            file,
            FILE_HEADER_SIZE,
            meta.len() as usize,
            formatter,
            Arc::new(options),
        )
        .unwrap()
    }

    #[test]
    fn test_read_write_immortal_value() {
        let mut storage = get_file_storage(get_options(1024));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1.clone());
        let row_location1 = storage.write_row(&row_to_write).unwrap();

        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k2, v2.clone());
        let row_location2 = storage.write_row(&row_to_write).unwrap();

        assert_eq!(
            v1,
            *storage
                .read_value(row_location1.row_offset)
                .unwrap()
                .unwrap()
        );
        assert_eq!(
            v2,
            *storage
                .read_value(row_location2.row_offset)
                .unwrap()
                .unwrap()
        );
    }

    #[test]
    fn test_read_write_expired_value() {
        let time = 1000;
        let clock = Arc::new(DebugClock::new(time));
        let mut storage = get_file_storage(get_options(1024).debug_clock(clock.clone()));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> =
            RowToWrite::new_with_timestamp(k1, v1, time);
        let row_location1 = storage.write_row(&row_to_write).unwrap();

        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> =
            RowToWrite::new_with_timestamp(k2, v2.clone(), time + 1);
        let row_location2 = storage.write_row(&row_to_write).unwrap();

        assert!(storage
            .read_value(row_location1.row_offset)
            .unwrap()
            .is_none());
        // v2 still valid
        assert_eq!(
            v2,
            *storage
                .read_value(row_location2.row_offset)
                .unwrap()
                .unwrap()
        );

        // move clock, let v2 expire
        clock.set(time + 1);
        assert!(storage
            .read_value(row_location2.row_offset)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_write_overflow() {
        let mut storage = get_file_storage(get_options(2));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1);
        storage.write_row(&row_to_write).expect_err("overflow");
    }

    #[test]
    fn test_expand_file_size() {
        let mut storage = get_file_storage(get_options(2048));
        let init_size = storage.data_file.metadata().unwrap().len();

        let mut size = 0;
        for i in 0..100 {
            if size >= 1800 {
                break;
            }

            let k1: Vec<u8> = format!("key{}", i).into();
            let v1: Vec<u8> = format!("value{}", i).into();
            let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1);
            storage.write_row(&row_to_write).unwrap();
            let net_size = storage.formatter.net_row_size(&row_to_write);
            size += net_size + padding(net_size);
        }

        assert!(storage.data_file.metadata().unwrap().len() > init_size);
    }

    #[test]
    fn test_read_next_immortal_row() {
        let mut storage = get_file_storage(get_options(1024));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> = RowToWrite::new(&k1, v1.clone());
        storage.write_row(&row_to_write).unwrap();

        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> = RowToWrite::new(&k2, v2.clone());
        storage.write_row(&row_to_write).unwrap();

        storage.rewind().unwrap();

        let r = storage.read_next_row().unwrap().unwrap();

        assert_eq!(k1, r.key);
        assert_eq!(v1, r.value.value);
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k2, r.key);
        assert_eq!(v2, r.value.value);
    }

    #[test]
    fn test_read_next_expired_row() {
        let time = 1000;
        let clock = Arc::new(DebugClock::new(time));
        let mut storage = get_file_storage(get_options(1024).debug_clock(clock.clone()));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> =
            RowToWrite::new_with_timestamp(&k1, v1.clone(), time + 1);
        storage.write_row(&row_to_write).unwrap();

        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> =
            RowToWrite::new_with_timestamp(&k2, v2, time);
        storage.write_row(&row_to_write).unwrap();

        storage.rewind().unwrap();

        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k1, r.key);
        assert_eq!(v1, r.value.value);
        assert_eq!(time + 1, r.value.expire_timestamp);
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k2, r.key);
        assert!(r.value.value.is_empty());
        assert_eq!(time, r.value.expire_timestamp);
        assert!(storage.read_next_row().unwrap().is_none());

        // move clock, no valid value
        clock.set(time + 1);
        storage.rewind().unwrap();
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k1, r.key);
        assert!(r.value.value.is_empty());
        assert_eq!(time + 1, r.value.expire_timestamp);
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k2, r.key);
        assert!(r.value.value.is_empty());
        assert_eq!(time, r.value.expire_timestamp);
        assert!(storage.read_next_row().unwrap().is_none());
    }

    #[test]
    fn test_rewind() {
        let mut storage = get_file_storage(get_options(1024));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1);
        let location = storage.write_row(&row_to_write).unwrap();
        storage.rewind().unwrap();

        let r = storage
            .read_next_row()
            .unwrap()
            .expect("the row written before rewind should be readable");
        assert_eq!("key1".as_bytes().to_vec(), r.key);
        assert_eq!(location, r.row_location);
    }
}