d_engine/storage/adaptors/file/
file_storage_engine.rs

1use std::collections::BTreeMap;
2use std::collections::HashMap;
3use std::fs::File;
4use std::fs::OpenOptions;
5use std::fs::{self};
6use std::io::Read;
7use std::io::Seek;
8use std::io::SeekFrom;
9use std::io::Write;
10use std::ops::RangeInclusive;
11use std::path::Path;
12use std::path::PathBuf;
13use std::sync::atomic::AtomicU64;
14use std::sync::atomic::Ordering;
15use std::sync::Arc;
16use std::sync::Mutex;
17
use prost::Message;
use tonic::async_trait;
use tracing::info;
use tracing::warn;
21
22use crate::proto::common::Entry;
23use crate::proto::common::LogId;
24use crate::Error;
25use crate::HardState;
26use crate::LogStore;
27use crate::MetaStore;
28use crate::StorageEngine;
29use crate::StorageError;
30
// Constants for file structure
/// On-disk file name (inside the meta directory) for the persisted hard state.
const HARD_STATE_FILE_NAME: &str = "hard_state.bin";
/// Key under which the serialized hard-state bytes are cached in memory.
pub(crate) const HARD_STATE_KEY: &[u8] = b"hard_state";
34
/// File-based log store implementation.
///
/// Entries live in memory (`entries`) and are mirrored to a single
/// append-only `log.data` file; `index_positions` records where each
/// entry's length-prefixed record starts in that file.
#[derive(Debug)]
pub struct FileLogStore {
    // Directory containing the log file; currently unused at runtime.
    #[allow(unused)]
    data_dir: PathBuf,

    // In-memory copy of all live log entries, keyed by log index.
    entries: Mutex<BTreeMap<u64, Entry>>,
    // Highest log index currently stored (0 when the log is empty).
    last_index: AtomicU64,
    // Handle to the append-only `log.data` file.
    file_handle: Mutex<File>,

    index_positions: Mutex<BTreeMap<u64, u64>>, // Maps index to file position
}
47
/// File-based metadata store implementation.
///
/// Keeps raw key/value bytes in memory and writes known keys
/// (currently only the hard state) through to files under `data_dir`.
#[derive(Debug)]
pub struct FileMetaStore {
    // Directory holding the metadata files (e.g. `hard_state.bin`).
    data_dir: PathBuf,
    // In-memory key/value cache of serialized metadata bytes.
    data: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}
