// d_engine/storage/adaptors/file/file_storage_engine.rs
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fs::File;
use std::fs::OpenOptions;
use std::fs::{self};
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::ops::RangeInclusive;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;

use prost::Message;
use tonic::async_trait;
use tracing::info;

use crate::proto::common::Entry;
use crate::proto::common::LogId;
use crate::Error;
use crate::HardState;
use crate::LogStore;
use crate::MetaStore;
use crate::StorageEngine;
use crate::StorageError;
30
/// File name (inside the meta store's data dir) used to persist the hard state.
const HARD_STATE_FILE_NAME: &str = "hard_state.bin";
/// Key under which the serialized hard state is cached in the in-memory map.
pub(crate) const HARD_STATE_KEY: &[u8] = b"hard_state";
34
/// File-backed Raft log store.
///
/// All entries are kept in memory (`entries`) and mirrored to a single
/// append-only file (`log.data`) as `[u64 BE length][protobuf Entry]`
/// records; `index_positions` remembers each record's byte offset so the
/// file can be cut back on truncation.
#[derive(Debug)]
pub struct FileLogStore {
    #[allow(unused)]
    data_dir: PathBuf,

    // In-memory copy of every stored entry, keyed by log index.
    entries: Mutex<BTreeMap<u64, Entry>>,
    // Highest stored log index; 0 when the log is empty.
    last_index: AtomicU64,
    // Handle to the append-only `log.data` file.
    file_handle: Mutex<File>,

    // Byte offset in `log.data` of each entry's record, keyed by log index.
    index_positions: Mutex<BTreeMap<u64, u64>>,
}
47
/// File-backed metadata store.
///
/// Values live in an in-memory map; only the hard state is persisted, as a
/// bincode-encoded blob in `hard_state.bin` under `data_dir`.
#[derive(Debug)]
pub struct FileMetaStore {
    data_dir: PathBuf,
    // Key/value cache; `HARD_STATE_KEY` maps to the serialized hard state.
    data: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}
54
/// Storage engine that persists Raft log entries and metadata to plain files
/// under a single root directory (`logs/` and `meta/` sub-directories).
#[derive(Debug)]
pub struct FileStorageEngine {
    log_store: Arc<FileLogStore>,
    meta_store: Arc<FileMetaStore>,
    data_dir: PathBuf,
}
62
63impl StorageEngine for FileStorageEngine {
64 type LogStore = FileLogStore;
65 type MetaStore = FileMetaStore;
66
67 #[inline]
68 fn log_store(&self) -> Arc<Self::LogStore> {
69 self.log_store.clone()
70 }
71
72 #[inline]
73 fn meta_store(&self) -> Arc<Self::MetaStore> {
74 self.meta_store.clone()
75 }
76}
77
78impl FileStorageEngine {
79 pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
81 fs::create_dir_all(&data_dir)?;
83
84 let log_store = Arc::new(FileLogStore::new(data_dir.join("logs"))?);
86
87 let meta_store = Arc::new(FileMetaStore::new(data_dir.join("meta"))?);
89
90 Ok(Self {
91 log_store,
92 meta_store,
93 data_dir,
94 })
95 }
96
97 pub fn data_dir(&self) -> &Path {
99 &self.data_dir
100 }
101}
102
103impl FileLogStore {
104 pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
106 fs::create_dir_all(&data_dir)?;
108
109 let log_file_path = data_dir.join("log.data");
111 let file = OpenOptions::new()
112 .read(true)
113 .write(true)
114 .create(true)
115 .truncate(false)
116 .open(log_file_path)?;
117
118 let entries = Mutex::new(BTreeMap::new());
120 let last_index = AtomicU64::new(0);
121 let index_positions = Mutex::new(BTreeMap::new());
122
123 let store = Self {
124 data_dir,
125 entries,
126 last_index,
127 file_handle: Mutex::new(file),
128 index_positions,
129 };
130
131 store.load_from_file()?;
133
134 Ok(store)
135 }
136
137 fn load_from_file(&self) -> Result<(), Error> {
139 let mut file = self.file_handle.lock().unwrap();
140 file.seek(SeekFrom::Start(0))?;
141
142 let mut entries = self.entries.lock().unwrap();
143 let mut index_positions = self.index_positions.lock().unwrap();
144 let mut buffer = Vec::new();
145 file.read_to_end(&mut buffer)?;
146
147 let mut pos = 0;
148 let mut max_index = 0;
149
150 while pos < buffer.len() {
151 let entry_position = pos as u64;
153
154 if pos + 8 > buffer.len() {
156 break;
157 }
158
159 let len_bytes = &buffer[pos..pos + 8];
160 let entry_len = u64::from_be_bytes([
161 len_bytes[0],
162 len_bytes[1],
163 len_bytes[2],
164 len_bytes[3],
165 len_bytes[4],
166 len_bytes[5],
167 len_bytes[6],
168 len_bytes[7],
169 ]) as usize;
170
171 pos += 8;
172
173 if pos + entry_len > buffer.len() {
175 break;
176 }
177
178 let entry_data = &buffer[pos..pos + entry_len];
179 match Entry::decode(entry_data) {
180 Ok(entry) => {
181 entries.insert(entry.index, entry.clone());
182 index_positions.insert(entry.index, entry_position);
183 max_index = max_index.max(entry.index);
184 }
185 Err(e) => {
186 eprintln!("Failed to decode entry: {e}",);
187 }
189 }
190
191 pos += entry_len;
192 }
193
194 self.last_index.store(max_index, Ordering::SeqCst);
195 Ok(())
196 }
197
198 fn append_to_file(
200 &self,
201 entry: &Entry,
202 ) -> Result<(), Error> {
203 let mut file = self.file_handle.lock().unwrap();
204
205 let position = file.seek(SeekFrom::End(0))?;
207
208 let encoded = entry.encode_to_vec();
209
210 let len = encoded.len() as u64;
212 file.write_all(&len.to_be_bytes())?;
213
214 file.write_all(&encoded)?;
216
217 file.flush()?;
218
219 let mut index_positions = self.index_positions.lock().unwrap();
221 index_positions.insert(entry.index, position);
222
223 Ok(())
224 }
225
226 #[cfg(test)]
227 pub fn reset_sync(&self) -> Result<(), Error> {
228 {
229 let mut file = self.file_handle.lock().unwrap();
230 file.set_len(0)?;
231 file.seek(SeekFrom::Start(0))?;
232 file.flush()?;
233 }
234 {
235 let mut store = self.entries.lock().unwrap();
236 store.clear();
237 }
238 {
239 let mut index_positions = self.index_positions.lock().unwrap();
240 index_positions.clear();
241 }
242 self.last_index.store(0, Ordering::SeqCst);
243 Ok(())
244 }
245}
246
#[async_trait]
impl LogStore for FileLogStore {
    /// Appends `entries` to the log file and the in-memory map, then advances
    /// `last_index` to the highest index seen in the batch (if any).
    ///
    /// Each entry is written and flushed individually via `append_to_file`.
    async fn persist_entries(
        &self,
        entries: Vec<Entry>,
    ) -> Result<(), Error> {
        let mut max_index = 0;

        for entry in entries {
            self.append_to_file(&entry)?;

            {
                let mut store = self.entries.lock().unwrap();
                store.insert(entry.index, entry.clone());
            }

            max_index = max_index.max(entry.index);
        }

        // Leave last_index untouched for an empty batch.
        if max_index > 0 {
            self.last_index.store(max_index, Ordering::SeqCst);
        }

        Ok(())
    }

