bitkv_rs/data/
data_file.rs

1use bytes::{Buf, BytesMut};
2use parking_lot::RwLock;
3use prost::{decode_length_delimiter, length_delimiter_len};
4use std::{
5  path::{Path, PathBuf},
6  sync::Arc,
7};
8
9use super::log_record::{LogRecord, LogRecordPos, LogRecordType, ReadLogRecord};
10use crate::{
11  data::log_record::max_log_record_header_size,
12  errors::{Errors, Result},
13  fio::{new_io_manager, IOManager},
14  option::IOManagerType,
15};
16
/// Suffix appended to the zero-padded file id by `get_data_file_name`.
pub const DATA_FILE_NAME_SUFFIX: &str = ".data";
/// Name of the file opened by `DataFile::new_hint_file`.
pub const HINT_FILE_NAME: &str = "hint-index";
/// Name of the file opened by `DataFile::new_merge_fin_file`.
pub const MERGE_FINISHED_FILE_NAME: &str = "merge-finished";
/// Name of the file opened by `DataFile::new_seq_no_file`.
pub const SEQ_NO_FILE_NAME: &str = "seq-no";
21
/// Generates constructors for a type with `file_id`, `write_off` and
/// `io_manager` fields. Must be invoked inside that type's `impl` block, with
/// `get_data_file_name`, `new_io_manager`, `IOManagerType` and `Result` in
/// scope at the call site.
///
/// * `new_data_file!()` emits `new(dir_path, file_id, io_type)`, which opens
///   the file named by `get_data_file_name(dir_path, file_id)`.
/// * `new_data_file!(name, file_id, io_type, Some(file_name); ...;)` emits one
///   `name(dir_path)` constructor per `;`-terminated group, each opening
///   `dir_path/file_name` with the given fixed id and IO type.
///
/// Every generated constructor starts with a write offset of `0`.
///
/// # Panics
///
/// A constructor generated by the second form panics with
/// "File name must be provided" when its `file_name` expression is `None`.
#[macro_export]
macro_rules! new_data_file {
  () => {
    pub fn new<P>(dir_path: P, file_id: u32, io_type: IOManagerType) -> Result<Self>
    where
      P: AsRef<std::path::Path>,
    {
      let path = get_data_file_name(&dir_path, file_id);
      let manager = new_io_manager(&path, &io_type);
      Ok(Self {
        file_id: std::sync::Arc::new(parking_lot::RwLock::new(file_id)),
        write_off: std::sync::Arc::new(parking_lot::RwLock::new(0)),
        io_manager: manager,
      })
    }
  };
  ($($name:ident, $file_id:expr, $io_type:expr, $file_name:expr);*;) => {
    $(
      pub fn $name<P>(dir_path: P) -> Result<Self>
      where
        P: AsRef<std::path::Path>,
      {
        // The file name is passed as an `Option`; a missing name is treated
        // as a programming error rather than a recoverable failure.
        let path = match $file_name {
          Some(name) => dir_path.as_ref().join(name),
          None => panic!("File name must be provided"),
        };
        let manager = new_io_manager(&path, &$io_type);
        Ok(Self {
          file_id: std::sync::Arc::new(parking_lot::RwLock::new($file_id)),
          write_off: std::sync::Arc::new(parking_lot::RwLock::new(0)),
          io_manager: manager,
        })
      }
    )*
  };
}
52
53pub struct DataFile {
54  file_id: Arc<RwLock<u32>>,      // data file id
55  write_off: Arc<RwLock<u64>>, // current write offset, used for recording appending write position
56  io_manager: Box<dyn IOManager>, // IO manager interface
57}
58
59impl DataFile {
60  // create or open a new data file
61  new_data_file!();
62
63  // create or open hint file, merge finished file and sequence number file
64  new_data_file!(
65    new_hint_file,
66    0,
67    IOManagerType::StandardFileIO,
68    Some(HINT_FILE_NAME);
69    new_merge_fin_file,
70    0,
71    IOManagerType::StandardFileIO,
72    Some(MERGE_FINISHED_FILE_NAME);
73    new_seq_no_file,
74    0,
75    IOManagerType::StandardFileIO,
76    Some(SEQ_NO_FILE_NAME);
77  );
78  pub fn file_size(&self) -> u64 {
79    self.io_manager.size()
80  }
81
82  pub fn get_write_off(&self) -> u64 {
83    let read_guard = self.write_off.read();
84    *read_guard
85  }
86
87  pub fn set_write_off(&self, offset: u64) {
88    let mut write_guard = self.write_off.write();
89    *write_guard = offset;
90  }
91
92  pub fn get_file_id(&self) -> u32 {
93    let read_guard = self.file_id.read();
94    *read_guard
95  }
96
97  // read log record by offset
98  pub fn read_log_record(&self, offset: u64) -> Result<ReadLogRecord> {
99    // read header
100    let mut header_buf = BytesMut::zeroed(max_log_record_header_size());
101    self.io_manager.read(&mut header_buf, offset)?;
102
103    // Retrieve first byte of header, which is the type of log record
104    let rec_type = header_buf.get_u8();
105
106    // Retrieve the length of the key and value
107    let key_size = decode_length_delimiter(&mut header_buf).unwrap();
108    let value_size = decode_length_delimiter(&mut header_buf).unwrap();
109
110    // if key_size and value_size are 0, EOF then return error
111    if key_size == 0 && value_size == 0 {
112      return Err(Errors::ReadDataFileEOF);
113    }
114
115    // get actual data size
116    let actual_header_size = length_delimiter_len(key_size) + length_delimiter_len(value_size) + 1;
117
118    // read actual key and value, last 4 bytes is crc32 checksum
119    let mut kv_buf = BytesMut::zeroed(key_size + value_size + 4);
120    self
121      .io_manager
122      .read(&mut kv_buf, offset + actual_header_size as u64)?;
123
124    // construct log record
125    let log_record = LogRecord {
126      key: kv_buf.get(..key_size).unwrap().to_vec(),
127      value: kv_buf.get(key_size..kv_buf.len() - 4).unwrap().to_vec(),
128      rec_type: LogRecordType::from_u8(rec_type),
129    };
130
131    // advance to last 4 bytes, read crc32 checksum
132    kv_buf.advance(key_size + value_size);
133
134    if kv_buf.get_u32() != log_record.get_crc() {
135      return Err(Errors::InvalidLogRecordCrc);
136    }
137
138    Ok(ReadLogRecord {
139      record: log_record,
140      size: actual_header_size + key_size + value_size + 4,
141    })
142  }
143
144  pub fn write(&self, buf: &[u8]) -> Result<usize> {
145    let n_bytes = self.io_manager.write(buf)?;
146
147    //update write_off
148    let mut write_off = self.write_off.write();
149    *write_off += n_bytes as u64;
150
151    Ok(n_bytes)
152  }
153
154  // write hint record into hint file
155  pub fn write_hint_record(&self, key: Vec<u8>, pos: LogRecordPos) -> Result<()> {
156    let hint_record = LogRecord {
157      key,
158      value: pos.encode(),
159      rec_type: LogRecordType::Normal,
160    };
161    let enc_record = hint_record.encode();
162    self.write(&enc_record)?;
163    Ok(())
164  }
165
166  pub fn sync(&self) -> Result<()> {
167    self.io_manager.sync()
168  }
169
170  pub fn set_io_manager<P>(&mut self, dir_path: P, io_type: IOManagerType)
171  where
172    P: AsRef<Path>,
173  {
174    self.io_manager = new_io_manager(&get_data_file_name(dir_path, self.get_file_id()), &io_type);
175  }
176}
177
178/// get filename
179pub fn get_data_file_name<P>(dir_path: P, file_id: u32) -> PathBuf
180where
181  P: AsRef<Path>,
182{
183  let name = format!("{:09}", file_id) + DATA_FILE_NAME_SUFFIX;
184  dir_path.as_ref().join(name)
185}
186
#[cfg(test)]
mod tests {
  use super::*;