54
/// Unified file-based storage engine.
///
/// Bundles a [`FileLogStore`] and a [`FileMetaStore`] rooted under a
/// single data directory (`logs/` and `meta/` subdirectories).
#[derive(Debug)]
pub struct FileStorageEngine {
    log_store: Arc<FileLogStore>,
    meta_store: Arc<FileMetaStore>,
    data_dir: PathBuf,
}
62
63impl StorageEngine for FileStorageEngine {
64    type LogStore = FileLogStore;
65    type MetaStore = FileMetaStore;
66
67    #[inline]
68    fn log_store(&self) -> Arc<Self::LogStore> {
69        self.log_store.clone()
70    }
71
72    #[inline]
73    fn meta_store(&self) -> Arc<Self::MetaStore> {
74        self.meta_store.clone()
75    }
76}
77
78impl FileStorageEngine {
79    /// Creates new file-based storage engine
80    pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
81        // Ensure data directory exists
82        fs::create_dir_all(&data_dir)?;
83
84        // Create log store
85        let log_store = Arc::new(FileLogStore::new(data_dir.join("logs"))?);
86
87        // Create meta store
88        let meta_store = Arc::new(FileMetaStore::new(data_dir.join("meta"))?);
89
90        Ok(Self {
91            log_store,
92            meta_store,
93            data_dir,
94        })
95    }
96
97    /// Get the data directory path
98    pub fn data_dir(&self) -> &Path {
99        &self.data_dir
100    }
101}
102
103impl FileLogStore {
104    /// Creates new file-based log store
105    pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
106        // Ensure directory exists
107        fs::create_dir_all(&data_dir)?;
108
109        // Open or create the log file
110        let log_file_path = data_dir.join("log.data");
111        let file = OpenOptions::new()
112            .read(true)
113            .write(true)
114            .create(true)
115            .truncate(false)
116            .open(log_file_path)?;
117
118        // Load existing entries from file
119        let entries = Mutex::new(BTreeMap::new());
120        let last_index = AtomicU64::new(0);
121        let index_positions = Mutex::new(BTreeMap::new());
122
123        let store = Self {
124            data_dir,
125            entries,
126            last_index,
127            file_handle: Mutex::new(file),
128            index_positions,
129        };
130
131        // Load existing data
132        store.load_from_file()?;
133
134        Ok(store)
135    }
136
137    /// Load entries from file
138    fn load_from_file(&self) -> Result<(), Error> {
139        let mut file = self.file_handle.lock().unwrap();
140        file.seek(SeekFrom::Start(0))?;
141
142        let mut entries = self.entries.lock().unwrap();
143        let mut index_positions = self.index_positions.lock().unwrap();
144        let mut buffer = Vec::new();
145        file.read_to_end(&mut buffer)?;
146
147        let mut pos = 0;
148        let mut max_index = 0;
149
150        while pos < buffer.len() {
151            // Record the position of this entry
152            let entry_position = pos as u64;
153
154            // Read entry length
155            if pos + 8 > buffer.len() {
156                break;
157            }
158
159            let len_bytes = &buffer[pos..pos + 8];
160            let entry_len = u64::from_be_bytes([
161                len_bytes[0],
162                len_bytes[1],
163                len_bytes[2],
164                len_bytes[3],
165                len_bytes[4],
166                len_bytes[5],
167                len_bytes[6],
168                len_bytes[7],
169            ]) as usize;
170
171            pos += 8;
172
173            // Read entry data
174            if pos + entry_len > buffer.len() {
175                break;
176            }
177
178            let entry_data = &buffer[pos..pos + entry_len];
179            match Entry::decode(entry_data) {
180                Ok(entry) => {
181                    entries.insert(entry.index, entry.clone());
182                    index_positions.insert(entry.index, entry_position);
183                    max_index = max_index.max(entry.index);
184                }
185                Err(e) => {
186                    eprintln!("Failed to decode entry: {e}",);
187                    // Continue with next entry
188                }
189            }
190
191            pos += entry_len;
192        }
193
194        self.last_index.store(max_index, Ordering::SeqCst);
195        Ok(())
196    }
197
198    /// Append entry to file
199    fn append_to_file(
200        &self,
201        entry: &Entry,
202    ) -> Result<(), Error> {
203        let mut file = self.file_handle.lock().unwrap();
204
205        // Get current file position
206        let position = file.seek(SeekFrom::End(0))?;
207
208        let encoded = entry.encode_to_vec();
209
210        // Write entry length (8 bytes)
211        let len = encoded.len() as u64;
212        file.write_all(&len.to_be_bytes())?;
213
214        // Write entry data
215        file.write_all(&encoded)?;
216
217        file.flush()?;
218
219        // Update position index
220        let mut index_positions = self.index_positions.lock().unwrap();
221        index_positions.insert(entry.index, position);
222
223        Ok(())
224    }
225
226    #[cfg(test)]
227    pub fn reset_sync(&self) -> Result<(), Error> {
228        {
229            let mut file = self.file_handle.lock().unwrap();
230            file.set_len(0)?;
231            file.seek(SeekFrom::Start(0))?;
232            file.flush()?;
233        }
234        {
235            let mut store = self.entries.lock().unwrap();
236            store.clear();
237        }
238        {
239            let mut index_positions = self.index_positions.lock().unwrap();
240            index_positions.clear();
241        }
242        self.last_index.store(0, Ordering::SeqCst);
243        Ok(())
244    }
245}
246
#[async_trait]
impl LogStore for FileLogStore {
    /// Persists a batch of entries: each one is appended to the log
    /// file, then mirrored into the in-memory map.
    ///
    /// NOTE(review): `last_index` is overwritten with the batch maximum,
    /// so a batch whose highest index is below the current last index
    /// would move `last_index` backwards — confirm callers always
    /// truncate first or only append forward.
    async fn persist_entries(
        &self,
        entries: Vec<Entry>,
    ) -> Result<(), Error> {
        let mut max_index = 0;

        for entry in entries {
            // Append to file
            self.append_to_file(&entry)?;

            // Add to memory (lock scoped per entry, not across the batch)
            {
                let mut store = self.entries.lock().unwrap();
                store.insert(entry.index, entry.clone());
            }

            max_index = max_index.max(entry.index);
        }

        if max_index > 0 {
            self.last_index.store(max_index, Ordering::SeqCst);
        }

        Ok(())
    }

