sochdb_storage/
manifest.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! LSM MANIFEST File Implementation (Gap #13 Fix)
16//!
17//! Provides atomic tracking of SSTable state for crash-safe recovery.
18//! Inspired by LevelDB/RocksDB MANIFEST file design.
19//!
20//! The MANIFEST tracks:
21//! - Current version number
22//! - List of active SSTables per level
//! - SSTables added and removed by each edit (including compactions), plus the log number and next file ID
24//!
25//! On startup, the LSM tree reads the MANIFEST to determine which SSTables
26//! are valid, rather than blindly scanning the filesystem.
27
28use serde::{Deserialize, Serialize};
29use std::collections::HashSet;
30use std::fs::{self, File, OpenOptions};
31use std::io::{BufRead, BufReader, BufWriter, Write};
32use std::path::{Path, PathBuf};
33use sochdb_core::Result;
34use tracing::{debug, info, warn};
35
/// Base name of the MANIFEST file.
///
/// Not referenced by the code in this module: live MANIFEST files are named
/// `MANIFEST-NNNNNN` (built with `format!("MANIFEST-{:06}", ..)` and parsed by
/// `parse_manifest_number`). `dead_code` is allowed so the base name stays documented
/// here without triggering a warning.
#[allow(dead_code)]
const MANIFEST_FILENAME: &str = "MANIFEST";

/// Name of the CURRENT file, whose contents are the file name of the live MANIFEST.
/// It is replaced atomically by writing `CURRENT.tmp` and renaming it over this file.
const CURRENT_FILENAME: &str = "CURRENT";
42
43/// A version edit represents a change to the LSM state
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct VersionEdit {
46    /// Version number (monotonically increasing)
47    pub version: u64,
48    /// SSTables added in this edit
49    pub added_files: Vec<FileMetadata>,
50    /// SSTables removed in this edit
51    pub removed_files: Vec<FileMetadata>,
52    /// Log sequence number
53    pub log_number: Option<u64>,
54    /// Next file ID
55    pub next_file_id: Option<u64>,
56}
57
58/// Metadata about an SSTable file
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
60pub struct FileMetadata {
61    /// Level number (0-6)
62    pub level: usize,
63    /// File ID (used for filename generation)
64    pub file_id: u64,
65    /// File size in bytes
66    pub file_size: u64,
67    /// Smallest key in this SSTable (hex-encoded edge_id)
68    pub smallest_key: Option<String>,
69    /// Largest key in this SSTable (hex-encoded edge_id)
70    pub largest_key: Option<String>,
71}
72
73/// Current LSM state derived from MANIFEST
74#[derive(Debug, Default)]
75pub struct LsmState {
76    /// Current version number
77    pub version: u64,
78    /// Active files per level (level -> set of file_ids)
79    pub active_files: Vec<HashSet<u64>>,
80    /// Log sequence number
81    pub log_number: u64,
82    /// Next file ID to use
83    pub next_file_id: u64,
84}
85
86impl LsmState {
87    /// Create a new empty state
88    pub fn new() -> Self {
89        Self {
90            version: 0,
91            active_files: vec![HashSet::new(); 7], // 7 levels
92            log_number: 0,
93            next_file_id: 1,
94        }
95    }
96
97    /// Apply a version edit to the state
98    pub fn apply(&mut self, edit: &VersionEdit) {
99        self.version = edit.version;
100
101        // Add new files
102        for file in &edit.added_files {
103            if file.level < self.active_files.len() {
104                self.active_files[file.level].insert(file.file_id);
105            }
106        }
107
108        // Remove deleted files
109        for file in &edit.removed_files {
110            if file.level < self.active_files.len() {
111                self.active_files[file.level].remove(&file.file_id);
112            }
113        }
114
115        // Update metadata
116        if let Some(log_num) = edit.log_number {
117            self.log_number = log_num;
118        }
119        if let Some(next_id) = edit.next_file_id {
120            self.next_file_id = next_id;
121        }
122    }
123
124    /// Check if a file is active
125    pub fn is_file_active(&self, level: usize, file_id: u64) -> bool {
126        self.active_files
127            .get(level)
128            .map(|files| files.contains(&file_id))
129            .unwrap_or(false)
130    }
131}
132
133/// MANIFEST file manager
134pub struct Manifest {
135    /// Directory containing the MANIFEST
136    data_dir: PathBuf,
137    /// Current MANIFEST file number
138    manifest_number: u64,
139    /// Writer for appending edits
140    writer: Option<BufWriter<File>>,
141    /// Current LSM state
142    state: LsmState,
143}
144
145impl Manifest {
146    /// Open or create a MANIFEST in the given directory
147    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
148        let data_dir = data_dir.as_ref().to_path_buf();
149        fs::create_dir_all(&data_dir)?;
150
151        // Check for CURRENT file
152        let current_path = data_dir.join(CURRENT_FILENAME);
153        let (manifest_number, state) = if current_path.exists() {
154            // Read existing MANIFEST
155            let current_content = fs::read_to_string(&current_path)?;
156            let manifest_name = current_content.trim();
157            let manifest_number = parse_manifest_number(manifest_name).unwrap_or(1);
158            let manifest_path = data_dir.join(manifest_name);
159
160            if manifest_path.exists() {
161                let state = Self::read_manifest(&manifest_path)?;
162                info!(
163                    "Loaded MANIFEST-{:06} with version {}",
164                    manifest_number, state.version
165                );
166                (manifest_number, state)
167            } else {
168                warn!("MANIFEST file {} not found, starting fresh", manifest_name);
169                (manifest_number + 1, LsmState::new())
170            }
171        } else {
172            // No CURRENT file, start fresh
173            debug!("No CURRENT file found, creating new MANIFEST");
174            (1, LsmState::new())
175        };
176
177        let mut manifest = Self {
178            data_dir,
179            manifest_number,
180            writer: None,
181            state,
182        };
183
184        // Open writer for appending
185        manifest.open_writer()?;
186
187        Ok(manifest)
188    }
189
190    /// Read all edits from a MANIFEST file
191    fn read_manifest(path: &Path) -> Result<LsmState> {
192        let file = File::open(path)?;
193        let reader = BufReader::new(file);
194        let mut state = LsmState::new();
195
196        for line in reader.lines() {
197            let line = line?;
198            if line.is_empty() {
199                continue;
200            }
201
202            match serde_json::from_str::<VersionEdit>(&line) {
203                Ok(edit) => state.apply(&edit),
204                Err(e) => {
205                    warn!("Failed to parse MANIFEST line: {}", e);
206                    // Continue reading - partial corruption shouldn't fail entire load
207                }
208            }
209        }
210
211        Ok(state)
212    }
213
214    /// Open writer for the current MANIFEST
215    fn open_writer(&mut self) -> Result<()> {
216        let manifest_name = format!("MANIFEST-{:06}", self.manifest_number);
217        let manifest_path = self.data_dir.join(&manifest_name);
218
219        let file = OpenOptions::new()
220            .create(true)
221            .append(true)
222            .open(&manifest_path)?;
223
224        self.writer = Some(BufWriter::new(file));
225
226        // Update CURRENT file atomically
227        let current_path = self.data_dir.join(CURRENT_FILENAME);
228        let temp_path = self.data_dir.join("CURRENT.tmp");
229        fs::write(&temp_path, &manifest_name)?;
230        fs::rename(&temp_path, &current_path)?;
231
232        Ok(())
233    }
234
235    /// Log a version edit to the MANIFEST
236    pub fn log_edit(&mut self, edit: &VersionEdit) -> Result<()> {
237        // Apply to in-memory state
238        self.state.apply(edit);
239
240        // Write to MANIFEST file
241        if let Some(ref mut writer) = self.writer {
242            let line = serde_json::to_string(edit)
243                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
244            writeln!(writer, "{}", line)?;
245            writer.flush()?;
246            debug!("Logged version edit: version={}", edit.version);
247        }
248
249        Ok(())
250    }
251
252    /// Record adding new SSTables
253    pub fn add_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
254        let version = self.state.version + 1;
255        let edit = VersionEdit {
256            version,
257            added_files: files,
258            removed_files: vec![],
259            log_number: None,
260            next_file_id: None,
261        };
262        self.log_edit(&edit)?;
263        Ok(version)
264    }
265
266    /// Record removing SSTables (after compaction)
267    pub fn remove_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
268        let version = self.state.version + 1;
269        let edit = VersionEdit {
270            version,
271            added_files: vec![],
272            removed_files: files,
273            log_number: None,
274            next_file_id: None,
275        };
276        self.log_edit(&edit)?;
277        Ok(version)
278    }
279
280    /// Record a compaction: add new files, remove old files
281    pub fn log_compaction(
282        &mut self,
283        added: Vec<FileMetadata>,
284        removed: Vec<FileMetadata>,
285    ) -> Result<u64> {
286        let version = self.state.version + 1;
287        let edit = VersionEdit {
288            version,
289            added_files: added,
290            removed_files: removed,
291            log_number: None,
292            next_file_id: None,
293        };
294        self.log_edit(&edit)?;
295        Ok(version)
296    }
297
298    /// Update next file ID
299    pub fn set_next_file_id(&mut self, next_id: u64) -> Result<()> {
300        let version = self.state.version + 1;
301        let edit = VersionEdit {
302            version,
303            added_files: vec![],
304            removed_files: vec![],
305            log_number: None,
306            next_file_id: Some(next_id),
307        };
308        self.log_edit(&edit)
309    }
310
311    /// Get the current LSM state
312    pub fn state(&self) -> &LsmState {
313        &self.state
314    }
315
316    /// Get the current version number
317    pub fn version(&self) -> u64 {
318        self.state.version
319    }
320
321    /// Get active files for a level
322    pub fn active_files(&self, level: usize) -> Option<&HashSet<u64>> {
323        self.state.active_files.get(level)
324    }
325
326    /// Sync to disk
327    pub fn sync(&mut self) -> Result<()> {
328        if let Some(ref mut writer) = self.writer {
329            writer.flush()?;
330            writer.get_ref().sync_all()?;
331        }
332        Ok(())
333    }
334
335    /// Create a new MANIFEST file (for periodic compaction of the MANIFEST itself)
336    pub fn rotate(&mut self) -> Result<()> {
337        // Sync current MANIFEST
338        self.sync()?;
339
340        // Create new MANIFEST
341        self.manifest_number += 1;
342        self.open_writer()?;
343
344        // Write a snapshot of current state
345        let snapshot = VersionEdit {
346            version: self.state.version,
347            added_files: self.collect_all_files(),
348            removed_files: vec![],
349            log_number: Some(self.state.log_number),
350            next_file_id: Some(self.state.next_file_id),
351        };
352        self.log_edit(&snapshot)?;
353
354        Ok(())
355    }
356
357    /// Collect all active files as FileMetadata
358    fn collect_all_files(&self) -> Vec<FileMetadata> {
359        let mut files = Vec::new();
360        for (level, file_ids) in self.state.active_files.iter().enumerate() {
361            for &file_id in file_ids {
362                files.push(FileMetadata {
363                    level,
364                    file_id,
365                    file_size: 0, // Size not tracked in state
366                    smallest_key: None,
367                    largest_key: None,
368                });
369            }
370        }
371        files
372    }
373}
374
/// Extract the numeric suffix from a MANIFEST file name.
///
/// `"MANIFEST-000001"` yields `Some(1)`. A name without the `MANIFEST-` prefix, or
/// whose remainder is not a valid `u64`, yields `None`.
fn parse_manifest_number(name: &str) -> Option<u64> {
    let suffix = name.strip_prefix("MANIFEST-")?;
    suffix.parse::<u64>().ok()
}
379
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Shorthand for a `FileMetadata` without a key range.
    fn meta(level: usize, file_id: u64, file_size: u64) -> FileMetadata {
        FileMetadata {
            level,
            file_id,
            file_size,
            smallest_key: None,
            largest_key: None,
        }
    }

    #[test]
    fn test_manifest_basic() {
        let temp_dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(temp_dir.path()).unwrap();

        manifest
            .add_files(vec![meta(0, 1, 1000), meta(0, 2, 2000)])
            .unwrap();

        assert_eq!(manifest.version(), 1);
        assert!(manifest.state.is_file_active(0, 1));
        assert!(manifest.state.is_file_active(0, 2));
        assert!(!manifest.state.is_file_active(0, 3));
    }

    #[test]
    fn test_manifest_recovery() {
        let temp_dir = TempDir::new().unwrap();

        // Create and populate the manifest.
        {
            let mut manifest = Manifest::open(temp_dir.path()).unwrap();
            manifest.add_files(vec![meta(0, 1, 1000)]).unwrap();
            manifest.sync().unwrap();
        }

        // Reopen and verify the state was replayed.
        let manifest = Manifest::open(temp_dir.path()).unwrap();
        assert!(manifest.state.is_file_active(0, 1));
    }

    #[test]
    fn test_compaction_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(temp_dir.path()).unwrap();

        // Add L0 files.
        manifest
            .add_files(vec![meta(0, 1, 1000), meta(0, 2, 1000)])
            .unwrap();

        // Simulate compaction: merge L0 -> L1.
        manifest
            .log_compaction(vec![meta(1, 3, 2000)], vec![meta(0, 1, 1000), meta(0, 2, 1000)])
            .unwrap();

        assert!(!manifest.state.is_file_active(0, 1));
        assert!(!manifest.state.is_file_active(0, 2));
        assert!(manifest.state.is_file_active(1, 3));
    }

    #[test]
    fn test_rotate_recovery() {
        let temp_dir = TempDir::new().unwrap();

        {
            let mut manifest = Manifest::open(temp_dir.path()).unwrap();
            manifest
                .add_files(vec![meta(0, 1, 1000), meta(0, 2, 1000)])
                .unwrap(); // v1
            manifest.remove_files(vec![meta(0, 1, 1000)]).unwrap(); // v2
            manifest.set_next_file_id(10).unwrap(); // v3
            manifest.rotate().unwrap(); // snapshot keeps v3
            // Edits after rotation land in the new MANIFEST.
            manifest.add_files(vec![meta(1, 3, 500)]).unwrap(); // v4
            manifest.sync().unwrap();
        }

        let current =
            std::fs::read_to_string(temp_dir.path().join(CURRENT_FILENAME)).unwrap();
        assert_eq!(current.trim(), "MANIFEST-000002");

        let manifest = Manifest::open(temp_dir.path()).unwrap();
        assert!(!manifest.state.is_file_active(0, 1));
        assert!(manifest.state.is_file_active(0, 2));
        assert!(manifest.state.is_file_active(1, 3));
        assert_eq!(manifest.state.next_file_id, 10);
        assert_eq!(manifest.version(), 4);
    }

    #[test]
    fn test_recovery_skips_corrupt_lines() {
        use std::io::Write;

        let temp_dir = TempDir::new().unwrap();
        {
            let mut manifest = Manifest::open(temp_dir.path()).unwrap();
            manifest.add_files(vec![meta(0, 1, 1000)]).unwrap();
            manifest.sync().unwrap();
        }

        // Simulate a torn write at the end of the MANIFEST.
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(temp_dir.path().join("MANIFEST-000001"))
            .unwrap();
        file.write_all(b"{\"version\": 2, \"added_fi").unwrap();
        drop(file);

        let manifest = Manifest::open(temp_dir.path()).unwrap();
        assert!(manifest.state.is_file_active(0, 1));
        assert_eq!(manifest.version(), 1);
    }

    #[test]
    fn test_out_of_range_level_is_inactive() {
        let state = LsmState::new();
        assert!(!state.is_file_active(7, 1));
        assert!(!state.is_file_active(usize::MAX, 1));
    }
}