    /// Returns a clone of the entry at `index`, or `None` if not stored.
    async fn entry(
        &self,
        index: u64,
    ) -> Result<Option<Entry>, Error> {
        let store = self.entries.lock().unwrap();
        Ok(store.get(&index).cloned())
    }

    /// Returns clones of all stored entries whose index lies in `range`,
    /// in ascending index order.
    fn get_entries(
        &self,
        range: RangeInclusive<u64>,
    ) -> Result<Vec<Entry>, Error> {
        let store = self.entries.lock().unwrap();
        let mut result = Vec::new();

        for (_, entry) in store.range(range) {
            result.push(entry.clone());
        }

        Ok(result)
    }

    /// Removes all entries with index <= `cutoff_index.index` from memory.
    ///
    /// NOTE(review): only the in-memory maps are pruned — the purged records
    /// remain in `log.data` until a reset/truncate rewrites it, so purging
    /// does not reclaim disk space.
    async fn purge(
        &self,
        cutoff_index: LogId,
    ) -> Result<(), Error> {
        // Snapshot the doomed indexes first so the locks below are short.
        let indexes_to_remove: Vec<u64> = {
            let index_positions = self.index_positions.lock().unwrap();
            index_positions.range(0..=cutoff_index.index).map(|(k, _)| *k).collect()
        };

        {
            let mut entries = self.entries.lock().unwrap();
            for index in &indexes_to_remove {
                entries.remove(index);
            }
        }

        {
            let mut index_positions = self.index_positions.lock().unwrap();
            for index in &indexes_to_remove {
                index_positions.remove(index);
            }
        }

        Ok(())
    }