  /// Opens data file `file_id` in the system temp dir with standard file IO
  /// and checks that it reports the requested id.
  fn open_in_temp_dir(file_id: u32) -> DataFile {
    let dir_path = std::env::temp_dir();
    let data_file = DataFile::new(&dir_path, file_id, IOManagerType::StandardFileIO)
      .expect("opening a data file should succeed");
    assert_eq!(data_file.get_file_id(), file_id);
    data_file
  }

  /// Builds a log record from string key/value.
  fn make_record(key: &str, value: &str, rec_type: LogRecordType) -> LogRecord {
    LogRecord {
      key: key.as_bytes().to_vec(),
      value: value.as_bytes().to_vec(),
      rec_type,
    }
  }

  /// Encodes `record` and appends it to `data_file`, failing the test on error.
  fn append(data_file: &DataFile, record: &LogRecord) {
    let encoded = record.encode();
    assert!(data_file.write(&encoded).is_ok());
  }

  /// Reads the record at `offset`, checks it equals `expected`, and returns
  /// the offset of the record that follows it.
  fn assert_reads_back(data_file: &DataFile, offset: u64, expected: &LogRecord) -> u64 {
    let read = data_file
      .read_log_record(offset)
      .expect("reading a written record should succeed");
    assert_eq!(expected.key, read.record.key);
    assert_eq!(expected.value, read.record.value);
    assert_eq!(expected.rec_type, read.record.rec_type);
    offset + read.size as u64
  }

  #[test]
  fn test_new_data_file() {
    // Opening the same id twice must work; all handles stay alive together.
    let _first = open_in_temp_dir(0);
    let _second = open_in_temp_dir(0);
    let _third = open_in_temp_dir(160);
  }

  #[test]
  fn test_data_file_write() {
    let data_file = open_in_temp_dir(2);

    for payload in ["aaa", "bbb"] {
      let written = data_file
        .write(payload.as_bytes())
        .expect("write should succeed");
      assert_eq!(3_usize, written);
    }
  }

  #[test]
  fn test_data_file_sync() {
    let data_file = open_in_temp_dir(3);
    assert!(data_file.sync().is_ok());
  }

  #[test]
  fn test_data_file_read_log_record() {
    let data_file = open_in_temp_dir(600);

    // single record, read from offset 0
    let rec_a = make_record("key-a", "value-a", LogRecordType::Normal);
    append(&data_file, &rec_a);
    let offset_b = assert_reads_back(&data_file, 0, &rec_a);

    // multiple log records: both are written (and checked) before either is
    // read, so the first read targets a record that is not at the tail
    let rec_b = make_record("key-b", "value-b", LogRecordType::Normal);
    let rec_c = make_record("key-c", "value-c", LogRecordType::Normal);
    append(&data_file, &rec_b);
    append(&data_file, &rec_c);

    // each offset is derived from the size reported for the previous record
    let offset_c = assert_reads_back(&data_file, offset_b, &rec_b);
    let offset_d = assert_reads_back(&data_file, offset_c, &rec_c);

    // read record type deleted
    let rec_d = make_record("key-d", "value-d", LogRecordType::Deleted);
    append(&data_file, &rec_d);
    assert_reads_back(&data_file, offset_d, &rec_d);
  }
}