    /// Returns the entry at `index` from the in-memory map, or `None`
    /// if it was never written, purged, or truncated away.
    async fn entry(
        &self,
        index: u64,
    ) -> Result<Option<Entry>, Error> {
        let store = self.entries.lock().unwrap();
        Ok(store.get(&index).cloned())
    }

    /// Returns clones of all entries whose index falls inside `range`
    /// (inclusive on both ends), in ascending index order.
    fn get_entries(
        &self,
        range: RangeInclusive<u64>,
    ) -> Result<Vec<Entry>, Error> {
        let store = self.entries.lock().unwrap();
        let mut result = Vec::new();

        for (_, entry) in store.range(range) {
            result.push(entry.clone());
        }

        Ok(result)
    }

    /// Drops every entry with index <= `cutoff_index.index` from memory
    /// and from the position index. The file itself is left untouched;
    /// stale records are reclaimed at the next snapshot/compaction.
    async fn purge(
        &self,
        cutoff_index: LogId,
    ) -> Result<(), Error> {
        // Collect first so the position-index lock is released before
        // the entries lock is taken below (avoids holding two locks).
        let indexes_to_remove: Vec<u64> = {
            let index_positions = self.index_positions.lock().unwrap();
            index_positions.range(0..=cutoff_index.index).map(|(k, _)| *k).collect()
        };

        // Remove from memory
        {
            let mut entries = self.entries.lock().unwrap();
            for index in &indexes_to_remove {
                entries.remove(index);
            }
        }

        // Remove from position index
        {
            let mut index_positions = self.index_positions.lock().unwrap();
            for index in &indexes_to_remove {
                index_positions.remove(index);
            }
        }

        // Note: We don't actually remove from file for performance
        // The file will be compacted during the next snapshot
        Ok(())
    }

    /// Removes every entry with index >= `from_index` from memory, from
    /// the position index, and from the file (which is physically
    /// shortened to end right after the last kept record).
    ///
    /// NOTE(review): the file truncation assumes the highest surviving
    /// index also has the highest file offset. If an index were ever
    /// re-appended out of order, records belonging to kept indices
    /// could be cut off — confirm append order is monotone in index.
    async fn truncate(
        &self,
        from_index: u64,
    ) -> Result<(), Error> {
        let indexes_to_remove: Vec<u64> = {
            let index_positions = self.index_positions.lock().unwrap();
            index_positions.range(from_index..).map(|(k, _)| *k).collect()
        };

        // Remove from memory
        {
            let mut entries = self.entries.lock().unwrap();
            for index in &indexes_to_remove {
                entries.remove(index);
            }
        }

        // Remove from position index
        {
            let mut index_positions = self.index_positions.lock().unwrap();
            for index in &indexes_to_remove {
                index_positions.remove(index);
            }
        }

        // Truncate the file. After the removals above, the position map
        // only holds indices < from_index, so the greatest remaining
        // offset marks the last record to keep.
        if let Some(last_keep_position) = self
            .index_positions
            .lock()
            .unwrap()
            .range(..from_index)
            .next_back()
            .map(|(_, pos)| *pos)
        {
            let mut file = self.file_handle.lock().unwrap();

            // Find the end of the last entry to keep by reading its
            // 8-byte big-endian length prefix.
            file.seek(SeekFrom::Start(last_keep_position))?;
            let mut len_buffer = [0u8; 8];
            file.read_exact(&mut len_buffer)?;
            let entry_len = u64::from_be_bytes(len_buffer);

            // Calculate the position after this entry
            // (record start + 8-byte prefix + payload length).
            let truncate_pos = last_keep_position + 8 + entry_len;

            // Truncate the file
            file.set_len(truncate_pos)?;
        } else {
            // No entries to keep, truncate entire file
            let file = self.file_handle.lock().unwrap();
            file.set_len(0)?;
        }

        // Update last index to the highest surviving index (0 if none).
        if let Some(new_last_index) = self.index_positions.lock().unwrap().keys().next_back() {
            self.last_index.store(*new_last_index, Ordering::SeqCst);
        } else {
            self.last_index.store(0, Ordering::SeqCst);
        }

        Ok(())
    }

