1use bytes::{Buf, BytesMut};
2use parking_lot::RwLock;
3use prost::{decode_length_delimiter, length_delimiter_len};
4use std::{
5 path::{Path, PathBuf},
6 sync::Arc,
7};
8
9use super::log_record::{LogRecord, LogRecordPos, LogRecordType, ReadLogRecord};
10use crate::{
11 data::log_record::max_log_record_header_size,
12 errors::{Errors, Result},
13 fio::{new_io_manager, IOManager},
14 option::IOManagerType,
15};
16
/// Suffix appended to the zero-padded file id by `get_data_file_name`.
pub const DATA_FILE_NAME_SUFFIX: &str = ".data";
/// Name of the file opened by `DataFile::new_hint_file`.
pub const HINT_FILE_NAME: &str = "hint-index";
/// Name of the file opened by `DataFile::new_merge_fin_file`.
pub const MERGE_FINISHED_FILE_NAME: &str = "merge-finished";
/// Name of the file opened by `DataFile::new_seq_no_file`.
pub const SEQ_NO_FILE_NAME: &str = "seq-no";
21
/// Generates constructors for a type with `file_id`, `write_off` and
/// `io_manager` fields. Intended to be invoked inside an `impl` block.
///
/// * `new_data_file!()` expands to `new(dir_path, file_id, io_type)`, which
///   opens the file named by `get_data_file_name(dir_path, file_id)`.
/// * `new_data_file!(name, file_id, io_type, file_name; ...;)` expands to one
///   `name(dir_path)` constructor per `;`-terminated entry, opening
///   `dir_path/file_name` with the given id and IO type. `file_name` is an
///   `Option`; the generated constructor panics with
///   "File name must be provided" when it is `None`.
///
/// Every generated constructor starts with a write offset of 0.
///
/// The expansion refers to `get_data_file_name`, `new_io_manager`,
/// `IOManagerType` and `Result` by bare name, so those must be in scope
/// wherever the macro is invoked.
#[macro_export]
macro_rules! new_data_file {
    () => {
        pub fn new<P: AsRef<std::path::Path>>(
            dir_path: P,
            file_id: u32,
            io_type: IOManagerType,
        ) -> Result<Self> {
            let path = get_data_file_name(&dir_path, file_id);
            let manager = new_io_manager(&path, &io_type);
            Ok(Self {
                file_id: std::sync::Arc::new(parking_lot::RwLock::new(file_id)),
                write_off: std::sync::Arc::new(parking_lot::RwLock::new(0)),
                io_manager: manager,
            })
        }
    };
    ($($name:ident, $file_id:expr, $io_type:expr, $file_name:expr);*;) => {
        $(
            pub fn $name<P: AsRef<std::path::Path>>(dir_path: P) -> Result<Self> {
                let path = match $file_name {
                    Some(name) => dir_path.as_ref().join(name),
                    None => panic!("File name must be provided"),
                };
                let manager = new_io_manager(&path, &$io_type);
                Ok(Self {
                    file_id: std::sync::Arc::new(parking_lot::RwLock::new($file_id)),
                    write_off: std::sync::Arc::new(parking_lot::RwLock::new(0)),
                    io_manager: manager,
                })
            }
        )*
    };
}
52
53pub struct DataFile {
54 file_id: Arc<RwLock<u32>>, write_off: Arc<RwLock<u64>>, io_manager: Box<dyn IOManager>, }
58
59impl DataFile {
    // Generates `new(dir_path, file_id, io_type)`, which opens the file named
    // by `get_data_file_name(dir_path, file_id)`.
    new_data_file!();

    // Generates one `fn(dir_path) -> Result<Self>` constructor per entry.
    // Every entry uses file id 0 and `IOManagerType::StandardFileIO`, and
    // opens the fixed file name given as its last argument.
    new_data_file!(
        new_hint_file,
        0,
        IOManagerType::StandardFileIO,
        Some(HINT_FILE_NAME);
        new_merge_fin_file,
        0,
        IOManagerType::StandardFileIO,
        Some(MERGE_FINISHED_FILE_NAME);
        new_seq_no_file,
        0,
        IOManagerType::StandardFileIO,
        Some(SEQ_NO_FILE_NAME);
    );
    /// Returns the size reported by the underlying IO manager.
    pub fn file_size(&self) -> u64 {
        self.io_manager.size()
    }
81
82 pub fn get_write_off(&self) -> u64 {
83 let read_guard = self.write_off.read();
84 *read_guard
85 }
86
87 pub fn set_write_off(&self, offset: u64) {
88 let mut write_guard = self.write_off.write();
89 *write_guard = offset;
90 }
91
92 pub fn get_file_id(&self) -> u32 {
93 let read_guard = self.file_id.read();
94 *read_guard
95 }
96
97 pub fn read_log_record(&self, offset: u64) -> Result<ReadLogRecord> {
99 let mut header_buf = BytesMut::zeroed(max_log_record_header_size());
101 self.io_manager.read(&mut header_buf, offset)?;
102
103 let rec_type = header_buf.get_u8();
105
106 let key_size = decode_length_delimiter(&mut header_buf).unwrap();
108 let value_size = decode_length_delimiter(&mut header_buf).unwrap();
109
110 if key_size == 0 && value_size == 0 {
112 return Err(Errors::ReadDataFileEOF);
113 }
114
115 let actual_header_size = length_delimiter_len(key_size) + length_delimiter_len(value_size) + 1;
117
118 let mut kv_buf = BytesMut::zeroed(key_size + value_size + 4);
120 self
121 .io_manager
122 .read(&mut kv_buf, offset + actual_header_size as u64)?;
123
124 let log_record = LogRecord {
126 key: kv_buf.get(..key_size).unwrap().to_vec(),
127 value: kv_buf.get(key_size..kv_buf.len() - 4).unwrap().to_vec(),
128 rec_type: LogRecordType::from_u8(rec_type),
129 };
130
131 kv_buf.advance(key_size + value_size);
133
134 if kv_buf.get_u32() != log_record.get_crc() {
135 return Err(Errors::InvalidLogRecordCrc);
136 }
137
138 Ok(ReadLogRecord {
139 record: log_record,
140 size: actual_header_size + key_size + value_size + 4,
141 })
142 }
143
144 pub fn write(&self, buf: &[u8]) -> Result<usize> {
145 let n_bytes = self.io_manager.write(buf)?;
146
147 let mut write_off = self.write_off.write();
149 *write_off += n_bytes as u64;
150
151 Ok(n_bytes)
152 }
153
154 pub fn write_hint_record(&self, key: Vec<u8>, pos: LogRecordPos) -> Result<()> {
156 let hint_record = LogRecord {
157 key,
158 value: pos.encode(),
159 rec_type: LogRecordType::Normal,
160 };
161 let enc_record = hint_record.encode();
162 self.write(&enc_record)?;
163 Ok(())
164 }
165
    /// Delegates to the IO manager's `sync` and returns its result.
    pub fn sync(&self) -> Result<()> {
        self.io_manager.sync()
    }
169
170 pub fn set_io_manager<P>(&mut self, dir_path: P, io_type: IOManagerType)
171 where
172 P: AsRef<Path>,
173 {
174 self.io_manager = new_io_manager(&get_data_file_name(dir_path, self.get_file_id()), &io_type);
175 }
176}
177
178pub fn get_data_file_name<P>(dir_path: P, file_id: u32) -> PathBuf
180where
181 P: AsRef<Path>,
182{
183 let name = format!("{:09}", file_id) + DATA_FILE_NAME_SUFFIX;
184 dir_path.as_ref().join(name)
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn test_new_data_file() {
193 let dir_path = std::env::temp_dir();
194 let data_file_res = DataFile::new(&dir_path, 0, IOManagerType::StandardFileIO);
195 assert!(data_file_res.is_ok());
196 let data_file = data_file_res.unwrap();
197 assert_eq!(data_file.get_file_id(), 0);
198
199 let data_file_res2 = DataFile::new(&dir_path, 0, IOManagerType::StandardFileIO);
200 assert!(data_file_res2.is_ok());
201 let data_file2 = data_file_res2.unwrap();
202 assert_eq!(data_file2.get_file_id(), 0);
203
204 let data_file_res3 = DataFile::new(&dir_path, 160, IOManagerType::StandardFileIO);
205 assert!(data_file_res3.is_ok());
206 let data_file3 = data_file_res3.unwrap();
207 assert_eq!(data_file3.get_file_id(), 160);
208 }
209
210 #[test]
211 fn test_data_file_write() {
212 let dir_path = std::env::temp_dir();
213 let data_file_res = DataFile::new(&dir_path, 2, IOManagerType::StandardFileIO);
214 assert!(data_file_res.is_ok());
215 let data_file = data_file_res.unwrap();
216 assert_eq!(data_file.get_file_id(), 2);
217
218 let write_res1 = data_file.write("aaa".as_bytes());
219 assert!(write_res1.is_ok());
220 assert_eq!(3 as usize, write_res1.ok().unwrap());
221
222 let write_res2 = data_file.write("bbb".as_bytes());
223 assert!(write_res2.is_ok());
224 assert_eq!(3 as usize, write_res2.ok().unwrap());
225 }
226
227 #[test]
228 fn test_data_file_sync() {
229 let dir_path = std::env::temp_dir();
230 let data_file_res = DataFile::new(&dir_path, 3, IOManagerType::StandardFileIO);
231 assert!(data_file_res.is_ok());
232 let data_file = data_file_res.unwrap();
233 assert_eq!(data_file.get_file_id(), 3);
234
235 let sync_res = data_file.sync();
236 assert!(sync_res.is_ok());
237 }
238
239 #[test]
240 fn test_data_file_read_log_record() {
241 let dir_path = std::env::temp_dir();
242 let data_file_res = DataFile::new(&dir_path, 600, IOManagerType::StandardFileIO);
243 assert!(data_file_res.is_ok());
244 let data_file = data_file_res.unwrap();
245 assert_eq!(data_file.get_file_id(), 600);
246
247 let enc1 = LogRecord {
248 key: "key-a".as_bytes().to_vec(),
249 value: "value-a".as_bytes().to_vec(),
250 rec_type: LogRecordType::Normal,
251 };
252 let buf1 = enc1.encode();
253 let write_res1: std::prelude::v1::Result<usize, Errors> = data_file.write(&buf1);
254 assert!(write_res1.is_ok());
255
256 let read_res1 = data_file.read_log_record(0);
258 assert!(read_res1.is_ok());
259 let read_enc1 = read_res1.ok().unwrap();
260 assert_eq!(enc1.key, read_enc1.record.key);
261 assert_eq!(enc1.value, read_enc1.record.value);
262 assert_eq!(enc1.rec_type, read_enc1.record.rec_type);
263
264 let enc2 = LogRecord {
266 key: "key-b".as_bytes().to_vec(),
267 value: "value-b".as_bytes().to_vec(),
268 rec_type: LogRecordType::Normal,
269 };
270 let enc3 = LogRecord {
271 key: "key-c".as_bytes().to_vec(),
272 value: "value-c".as_bytes().to_vec(),
273 rec_type: LogRecordType::Normal,
274 };
275
276 let buf2 = enc2.encode();
278 let buf3 = enc3.encode();
279
280 let write_res2 = data_file.write(&buf2);
281 assert!(write_res2.is_ok());
282 let write_res3 = data_file.write(&buf3);
283
284 let read_res2 = data_file.read_log_record(19);
285 assert!(read_res2.is_ok());
286 let read_enc2 = read_res2.ok().unwrap();
287 assert_eq!(enc2.key, read_enc2.record.key);
288 assert_eq!(enc2.value, read_enc2.record.value);
289 assert_eq!(enc2.rec_type, read_enc2.record.rec_type);
290
291 let read_res3 = data_file.read_log_record(19 + read_enc2.size as u64);
292 assert!(read_res3.is_ok());
293 let read_enc3 = read_res3.ok().unwrap();
294 assert_eq!(enc3.key, read_enc3.record.key);
295 assert_eq!(enc3.value, read_enc3.record.value);
296 assert_eq!(enc3.rec_type, read_enc3.record.rec_type);
297
298 let enc4 = LogRecord {
300 key: "key-d".as_bytes().to_vec(),
301 value: "value-d".as_bytes().to_vec(),
302 rec_type: LogRecordType::Deleted,
303 };
304
305 let buf4 = enc4.encode();
306 assert!(write_res3.is_ok());
307 let write_res4: std::prelude::v1::Result<usize, Errors> = data_file.write(&buf4);
308 assert!(write_res4.is_ok());
309
310 let read_res4 = data_file.read_log_record(19 + read_enc2.size as u64 + read_enc3.size as u64);
311 assert!(read_res4.is_ok());
312 let read_enc4 = read_res4.ok().unwrap();
313 assert_eq!(enc4.key, read_enc4.record.key);
314 assert_eq!(enc4.value, read_enc4.record.value);
315 assert_eq!(enc4.rec_type, read_enc4.record.rec_type);
316 }
317}