bitcasky_database/data_storage/
mmap_data_storage.rs1use std::{fs::File, io::Write, mem, ops::Deref, sync::Arc, vec};
2
3use bitcasky_common::{
4 clock::Clock,
5 formatter::{padding, BitcaskyFormatter, Formatter, RowMeta, RowToWrite, FILE_HEADER_SIZE},
6 options::BitcaskyOptions,
7 storage_id::StorageId,
8};
9use log::debug;
10use memmap2::{MmapMut, MmapOptions};
11
12use crate::{common::RowToRead, DataStorageError, RowLocation, TimedValue};
13
14use super::{DataStorageReader, DataStorageWriter, Result};
15
16type MetaAndKeyValue<'a> = (RowMeta, &'a [u8], Option<Vec<u8>>);
17
18#[derive(Debug)]
19pub struct MmapDataStorage {
20 pub offset: usize,
21 pub capacity: usize,
22 pub read_value_times: u64,
23 pub write_times: u64,
24 data_file: File,
25 storage_id: StorageId,
26 options: Arc<BitcaskyOptions>,
27 formatter: Arc<BitcaskyFormatter>,
28 map_view: MmapMut,
29}
30
31impl MmapDataStorage {
32 pub fn new(
33 storage_id: StorageId,
34 data_file: File,
35 write_offset: usize,
36 capacity: usize,
37 formatter: Arc<BitcaskyFormatter>,
38 options: Arc<BitcaskyOptions>,
39 ) -> Result<Self> {
40 let mmap = unsafe {
41 MmapOptions::new()
42 .offset(0)
43 .len(capacity)
44 .map_mut(&data_file)?
45 };
46
47 Ok(MmapDataStorage {
48 data_file,
49 storage_id,
50 offset: write_offset,
51 capacity,
52 options,
53 formatter,
54 map_view: mmap,
55 read_value_times: 0,
56 write_times: 0,
57 })
58 }
59
60 fn ensure_capacity<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
61 &mut self,
62 row: &RowToWrite<K, V>,
63 ) -> Result<()> {
64 let mut row_size = self.formatter.net_row_size(row);
65 row_size += padding(row_size);
66 let required_capacity = row_size + self.offset;
67 if required_capacity > self.options.database.storage.max_data_file_size {
68 return Err(DataStorageError::StorageOverflow(self.storage_id));
69 }
70
71 if required_capacity > self.capacity {
72 let mut new_capacity =
73 std::cmp::max(required_capacity + 8, self.capacity + self.capacity / 3);
74 new_capacity = std::cmp::min(
75 new_capacity,
76 self.options.database.storage.max_data_file_size,
77 );
78
79 self.flush()?;
80
81 new_capacity = bitcasky_common::resize_file(&self.data_file, new_capacity)?;
82 debug!(
83 "data file with storage id: {:?}, require {} bytes, resizing from {} to {} bytes. ",
84 self.storage_id, required_capacity, self.capacity, new_capacity
85 );
86 let mut mmap = unsafe {
87 MmapOptions::new()
88 .offset(0)
89 .len(new_capacity)
90 .map_mut(&self.data_file)?
91 };
92 mem::swap(&mut mmap, &mut self.map_view);
93 self.capacity = new_capacity;
94 }
95 Ok(())
96 }
97
98 fn as_mut_slice(&mut self) -> &mut [u8] {
99 &mut self.map_view[0..self.capacity]
100 }
101
102 fn as_slice(&self) -> &[u8] {
103 &self.map_view[0..self.capacity]
104 }
105
106 fn do_read_row(&mut self, offset: usize) -> Result<Option<MetaAndKeyValue>> {
107 if offset > self.capacity {
108 return Err(DataStorageError::EofError());
109 }
110
111 if offset == self.capacity {
112 return Ok(None);
113 }
114
115 let header_size = self.formatter.row_header_size();
116 if offset + header_size >= self.capacity {
117 return Err(DataStorageError::EofError());
118 }
119
120 let header = self.formatter.decode_row_header(
121 &self.as_slice()[offset..(offset + self.formatter.row_header_size())],
122 );
123 if header.meta.key_size == 0 {
124 return Ok(None);
125 }
126
127 if offset + header_size + header.meta.key_size + header.meta.value_size > self.capacity {
128 return Err(DataStorageError::EofError());
129 }
130
131 let net_size =
132 self.formatter.row_header_size() + header.meta.key_size + header.meta.value_size;
133
134 let kv_bs = &self.as_slice()[offset + self.formatter.row_header_size()..offset + net_size];
135
136 self.formatter.validate_key_value(&header, kv_bs)?;
137
138 let k = &kv_bs[0..header.meta.key_size];
139 if header.meta.expire_timestamp != 0
140 && header.meta.expire_timestamp <= self.options.clock.now()
141 {
142 Ok(Some((header.meta, k, None)))
143 } else {
144 let v = Some(kv_bs[header.meta.key_size..].into());
145 Ok(Some((header.meta, k, v)))
146 }
147 }
148}
149
150impl DataStorageWriter for MmapDataStorage {
151 fn write_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
152 &mut self,
153 row: &RowToWrite<K, V>,
154 ) -> super::Result<crate::RowLocation> {
155 self.ensure_capacity(row)?;
156
157 let value_offset = self.offset;
158 let formatter = self.formatter.clone();
159 let net_size = formatter.encode_row(row, &mut self.as_mut_slice()[value_offset..]);
160 self.offset += net_size + padding(net_size);
161 self.write_times += 1;
162
163 Ok(RowLocation {
164 storage_id: self.storage_id,
165 row_offset: value_offset,
166 })
167 }
168
169 fn rewind(&mut self) -> super::Result<()> {
170 self.data_file.flush()?;
171 self.offset = FILE_HEADER_SIZE;
172 Ok(())
173 }
174
175 fn flush(&mut self) -> super::Result<()> {
176 Ok(self.map_view.flush_range(0, self.capacity)?)
177 }
178}
179
180impl DataStorageReader for MmapDataStorage {
181 fn read_value(&mut self, row_offset: usize) -> super::Result<Option<TimedValue<Vec<u8>>>> {
182 let storage_id = self.storage_id;
183 let row = self
184 .do_read_row(row_offset)
185 .map_err(|e| DataStorageError::ReadRowFailed(storage_id, e.to_string()))?;
186 if row.is_none() {
187 return Err(DataStorageError::ReadRowFailed(
188 self.storage_id,
189 format!("no value found at offset: {}", row_offset),
190 ));
191 }
192
193 let ret = {
194 let (meta, _, v_op) = row.unwrap();
195 if let Some(v) = v_op {
196 Ok(TimedValue {
197 value: v,
198 expire_timestamp: meta.expire_timestamp,
199 }
200 .validate())
201 } else {
202 Ok(None)
203 }
204 };
205 self.read_value_times += 1;
206 ret
207 }
208
209 fn read_next_row(&mut self) -> super::Result<Option<RowToRead>> {
210 let row_offset = self.offset;
211 let row = self.do_read_row(row_offset)?;
212 if row.is_none() {
213 return Ok(None);
214 }
215
216 let (meta, key, v) = row.unwrap();
217 let row_to_read = RowToRead {
218 key: key.into(),
219 value: TimedValue::expirable_value(v.unwrap_or(vec![]), meta.expire_timestamp),
220 row_location: RowLocation {
221 storage_id: self.storage_id,
222 row_offset,
223 },
224 };
225
226 let net_size: usize = self.formatter.row_header_size() + meta.key_size + meta.value_size;
227 self.offset += net_size + padding(net_size);
228
229 Ok(Some(row_to_read))
230 }
231
232 fn seek_to_end(&mut self) -> Result<()> {
233 loop {
234 if self.read_next_row()?.is_none() {
235 break;
236 }
237 }
238 Ok(())
239 }
240
241 fn offset(&self) -> usize {
242 self.offset
243 }
244}
245
#[cfg(test)]
mod tests {
    use bitcasky_common::{
        clock::DebugClock, create_file, formatter::FILE_HEADER_SIZE, fs::FileType,
        options::DataSotrageType,
    };

