rustlite_storage/
manifest.rs

1//! Manifest - Metadata about current SSTables and database state
2//!
3//! The manifest tracks which SSTable files are currently active,
4//! their levels, and the current sequence number. It is used for
5//! recovery and compaction coordination.
6
7use crate::sstable::SSTableMeta;
8use rustlite_core::{Error, Result};
9use serde::{Deserialize, Serialize};
10use std::fs::{self, File, OpenOptions};
11use std::io::{BufReader, BufWriter, Read, Write};
12use std::path::{Path, PathBuf};
13
/// File name of the manifest inside the database directory. `Manifest::open`
/// loads its state from this file and appends its log records to it.
const MANIFEST_FILE: &str = "MANIFEST";
/// File name of the copy of the manifest that `Manifest::rewrite` takes
/// before replacing the manifest, and deletes once the replacement succeeded.
const MANIFEST_BACKUP: &str = "MANIFEST.bak";
18
/// One incremental change to the manifest state.
///
/// `Manifest` serializes each record with bincode and appends it to the
/// manifest file behind a 4-byte little-endian length prefix.
// NOTE(review): values of this type are persisted; reordering variants or
// fields presumably changes how bincode encodes/decodes existing files —
// confirm before restructuring.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManifestRecord {
    /// A new SSTable became active (logged by `Manifest::add_sstable`).
    AddSSTable {
        /// Level of the table in the LSM tree
        level: u32,
        /// Path of the SSTable file, stored as a (lossily converted) string
        path: String,
        /// Minimum key of the table
        min_key: Vec<u8>,
        /// Maximum key of the table
        max_key: Vec<u8>,
        /// Number of entries in the table
        entry_count: u64,
        /// Size of the SSTable file in bytes
        file_size: u64,
        /// Sequence number recorded in the table's metadata
        sequence: u64,
    },
    /// An SSTable stopped being active, e.g. after compaction
    /// (logged by `Manifest::remove_sstable`).
    RemoveSSTable {
        /// Path of the table to drop, compared as a string against stored paths
        path: String,
    },
    /// The current sequence number changed (logged by `Manifest::update_sequence`).
    UpdateSequence {
        /// The new sequence number
        sequence: u64,
    },
    /// A compaction finished (logged by `Manifest::record_compaction`).
    CompactionDone {
        /// Level passed to `record_compaction` by the caller
        // NOTE(review): not visible here whether this is the source or the
        // target level of the compaction — confirm against the caller.
        level: u32,
        /// Paths of the tables the compaction consumed
        inputs: Vec<String>,
        /// Paths of the tables the compaction produced (paths only, no metadata)
        outputs: Vec<String>,
    },
}
43
/// Description of one active SSTable as stored in the manifest.
///
/// Carries the same fields as `SSTableMeta` (see [`ManifestSSTable::to_meta`]),
/// except that the path is held as a `String` so the entry can be serialized
/// as part of a [`ManifestSnapshot`].
// NOTE(review): this struct is persisted via bincode; field order is
// presumably part of the on-disk layout — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestSSTable {
    /// Level in the LSM tree
    pub level: u32,
    /// Path to the SSTable file (string form of the original `PathBuf`)
    pub path: String,
    /// Minimum key
    pub min_key: Vec<u8>,
    /// Maximum key
    pub max_key: Vec<u8>,
    /// Number of entries
    pub entry_count: u64,
    /// File size in bytes
    pub file_size: u64,
    /// Sequence number when created
    pub sequence: u64,
}
62
63impl ManifestSSTable {
64    /// Convert to SSTableMeta
65    pub fn to_meta(&self) -> SSTableMeta {
66        SSTableMeta {
67            path: PathBuf::from(&self.path),
68            min_key: self.min_key.clone(),
69            max_key: self.max_key.clone(),
70            entry_count: self.entry_count,
71            file_size: self.file_size,
72            level: self.level,
73            sequence: self.sequence,
74        }
75    }
76}
77
/// Complete manifest state at one point in time.
///
/// `Manifest::rewrite` serializes this with bincode as the content of the
/// manifest file, and `Manifest::open` reads it back.
// NOTE(review): persisted via bincode; field order is presumably part of the
// on-disk layout — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestSnapshot {
    /// Current sequence number
    pub sequence: u64,
    /// All active SSTables
    pub sstables: Vec<ManifestSSTable>,
    /// Version number for compatibility (`1` for snapshots created by `Default`)
    pub version: u32,
}
88
89impl Default for ManifestSnapshot {
90    fn default() -> Self {
91        Self {
92            sequence: 0,
93            sstables: Vec::new(),
94            version: 1,
95        }
96    }
97}
98
/// Manifest manager - tracks database state
///
/// Holds the current [`ManifestSnapshot`] in memory and persists changes to
/// the manifest file inside `dir`. Created with `Manifest::open`.
pub struct Manifest {
    /// Database directory (the manifest file lives directly inside it)
    dir: PathBuf,
    /// Current snapshot; every mutating method updates it
    snapshot: ManifestSnapshot,
    /// Buffered handle used to append records to the manifest file;
    /// `None` while `rewrite` has the file closed
    log_writer: Option<BufWriter<File>>,
    /// Number of log entries since last snapshot (reset to 0 by `rewrite`)
    log_entries: usize,
    /// Number of log entries at which the manifest is rewritten as a fresh
    /// snapshot (set to 100 by `open`)
    log_threshold: usize,
}
112
113impl Manifest {
114    /// Open or create a manifest in the given directory
115    pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
116        let dir = dir.as_ref().to_path_buf();
117        fs::create_dir_all(&dir)?;
118
119        let manifest_path = dir.join(MANIFEST_FILE);
120
121        let snapshot = if manifest_path.exists() {
122            Self::load_snapshot(&manifest_path)?
123        } else {
124            ManifestSnapshot::default()
125        };
126
127        // Open log file for appending
128        let log_writer = Some(BufWriter::new(
129            OpenOptions::new()
130                .create(true)
131                .append(true)
132                .open(&manifest_path)?,
133        ));
134
135        Ok(Self {
136            dir,
137            snapshot,
138            log_writer,
139            log_entries: 0,
140            log_threshold: 100, // Rewrite after 100 incremental entries
141        })
142    }
143
144    /// Load a manifest snapshot from disk
145    fn load_snapshot(path: &Path) -> Result<ManifestSnapshot> {
146        let file = File::open(path)?;
147        let mut reader = BufReader::new(file);
148
149        let mut contents = Vec::new();
150        reader.read_to_end(&mut contents)?;
151
152        if contents.is_empty() {
153            return Ok(ManifestSnapshot::default());
154        }
155
156        // Try to deserialize as snapshot
157        match bincode::deserialize::<ManifestSnapshot>(&contents) {
158            Ok(snapshot) => Ok(snapshot),
159            Err(_) => {
160                // Fall back to empty manifest
161                Ok(ManifestSnapshot::default())
162            }
163        }
164    }
165
166    /// Write a record to the manifest log
167    fn write_record(&mut self, record: &ManifestRecord) -> Result<()> {
168        if let Some(ref mut writer) = self.log_writer {
169            let encoded =
170                bincode::serialize(record).map_err(|e| Error::Serialization(e.to_string()))?;
171            let len = encoded.len() as u32;
172
173            writer.write_all(&len.to_le_bytes())?;
174            writer.write_all(&encoded)?;
175            writer.flush()?;
176
177            self.log_entries += 1;
178
179            // Rewrite manifest if threshold reached
180            if self.log_entries >= self.log_threshold {
181                self.rewrite()?;
182            }
183        }
184
185        Ok(())
186    }
187
188    /// Rewrite the manifest as a fresh snapshot
189    pub fn rewrite(&mut self) -> Result<()> {
190        // Close current log writer
191        self.log_writer = None;
192
193        let manifest_path = self.dir.join(MANIFEST_FILE);
194        let backup_path = self.dir.join(MANIFEST_BACKUP);
195
196        // Backup current manifest
197        if manifest_path.exists() {
198            fs::copy(&manifest_path, &backup_path)?;
199        }
200
201        // Write new snapshot
202        let encoded =
203            bincode::serialize(&self.snapshot).map_err(|e| Error::Serialization(e.to_string()))?;
204
205        fs::write(&manifest_path, &encoded)?;
206
207        // Remove backup
208        let _ = fs::remove_file(&backup_path);
209
210        // Reopen log writer
211        self.log_writer = Some(BufWriter::new(
212            OpenOptions::new()
213                .create(true)
214                .write(true)
215                .truncate(true)
216                .open(&manifest_path)?,
217        ));
218
219        // Write the snapshot to the new file
220        if let Some(ref mut writer) = self.log_writer {
221            writer.write_all(&encoded)?;
222            writer.flush()?;
223        }
224
225        self.log_entries = 0;
226
227        Ok(())
228    }
229
230    /// Add an SSTable to the manifest
231    pub fn add_sstable(&mut self, meta: &SSTableMeta) -> Result<()> {
232        let sstable = ManifestSSTable {
233            level: meta.level,
234            path: meta.path.to_string_lossy().to_string(),
235            min_key: meta.min_key.clone(),
236            max_key: meta.max_key.clone(),
237            entry_count: meta.entry_count,
238            file_size: meta.file_size,
239            sequence: meta.sequence,
240        };
241
242        self.snapshot.sstables.push(sstable);
243
244        self.write_record(&ManifestRecord::AddSSTable {
245            level: meta.level,
246            path: meta.path.to_string_lossy().to_string(),
247            min_key: meta.min_key.clone(),
248            max_key: meta.max_key.clone(),
249            entry_count: meta.entry_count,
250            file_size: meta.file_size,
251            sequence: meta.sequence,
252        })?;
253
254        Ok(())
255    }
256
257    /// Remove an SSTable from the manifest
258    pub fn remove_sstable(&mut self, path: &Path) -> Result<()> {
259        let path_str = path.to_string_lossy().to_string();
260
261        self.snapshot.sstables.retain(|s| s.path != path_str);
262
263        self.write_record(&ManifestRecord::RemoveSSTable { path: path_str })?;
264
265        Ok(())
266    }
267
268    /// Update the sequence number
269    pub fn update_sequence(&mut self, sequence: u64) -> Result<()> {
270        self.snapshot.sequence = sequence;
271
272        self.write_record(&ManifestRecord::UpdateSequence { sequence })?;
273
274        Ok(())
275    }
276
277    /// Get the current sequence number
278    pub fn sequence(&self) -> u64 {
279        self.snapshot.sequence
280    }
281
282    /// Get all SSTables at a given level
283    pub fn sstables_at_level(&self, level: u32) -> Vec<&ManifestSSTable> {
284        self.snapshot
285            .sstables
286            .iter()
287            .filter(|s| s.level == level)
288            .collect()
289    }
290
291    /// Get all SSTables
292    pub fn all_sstables(&self) -> &[ManifestSSTable] {
293        &self.snapshot.sstables
294    }
295
296    /// Get the number of SSTables at each level
297    pub fn level_counts(&self) -> Vec<usize> {
298        let max_level = self
299            .snapshot
300            .sstables
301            .iter()
302            .map(|s| s.level)
303            .max()
304            .unwrap_or(0);
305
306        let mut counts = vec![0usize; (max_level + 1) as usize];
307        for sst in &self.snapshot.sstables {
308            counts[sst.level as usize] += 1;
309        }
310
311        counts
312    }
313
314    /// Get total size of all SSTables
315    pub fn total_size(&self) -> u64 {
316        self.snapshot.sstables.iter().map(|s| s.file_size).sum()
317    }
318
319    /// Record a compaction completion
320    pub fn record_compaction(
321        &mut self,
322        level: u32,
323        inputs: Vec<PathBuf>,
324        outputs: Vec<SSTableMeta>,
325    ) -> Result<()> {
326        // Remove input files from manifest
327        for input in &inputs {
328            self.snapshot
329                .sstables
330                .retain(|s| s.path != input.to_string_lossy());
331        }
332
333        // Add output files to manifest
334        for output in &outputs {
335            let sstable = ManifestSSTable {
336                level: output.level,
337                path: output.path.to_string_lossy().to_string(),
338                min_key: output.min_key.clone(),
339                max_key: output.max_key.clone(),
340                entry_count: output.entry_count,
341                file_size: output.file_size,
342                sequence: output.sequence,
343            };
344            self.snapshot.sstables.push(sstable);
345        }
346
347        // Write record
348        self.write_record(&ManifestRecord::CompactionDone {
349            level,
350            inputs: inputs
351                .iter()
352                .map(|p| p.to_string_lossy().to_string())
353                .collect(),
354            outputs: outputs
355                .iter()
356                .map(|p| p.path.to_string_lossy().to_string())
357                .collect(),
358        })?;
359
360        Ok(())
361    }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Metadata for a level-0 table named `test.sst` with key bounds `a` and `z`.
    fn sample_meta() -> SSTableMeta {
        SSTableMeta {
            path: PathBuf::from("test.sst"),
            min_key: b"a".to_vec(),
            max_key: b"z".to_vec(),
            entry_count: 100,
            file_size: 1024,
            level: 0,
            sequence: 1,
        }
    }

    /// Metadata for an empty placeholder table called `name` at `level`.
    fn placeholder_meta(name: String, level: u32) -> SSTableMeta {
        SSTableMeta {
            path: PathBuf::from(name),
            min_key: vec![],
            max_key: vec![],
            entry_count: 0,
            file_size: 0,
            level,
            sequence: 0,
        }
    }

    /// A manifest opened in a fresh directory starts out empty.
    #[test]
    fn test_manifest_create() {
        let dir = tempdir().unwrap();
        let manifest = Manifest::open(dir.path()).unwrap();

        assert_eq!(manifest.sequence(), 0);
        assert!(manifest.all_sstables().is_empty());
    }

    /// An added table is listed, and only under its own level.
    #[test]
    fn test_manifest_add_sstable() {
        let dir = tempdir().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        manifest.add_sstable(&sample_meta()).unwrap();

        assert_eq!(manifest.all_sstables().len(), 1);
        assert_eq!(manifest.sstables_at_level(0).len(), 1);
        assert_eq!(manifest.sstables_at_level(1).len(), 0);
    }

    /// Removing a table by path takes it out of the listing again.
    #[test]
    fn test_manifest_remove_sstable() {
        let dir = tempdir().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        manifest.add_sstable(&sample_meta()).unwrap();
        assert_eq!(manifest.all_sstables().len(), 1);

        manifest.remove_sstable(Path::new("test.sst")).unwrap();
        assert!(manifest.all_sstables().is_empty());
    }

    /// The sequence number reads back as last written.
    #[test]
    fn test_manifest_sequence() {
        let dir = tempdir().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        manifest.update_sequence(100).unwrap();
        assert_eq!(manifest.sequence(), 100);
    }

    /// Tables are counted per level.
    #[test]
    fn test_manifest_level_counts() {
        let dir = tempdir().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        for i in 0..3 {
            let meta = placeholder_meta(format!("l0_{}.sst", i), 0);
            manifest.add_sstable(&meta).unwrap();
        }

        for i in 0..2 {
            let meta = placeholder_meta(format!("l1_{}.sst", i), 1);
            manifest.add_sstable(&meta).unwrap();
        }

        let counts = manifest.level_counts();
        assert_eq!(counts[0], 3);
        assert_eq!(counts[1], 2);
    }
}