    /// Removes all entries with index >= `from_index` from memory, then cuts
    /// `log.data` back to the end of the last surviving entry's record and
    /// recomputes `last_index` from the remaining offsets.
    async fn truncate(
        &self,
        from_index: u64,
    ) -> Result<(), Error> {
        // Snapshot the indexes being truncated while holding the lock briefly.
        let indexes_to_remove: Vec<u64> = {
            let index_positions = self.index_positions.lock().unwrap();
            index_positions.range(from_index..).map(|(k, _)| *k).collect()
        };

        {
            let mut entries = self.entries.lock().unwrap();
            for index in &indexes_to_remove {
                entries.remove(index);
            }
        }

        {
            let mut index_positions = self.index_positions.lock().unwrap();
            for index in &indexes_to_remove {
                index_positions.remove(index);
            }
        }

        // After removal, the greatest remaining offset below `from_index`
        // marks the start of the last record we must keep.
        if let Some(last_keep_position) = self
            .index_positions
            .lock()
            .unwrap()
            .range(..from_index)
            .next_back()
            .map(|(_, pos)| *pos)
        {
            let mut file = self.file_handle.lock().unwrap();

            // Read that record's length prefix to find where it ends.
            file.seek(SeekFrom::Start(last_keep_position))?;
            let mut len_buffer = [0u8; 8];
            file.read_exact(&mut len_buffer)?;
            let entry_len = u64::from_be_bytes(len_buffer);

            // Record end = offset + 8-byte length prefix + payload length.
            let truncate_pos = last_keep_position + 8 + entry_len;

            file.set_len(truncate_pos)?;
        } else {
            // Nothing survives: empty the file entirely.
            let file = self.file_handle.lock().unwrap();
            file.set_len(0)?;
        }

        // Recompute last_index from whatever offsets remain.
        if let Some(new_last_index) = self.index_positions.lock().unwrap().keys().next_back() {
            self.last_index.store(*new_last_index, Ordering::SeqCst);
        } else {
            self.last_index.store(0, Ordering::SeqCst);
        }

        Ok(())
    }

    /// Flushes buffered writes and fsyncs the log file to durable storage.
    fn flush(&self) -> Result<(), Error> {
        let mut file = self.file_handle.lock().unwrap();
        file.flush()?;
        file.sync_all()?;
        Ok(())
    }

    /// Async wrapper: delegates to the synchronous [`Self::flush`].
    async fn flush_async(&self) -> Result<(), Error> {
        self.flush()
    }

    /// Truncates the log file to zero and clears all in-memory state
    /// (entries, offsets, last_index).
    async fn reset(&self) -> Result<(), Error> {
        {
            let mut file = self.file_handle.lock().unwrap();
            file.set_len(0)?;
            file.seek(SeekFrom::Start(0))?;
            file.flush()?;
        }
        {
            let mut store = self.entries.lock().unwrap();
            store.clear();
        }
        {
            let mut index_positions = self.index_positions.lock().unwrap();
            index_positions.clear();
        }
        self.last_index.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Highest stored log index; 0 when the log is empty.
    fn last_index(&self) -> u64 {
        self.last_index.load(Ordering::SeqCst)
    }
}
424
425impl FileMetaStore {
426 pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
428 fs::create_dir_all(&data_dir)?;
430
431 let store = Self {
432 data_dir,
433 data: Mutex::new(HashMap::new()),
434 };
435
436 store.load_from_file()?;
438
439 Ok(store)
440 }
441
442 fn load_from_file(&self) -> Result<(), Error> {
444 let hard_state_path = self.data_dir.join(HARD_STATE_FILE_NAME);
445
446 if hard_state_path.exists() {
447 let mut file = File::open(hard_state_path)?;
448 let mut buffer = Vec::new();
449 file.read_to_end(&mut buffer)?;
450
451 match bincode::deserialize::<HardState>(&buffer) {
452 Ok(_hard_state) => {
453 let mut data = self.data.lock().unwrap();
454 data.insert(HARD_STATE_KEY.to_vec(), buffer);
455 info!("Loaded hard state from file");
456 }
457 Err(e) => {
458 eprintln!("Failed to decode hard state: {e}",);
459 }
460 }
461 }
462
463 Ok(())
464 }
465
466 fn save_to_file(
468 &self,
469 key: &[u8],
470 value: &[u8],
471 ) -> Result<(), Error> {
472 if key == HARD_STATE_KEY {
473 let hard_state_path = self.data_dir.join(HARD_STATE_FILE_NAME);
474 let mut file = File::create(hard_state_path)?;
475 file.write_all(value)?;
476 file.flush()?;
477 }
478
479 Ok(())
480 }
481}
482
483#[async_trait]
484impl MetaStore for FileMetaStore {
485 fn save_hard_state(
486 &self,
487 state: &HardState,
488 ) -> Result<(), Error> {
489 let serialized = bincode::serialize(state).map_err(StorageError::BincodeError)?;
490
491 let mut data = self.data.lock().unwrap();
492 data.insert(HARD_STATE_KEY.to_vec(), serialized.clone());
493
494 self.save_to_file(HARD_STATE_KEY, &serialized)?;
495
496 info!("Persisted hard state to file");
497 Ok(())
498 }
499
500 fn load_hard_state(&self) -> Result<Option<HardState>, Error> {
501 let data = self.data.lock().unwrap();
502
503 match data.get(HARD_STATE_KEY) {
504 Some(bytes) => {
505 let state = bincode::deserialize(bytes).map_err(StorageError::BincodeError)?;
506 info!("Loaded hard state from memory");
507 Ok(Some(state))
508 }
509 None => {
510 info!("No hard state found");
511 Ok(None)
512 }
513 }
514 }
515
516 fn flush(&self) -> Result<(), Error> {
517 Ok(())
519 }
520
521 async fn flush_async(&self) -> Result<(), Error> {
522 self.flush()
523 }
524}