Skip to main content

sochdb_storage/
manifest.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! LSM MANIFEST File Implementation (Gap #13 Fix)
19//!
20//! Provides atomic tracking of SSTable state for crash-safe recovery.
21//! Inspired by LevelDB/RocksDB MANIFEST file design.
22//!
23//! The MANIFEST tracks:
24//! - Current version number
25//! - List of active SSTables per level
26//! - Compaction state
27//!
28//! On startup, the LSM tree reads the MANIFEST to determine which SSTables
29//! are valid, rather than blindly scanning the filesystem.
30
31use serde::{Deserialize, Serialize};
32use std::collections::HashSet;
33use std::fs::{self, File, OpenOptions};
34use std::io::{BufRead, BufReader, BufWriter, Write};
35use std::path::{Path, PathBuf};
36use sochdb_core::Result;
37use tracing::{debug, info, warn};
38
/// Name of the MANIFEST file
// NOTE(review): nothing in the visible code references this constant — the
// writer builds numbered names of the form `MANIFEST-NNNNNN` instead — which
// is why the `dead_code` lint is silenced. Confirm whether it is still needed.
#[allow(dead_code)]
const MANIFEST_FILENAME: &str = "MANIFEST";

/// Name of the CURRENT file (pointer to active MANIFEST)
///
/// The file's trimmed contents are the file name of the active MANIFEST
/// inside the same data directory.
const CURRENT_FILENAME: &str = "CURRENT";
45
/// A version edit represents a change to the LSM state
///
/// Edits are persisted as one JSON object per line in the MANIFEST file and
/// replayed in order through `LsmState::apply` to rebuild the state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionEdit {
    /// Version number (monotonically increasing)
    ///
    /// `LsmState::apply` copies this value into the state as-is; it does not
    /// itself check that the number increases.
    pub version: u64,
    /// SSTables added in this edit
    pub added_files: Vec<FileMetadata>,
    /// SSTables removed in this edit
    ///
    /// Removals are applied after additions, so a file listed in both ends up
    /// inactive.
    pub removed_files: Vec<FileMetadata>,
    /// Log sequence number
    ///
    /// `None` leaves the state's current value untouched.
    pub log_number: Option<u64>,
    /// Next file ID
    ///
    /// `None` leaves the state's current value untouched.
    pub next_file_id: Option<u64>,
}
60
/// Metadata about an SSTable file
///
/// Only `level` and `file_id` are retained by `LsmState`; the remaining
/// fields are carried in the MANIFEST record but not tracked in memory.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct FileMetadata {
    /// Level number (0-6)
    ///
    /// `LsmState::apply` ignores entries whose level has no slot in the
    /// state's per-level table.
    pub level: usize,
    /// File ID (used for filename generation)
    pub file_id: u64,
    /// File size in bytes
    ///
    /// Snapshots written when the MANIFEST is rotated record `0` here because
    /// the size is not tracked in the in-memory state.
    pub file_size: u64,
    /// Smallest key in this SSTable (hex-encoded edge_id)
    pub smallest_key: Option<String>,
    /// Largest key in this SSTable (hex-encoded edge_id)
    pub largest_key: Option<String>,
}
75
76/// Current LSM state derived from MANIFEST
77#[derive(Debug, Default)]
78pub struct LsmState {
79    /// Current version number
80    pub version: u64,
81    /// Active files per level (level -> set of file_ids)
82    pub active_files: Vec<HashSet<u64>>,
83    /// Log sequence number
84    pub log_number: u64,
85    /// Next file ID to use
86    pub next_file_id: u64,
87}
88
89impl LsmState {
90    /// Create a new empty state
91    pub fn new() -> Self {
92        Self {
93            version: 0,
94            active_files: vec![HashSet::new(); 7], // 7 levels
95            log_number: 0,
96            next_file_id: 1,
97        }
98    }
99
100    /// Apply a version edit to the state
101    pub fn apply(&mut self, edit: &VersionEdit) {
102        self.version = edit.version;
103
104        // Add new files
105        for file in &edit.added_files {
106            if file.level < self.active_files.len() {
107                self.active_files[file.level].insert(file.file_id);
108            }
109        }
110
111        // Remove deleted files
112        for file in &edit.removed_files {
113            if file.level < self.active_files.len() {
114                self.active_files[file.level].remove(&file.file_id);
115            }
116        }
117
118        // Update metadata
119        if let Some(log_num) = edit.log_number {
120            self.log_number = log_num;
121        }
122        if let Some(next_id) = edit.next_file_id {
123            self.next_file_id = next_id;
124        }
125    }
126
127    /// Check if a file is active
128    pub fn is_file_active(&self, level: usize, file_id: u64) -> bool {
129        self.active_files
130            .get(level)
131            .map(|files| files.contains(&file_id))
132            .unwrap_or(false)
133    }
134}
135
136/// MANIFEST file manager
137pub struct Manifest {
138    /// Directory containing the MANIFEST
139    data_dir: PathBuf,
140    /// Current MANIFEST file number
141    manifest_number: u64,
142    /// Writer for appending edits
143    writer: Option<BufWriter<File>>,
144    /// Current LSM state
145    state: LsmState,
146}
147
148impl Manifest {
149    /// Open or create a MANIFEST in the given directory
150    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
151        let data_dir = data_dir.as_ref().to_path_buf();
152        fs::create_dir_all(&data_dir)?;
153
154        // Check for CURRENT file
155        let current_path = data_dir.join(CURRENT_FILENAME);
156        let (manifest_number, state) = if current_path.exists() {
157            // Read existing MANIFEST
158            let current_content = fs::read_to_string(&current_path)?;
159            let manifest_name = current_content.trim();
160            let manifest_number = parse_manifest_number(manifest_name).unwrap_or(1);
161            let manifest_path = data_dir.join(manifest_name);
162
163            if manifest_path.exists() {
164                let state = Self::read_manifest(&manifest_path)?;
165                info!(
166                    "Loaded MANIFEST-{:06} with version {}",
167                    manifest_number, state.version
168                );
169                (manifest_number, state)
170            } else {
171                warn!("MANIFEST file {} not found, starting fresh", manifest_name);
172                (manifest_number + 1, LsmState::new())
173            }
174        } else {
175            // No CURRENT file, start fresh
176            debug!("No CURRENT file found, creating new MANIFEST");
177            (1, LsmState::new())
178        };
179
180        let mut manifest = Self {
181            data_dir,
182            manifest_number,
183            writer: None,
184            state,
185        };
186
187        // Open writer for appending
188        manifest.open_writer()?;
189
190        Ok(manifest)
191    }
192
193    /// Read all edits from a MANIFEST file
194    fn read_manifest(path: &Path) -> Result<LsmState> {
195        let file = File::open(path)?;
196        let reader = BufReader::new(file);
197        let mut state = LsmState::new();
198
199        for line in reader.lines() {
200            let line = line?;
201            if line.is_empty() {
202                continue;
203            }
204
205            match serde_json::from_str::<VersionEdit>(&line) {
206                Ok(edit) => state.apply(&edit),
207                Err(e) => {
208                    warn!("Failed to parse MANIFEST line: {}", e);
209                    // Continue reading - partial corruption shouldn't fail entire load
210                }
211            }
212        }
213
214        Ok(state)
215    }
216
217    /// Open writer for the current MANIFEST
218    fn open_writer(&mut self) -> Result<()> {
219        let manifest_name = format!("MANIFEST-{:06}", self.manifest_number);
220        let manifest_path = self.data_dir.join(&manifest_name);
221
222        let file = OpenOptions::new()
223            .create(true)
224            .append(true)
225            .open(&manifest_path)?;
226
227        self.writer = Some(BufWriter::new(file));
228
229        // Update CURRENT file atomically
230        let current_path = self.data_dir.join(CURRENT_FILENAME);
231        let temp_path = self.data_dir.join("CURRENT.tmp");
232        fs::write(&temp_path, &manifest_name)?;
233        fs::rename(&temp_path, &current_path)?;
234
235        Ok(())
236    }
237
238    /// Log a version edit to the MANIFEST
239    pub fn log_edit(&mut self, edit: &VersionEdit) -> Result<()> {
240        // Apply to in-memory state
241        self.state.apply(edit);
242
243        // Write to MANIFEST file
244        if let Some(ref mut writer) = self.writer {
245            let line = serde_json::to_string(edit)
246                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
247            writeln!(writer, "{}", line)?;
248            writer.flush()?;
249            debug!("Logged version edit: version={}", edit.version);
250        }
251
252        Ok(())
253    }
254
255    /// Record adding new SSTables
256    pub fn add_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
257        let version = self.state.version + 1;
258        let edit = VersionEdit {
259            version,
260            added_files: files,
261            removed_files: vec![],
262            log_number: None,
263            next_file_id: None,
264        };
265        self.log_edit(&edit)?;
266        Ok(version)
267    }
268
269    /// Record removing SSTables (after compaction)
270    pub fn remove_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
271        let version = self.state.version + 1;
272        let edit = VersionEdit {
273            version,
274            added_files: vec![],
275            removed_files: files,
276            log_number: None,
277            next_file_id: None,
278        };
279        self.log_edit(&edit)?;
280        Ok(version)
281    }
282
283    /// Record a compaction: add new files, remove old files
284    pub fn log_compaction(
285        &mut self,
286        added: Vec<FileMetadata>,
287        removed: Vec<FileMetadata>,
288    ) -> Result<u64> {
289        let version = self.state.version + 1;
290        let edit = VersionEdit {
291            version,
292            added_files: added,
293            removed_files: removed,
294            log_number: None,
295            next_file_id: None,
296        };
297        self.log_edit(&edit)?;
298        Ok(version)
299    }
300
301    /// Update next file ID
302    pub fn set_next_file_id(&mut self, next_id: u64) -> Result<()> {
303        let version = self.state.version + 1;
304        let edit = VersionEdit {
305            version,
306            added_files: vec![],
307            removed_files: vec![],
308            log_number: None,
309            next_file_id: Some(next_id),
310        };
311        self.log_edit(&edit)
312    }
313
314    /// Get the current LSM state
315    pub fn state(&self) -> &LsmState {
316        &self.state
317    }
318
319    /// Get the current version number
320    pub fn version(&self) -> u64 {
321        self.state.version
322    }
323
324    /// Get active files for a level
325    pub fn active_files(&self, level: usize) -> Option<&HashSet<u64>> {
326        self.state.active_files.get(level)
327    }
328
329    /// Sync to disk
330    pub fn sync(&mut self) -> Result<()> {
331        if let Some(ref mut writer) = self.writer {
332            writer.flush()?;
333            writer.get_ref().sync_all()?;
334        }
335        Ok(())
336    }
337
338    /// Create a new MANIFEST file (for periodic compaction of the MANIFEST itself)
339    pub fn rotate(&mut self) -> Result<()> {
340        // Sync current MANIFEST
341        self.sync()?;
342
343        // Create new MANIFEST
344        self.manifest_number += 1;
345        self.open_writer()?;
346
347        // Write a snapshot of current state
348        let snapshot = VersionEdit {
349            version: self.state.version,
350            added_files: self.collect_all_files(),
351            removed_files: vec![],
352            log_number: Some(self.state.log_number),
353            next_file_id: Some(self.state.next_file_id),
354        };
355        self.log_edit(&snapshot)?;
356
357        Ok(())
358    }
359
360    /// Collect all active files as FileMetadata
361    fn collect_all_files(&self) -> Vec<FileMetadata> {
362        let mut files = Vec::new();
363        for (level, file_ids) in self.state.active_files.iter().enumerate() {
364            for &file_id in file_ids {
365                files.push(FileMetadata {
366                    level,
367                    file_id,
368                    file_size: 0, // Size not tracked in state
369                    smallest_key: None,
370                    largest_key: None,
371                });
372            }
373        }
374        files
375    }
376}
377
/// Parse manifest number from filename like "MANIFEST-000001"
///
/// Returns `None` unless `name` is the prefix `MANIFEST-` followed by one or
/// more ASCII digits that fit in a `u64`. The digit check is explicit because
/// integer parsing on its own also accepts a leading `+`, which would let a
/// name such as `MANIFEST-+5` pass as manifest number 5.
fn parse_manifest_number(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("MANIFEST-")?;
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse().ok()
}
382
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds SSTable metadata without key bounds for use in the tests below.
    fn sstable(level: usize, file_id: u64, file_size: u64) -> FileMetadata {
        FileMetadata {
            level,
            file_id,
            file_size,
            smallest_key: None,
            largest_key: None,
        }
    }

    /// Adding files bumps the version and marks exactly those files active.
    #[test]
    fn test_manifest_basic() {
        let dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        manifest
            .add_files(vec![sstable(0, 1, 1000), sstable(0, 2, 2000)])
            .unwrap();

        assert_eq!(manifest.version(), 1);
        assert!(manifest.state.is_file_active(0, 1));
        assert!(manifest.state.is_file_active(0, 2));
        assert!(!manifest.state.is_file_active(0, 3));
    }

    /// State written by one `Manifest` instance is visible after reopening.
    #[test]
    fn test_manifest_recovery() {
        let dir = TempDir::new().unwrap();

        // First session: record one file and persist it.
        {
            let mut writer_side = Manifest::open(dir.path()).unwrap();
            writer_side.add_files(vec![sstable(0, 1, 1000)]).unwrap();
            writer_side.sync().unwrap();
        }

        // Second session: the file must still be known.
        let reopened = Manifest::open(dir.path()).unwrap();
        assert!(reopened.state.is_file_active(0, 1));
    }

    /// A compaction edit activates its outputs and retires its inputs.
    #[test]
    fn test_compaction_tracking() {
        let dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        // Two L0 inputs.
        manifest
            .add_files(vec![sstable(0, 1, 1000), sstable(0, 2, 1000)])
            .unwrap();

        // Merge both L0 files into a single L1 file.
        let outputs = vec![sstable(1, 3, 2000)];
        let inputs = vec![sstable(0, 1, 1000), sstable(0, 2, 1000)];
        manifest.log_compaction(outputs, inputs).unwrap();

        assert!(!manifest.state.is_file_active(0, 1));
        assert!(!manifest.state.is_file_active(0, 2));
        assert!(manifest.state.is_file_active(1, 3));
    }
}
503}