    /// Flushes buffered writes and fsyncs the log file to disk.
    fn flush(&self) -> Result<(), Error> {
        let mut file = self.file_handle.lock().unwrap();
        file.flush()?;
        file.sync_all()?;
        Ok(())
    }

    /// Async flush; delegates to the synchronous `flush`.
    async fn flush_async(&self) -> Result<(), Error> {
        self.flush()
    }

    /// Truncates the file to zero length and clears all in-memory
    /// state, leaving the store empty with `last_index() == 0`.
    async fn reset(&self) -> Result<(), Error> {
        {
            let mut file = self.file_handle.lock().unwrap();
            file.set_len(0)?;
            file.seek(SeekFrom::Start(0))?;
            file.flush()?;
        }
        {
            let mut store = self.entries.lock().unwrap();
            store.clear();
        }
        {
            let mut index_positions = self.index_positions.lock().unwrap();
            index_positions.clear();
        }
        self.last_index.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Highest index currently stored (0 when the log is empty).
    fn last_index(&self) -> u64 {
        self.last_index.load(Ordering::SeqCst)
    }
}
424
425impl FileMetaStore {
426    /// Creates new file-based metadata store
427    pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
428        // Ensure directory exists
429        fs::create_dir_all(&data_dir)?;
430
431        let store = Self {
432            data_dir,
433            data: Mutex::new(HashMap::new()),
434        };
435
436        // Load existing data
437        store.load_from_file()?;
438
439        Ok(store)
440    }
441
442    /// Load metadata from file
443    fn load_from_file(&self) -> Result<(), Error> {
444        let hard_state_path = self.data_dir.join(HARD_STATE_FILE_NAME);
445
446        if hard_state_path.exists() {
447            let mut file = File::open(hard_state_path)?;
448            let mut buffer = Vec::new();
449            file.read_to_end(&mut buffer)?;
450
451            match bincode::deserialize::<HardState>(&buffer) {
452                Ok(_hard_state) => {
453                    let mut data = self.data.lock().unwrap();
454                    data.insert(HARD_STATE_KEY.to_vec(), buffer);
455                    info!("Loaded hard state from file");
456                }
457                Err(e) => {
458                    eprintln!("Failed to decode hard state: {e}",);
459                }
460            }
461        }
462
463        Ok(())
464    }
465
466    /// Save metadata to file
467    fn save_to_file(
468        &self,
469        key: &[u8],
470        value: &[u8],
471    ) -> Result<(), Error> {
472        if key == HARD_STATE_KEY {
473            let hard_state_path = self.data_dir.join(HARD_STATE_FILE_NAME);
474            let mut file = File::create(hard_state_path)?;
475            file.write_all(value)?;
476            file.flush()?;
477        }
478
479        Ok(())
480    }
481}
482
483#[async_trait]
484impl MetaStore for FileMetaStore {
485    fn save_hard_state(
486        &self,
487        state: &HardState,
488    ) -> Result<(), Error> {
489        let serialized = bincode::serialize(state).map_err(StorageError::BincodeError)?;
490
491        let mut data = self.data.lock().unwrap();
492        data.insert(HARD_STATE_KEY.to_vec(), serialized.clone());
493
494        self.save_to_file(HARD_STATE_KEY, &serialized)?;
495
496        info!("Persisted hard state to file");
497        Ok(())
498    }
499
500    fn load_hard_state(&self) -> Result<Option<HardState>, Error> {
501        let data = self.data.lock().unwrap();
502
503        match data.get(HARD_STATE_KEY) {
504            Some(bytes) => {
505                let state = bincode::deserialize(bytes).map_err(StorageError::BincodeError)?;
506                info!("Loaded hard state from memory");
507                Ok(Some(state))
508            }
509            None => {
510                info!("No hard state found");
511                Ok(None)
512            }
513        }
514    }
515
516    fn flush(&self) -> Result<(), Error> {
517        // No-op for file-based store as we flush on each write
518        Ok(())
519    }
520
521    async fn flush_async(&self) -> Result<(), Error> {
522        self.flush()
523    }
524}