    use super::*;

    use test_log::test;
    use utilities::common::get_temporary_directory_path;

    /// Options for an mmap storage whose initial capacity and maximum data
    /// file size are both `max_size`.
    fn get_options(max_size: usize) -> BitcaskyOptions {
        BitcaskyOptions::default()
            .max_data_file_size(max_size)
            .init_data_file_capacity(max_size)
            .storage_type(DataSotrageType::Mmap)
    }

    /// Creates a data file (created with size argument 512) in a temporary
    /// directory and opens it as an `MmapDataStorage` positioned right after
    /// the file header.
    fn get_file_storage(options: BitcaskyOptions) -> MmapDataStorage {
        let dir = get_temporary_directory_path();
        let storage_id = 1;
        let formatter = Arc::new(BitcaskyFormatter::default());
        let file = create_file(dir, FileType::DataFile, Some(storage_id), &formatter, 512).unwrap();
        let meta = file.metadata().unwrap();
        MmapDataStorage::new(
            storage_id,
            file,
            FILE_HEADER_SIZE,
            meta.len() as usize,
            formatter,
            Arc::new(options),
        )
        .unwrap()
    }

    #[test]
    fn test_read_write_immortal_value() {
        let mut storage = get_file_storage(get_options(1024));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1.clone());
        let row_location1 = storage.write_row(&row_to_write).unwrap();

        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k2, v2.clone());
        let row_location2 = storage.write_row(&row_to_write).unwrap();

        assert_eq!(
            v1,
            *storage
                .read_value(row_location1.row_offset)
                .unwrap()
                .unwrap()
        );
        assert_eq!(
            v2,
            *storage
                .read_value(row_location2.row_offset)
                .unwrap()
                .unwrap()
        );
    }

    #[test]
    fn test_read_write_expired_value() {
        let time = 1000;
        let clock = Arc::new(DebugClock::new(time));
        let mut storage = get_file_storage(get_options(1024).debug_clock(clock.clone()));

        // Expires exactly "now", so it is already expired.
        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> =
            RowToWrite::new_with_timestamp(k1, v1, time);
        let row_location1 = storage.write_row(&row_to_write).unwrap();

        // Expires one tick later, so it is still readable.
        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> =
            RowToWrite::new_with_timestamp(k2, v2.clone(), time + 1);
        let row_location2 = storage.write_row(&row_to_write).unwrap();

        assert!(storage
            .read_value(row_location1.row_offset)
            .unwrap()
            .is_none());
        assert_eq!(
            v2,
            *storage
                .read_value(row_location2.row_offset)
                .unwrap()
                .unwrap()
        );

        // Once the clock reaches the second row's timestamp it expires too.
        clock.set(time + 1);
        assert!(storage
            .read_value(row_location2.row_offset)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_write_overflow() {
        // A maximum file size of two bytes cannot hold any row.
        let mut storage = get_file_storage(get_options(2));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1);
        let err = storage.write_row(&row_to_write).expect_err("overflow");
        assert!(matches!(err, DataStorageError::StorageOverflow(_)));
    }

    #[test]
    fn test_expand_file_size() {
        let mut storage = get_file_storage(get_options(2048));
        let init_size = storage.data_file.metadata().unwrap().len();

        // Keep writing until roughly 1800 bytes of rows have been stored.
        let mut size = 0;
        for i in 0..100 {
            if size >= 1800 {
                break;
            }

            let k1: Vec<u8> = format!("key{}", i).into();
            let v1: Vec<u8> = format!("value{}", i).into();
            let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1);
            storage.write_row(&row_to_write).unwrap();
            let net_size = storage.formatter.net_row_size(&row_to_write);
            size += net_size + padding(net_size);
        }

        assert!(storage.data_file.metadata().unwrap().len() > init_size);
    }

    #[test]
    fn test_read_next_immortal_row() {
        let mut storage = get_file_storage(get_options(1024));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> = RowToWrite::new(&k1, v1.clone());
        storage.write_row(&row_to_write).unwrap();

        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> = RowToWrite::new(&k2, v2.clone());
        storage.write_row(&row_to_write).unwrap();

        storage.rewind().unwrap();

        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k1, r.key);
        assert_eq!(v1, r.value.value);
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k2, r.key);
        assert_eq!(v2, r.value.value);
    }

    #[test]
    fn test_read_next_expired_row() {
        let time = 1000;
        let clock = Arc::new(DebugClock::new(time));
        let mut storage = get_file_storage(get_options(1024).debug_clock(clock.clone()));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> =
            RowToWrite::new_with_timestamp(&k1, v1.clone(), time + 1);
        storage.write_row(&row_to_write).unwrap();

        let k2: Vec<u8> = "key2".into();
        let v2: Vec<u8> = "value2".into();
        let row_to_write: RowToWrite<&[u8], Vec<u8>> =
            RowToWrite::new_with_timestamp(&k2, v2, time);
        storage.write_row(&row_to_write).unwrap();

        storage.rewind().unwrap();

        // The first row is still alive, the second one is already expired.
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k1, r.key);
        assert_eq!(v1, r.value.value);
        assert_eq!(time + 1, r.value.expire_timestamp);
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k2, r.key);
        assert!(r.value.value.is_empty());
        assert_eq!(time, r.value.expire_timestamp);
        assert!(storage.read_next_row().unwrap().is_none());

        // After advancing the clock both rows come back without a value.
        clock.set(time + 1);
        storage.rewind().unwrap();
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k1, r.key);
        assert!(r.value.value.is_empty());
        assert_eq!(time + 1, r.value.expire_timestamp);
        let r = storage.read_next_row().unwrap().unwrap();
        assert_eq!(k2, r.key);
        assert!(r.value.value.is_empty());
        assert_eq!(time, r.value.expire_timestamp);
        assert!(storage.read_next_row().unwrap().is_none());
    }

    #[test]
    fn test_rewind() {
        let mut storage = get_file_storage(get_options(1024));

        let k1: Vec<u8> = "key1".into();
        let v1: Vec<u8> = "value1".into();
        let row_to_write: RowToWrite<Vec<u8>, Vec<u8>> = RowToWrite::new(k1, v1);
        let location = storage.write_row(&row_to_write).unwrap();
        storage.rewind().unwrap();

        let r = storage
            .read_next_row()
            .unwrap()
            .expect("the row written before rewinding must be readable");
        assert_eq!("key1".as_bytes().to_vec(), r.key);
        assert_eq!(location, r.row_location);
    }
}