1use crate::sstable::SSTableMeta;
8use rustlite_core::{Error, Result};
9use serde::{Deserialize, Serialize};
10use std::fs::{self, File, OpenOptions};
11use std::io::{BufReader, BufWriter, Read, Write};
12use std::path::{Path, PathBuf};
13
/// File name of the manifest inside the manifest directory.
const MANIFEST_FILE: &str = "MANIFEST";
/// File name of the copy `Manifest::rewrite` makes of the manifest before
/// replacing it; the copy is removed again once the rewrite has finished.
const MANIFEST_BACKUP: &str = "MANIFEST.bak";
18
/// A single change to the manifest.
///
/// Records are serialized with bincode and appended to the manifest file,
/// each prefixed with its encoded length as a little-endian `u32`. The order
/// of variants and fields is part of that encoding, so do not reorder them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManifestRecord {
    /// An SSTable was registered (written by `Manifest::add_sstable`).
    AddSSTable {
        /// Level the table belongs to.
        level: u32,
        /// Path of the table file, stored as a lossily converted string.
        path: String,
        /// Value of `SSTableMeta::min_key` for the table.
        min_key: Vec<u8>,
        /// Value of `SSTableMeta::max_key` for the table.
        max_key: Vec<u8>,
        /// Value of `SSTableMeta::entry_count` for the table.
        entry_count: u64,
        /// Value of `SSTableMeta::file_size` for the table.
        file_size: u64,
        /// Value of `SSTableMeta::sequence` for the table.
        sequence: u64,
    },
    /// The SSTable at `path` was unregistered (written by
    /// `Manifest::remove_sstable`).
    RemoveSSTable { path: String },
    /// The manifest's sequence number was set to `sequence` (written by
    /// `Manifest::update_sequence`).
    UpdateSequence { sequence: u64 },
    /// A compaction finished (written by `Manifest::record_compaction`).
    ///
    /// Only the paths of the output tables are stored here, not their
    /// metadata.
    CompactionDone {
        /// The `level` argument passed to `Manifest::record_compaction`.
        level: u32,
        /// Paths of the tables consumed by the compaction.
        inputs: Vec<String>,
        /// Paths of the tables produced by the compaction.
        outputs: Vec<String>,
    },
}
43
/// Serializable description of one SSTable as tracked by the manifest.
///
/// Mirrors the fields of `SSTableMeta`, except that the path is kept as a
/// `String` rather than a `PathBuf`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestSSTable {
    /// Level the table belongs to.
    pub level: u32,
    /// Path of the table file, stored as a string.
    pub path: String,
    /// Counterpart of `SSTableMeta::min_key`.
    pub min_key: Vec<u8>,
    /// Counterpart of `SSTableMeta::max_key`.
    pub max_key: Vec<u8>,
    /// Counterpart of `SSTableMeta::entry_count`.
    pub entry_count: u64,
    /// Counterpart of `SSTableMeta::file_size`; summed by `Manifest::total_size`.
    pub file_size: u64,
    /// Counterpart of `SSTableMeta::sequence`.
    pub sequence: u64,
}
62
63impl ManifestSSTable {
64 pub fn to_meta(&self) -> SSTableMeta {
66 SSTableMeta {
67 path: PathBuf::from(&self.path),
68 min_key: self.min_key.clone(),
69 max_key: self.max_key.clone(),
70 entry_count: self.entry_count,
71 file_size: self.file_size,
72 level: self.level,
73 sequence: self.sequence,
74 }
75 }
76}
77
/// Complete manifest state: what `Manifest` keeps in memory and what
/// `Manifest::rewrite` serializes to the manifest file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestSnapshot {
    /// Sequence number last set through `Manifest::update_sequence`.
    pub sequence: u64,
    /// Every SSTable currently registered, across all levels.
    pub sstables: Vec<ManifestSSTable>,
    /// Snapshot version number; `Default` sets it to 1.
    pub version: u32,
}
88
89impl Default for ManifestSnapshot {
90 fn default() -> Self {
91 Self {
92 sequence: 0,
93 sstables: Vec::new(),
94 version: 1,
95 }
96 }
97}
98
/// Tracks the set of SSTables and the current sequence number, and persists
/// changes to a manifest file inside `dir`.
pub struct Manifest {
    /// Directory that contains the manifest file.
    dir: PathBuf,
    /// In-memory copy of the current manifest state.
    snapshot: ManifestSnapshot,
    /// Buffered handle used to append records to the manifest file; `None`
    /// while the file is being rewritten.
    log_writer: Option<BufWriter<File>>,
    /// Number of records appended since the manifest was last rewritten.
    log_entries: usize,
    /// Once `log_entries` reaches this value, writing a record triggers a
    /// rewrite.
    log_threshold: usize,
}
112
113impl Manifest {
114 pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
116 let dir = dir.as_ref().to_path_buf();
117 fs::create_dir_all(&dir)?;
118
119 let manifest_path = dir.join(MANIFEST_FILE);
120
121 let snapshot = if manifest_path.exists() {
122 Self::load_snapshot(&manifest_path)?
123 } else {
124 ManifestSnapshot::default()
125 };
126
127 let log_writer = Some(BufWriter::new(
129 OpenOptions::new()
130 .create(true)
131 .append(true)
132 .open(&manifest_path)?,
133 ));
134
135 Ok(Self {
136 dir,
137 snapshot,
138 log_writer,
139 log_entries: 0,
140 log_threshold: 100, })
142 }
143
144 fn load_snapshot(path: &Path) -> Result<ManifestSnapshot> {
146 let file = File::open(path)?;
147 let mut reader = BufReader::new(file);
148
149 let mut contents = Vec::new();
150 reader.read_to_end(&mut contents)?;
151
152 if contents.is_empty() {
153 return Ok(ManifestSnapshot::default());
154 }
155
156 match bincode::deserialize::<ManifestSnapshot>(&contents) {
158 Ok(snapshot) => Ok(snapshot),
159 Err(_) => {
160 Ok(ManifestSnapshot::default())
162 }
163 }
164 }
165
166 fn write_record(&mut self, record: &ManifestRecord) -> Result<()> {
168 if let Some(ref mut writer) = self.log_writer {
169 let encoded =
170 bincode::serialize(record).map_err(|e| Error::Serialization(e.to_string()))?;
171 let len = encoded.len() as u32;
172
173 writer.write_all(&len.to_le_bytes())?;
174 writer.write_all(&encoded)?;
175 writer.flush()?;
176
177 self.log_entries += 1;
178
179 if self.log_entries >= self.log_threshold {
181 self.rewrite()?;
182 }
183 }
184
185 Ok(())
186 }
187
188 pub fn rewrite(&mut self) -> Result<()> {
190 self.log_writer = None;
192
193 let manifest_path = self.dir.join(MANIFEST_FILE);
194 let backup_path = self.dir.join(MANIFEST_BACKUP);
195
196 if manifest_path.exists() {
198 fs::copy(&manifest_path, &backup_path)?;
199 }
200
201 let encoded =
203 bincode::serialize(&self.snapshot).map_err(|e| Error::Serialization(e.to_string()))?;
204
205 fs::write(&manifest_path, &encoded)?;
206
207 let _ = fs::remove_file(&backup_path);
209
210 self.log_writer = Some(BufWriter::new(
212 OpenOptions::new()
213 .create(true)
214 .write(true)
215 .truncate(true)
216 .open(&manifest_path)?,
217 ));
218
219 if let Some(ref mut writer) = self.log_writer {
221 writer.write_all(&encoded)?;
222 writer.flush()?;
223 }
224
225 self.log_entries = 0;
226
227 Ok(())
228 }
229
230 pub fn add_sstable(&mut self, meta: &SSTableMeta) -> Result<()> {
232 let sstable = ManifestSSTable {
233 level: meta.level,
234 path: meta.path.to_string_lossy().to_string(),
235 min_key: meta.min_key.clone(),
236 max_key: meta.max_key.clone(),
237 entry_count: meta.entry_count,
238 file_size: meta.file_size,
239 sequence: meta.sequence,
240 };
241
242 self.snapshot.sstables.push(sstable);
243
244 self.write_record(&ManifestRecord::AddSSTable {
245 level: meta.level,
246 path: meta.path.to_string_lossy().to_string(),
247 min_key: meta.min_key.clone(),
248 max_key: meta.max_key.clone(),
249 entry_count: meta.entry_count,
250 file_size: meta.file_size,
251 sequence: meta.sequence,
252 })?;
253
254 Ok(())
255 }
256
257 pub fn remove_sstable(&mut self, path: &Path) -> Result<()> {
259 let path_str = path.to_string_lossy().to_string();
260
261 self.snapshot.sstables.retain(|s| s.path != path_str);
262
263 self.write_record(&ManifestRecord::RemoveSSTable { path: path_str })?;
264
265 Ok(())
266 }
267
268 pub fn update_sequence(&mut self, sequence: u64) -> Result<()> {
270 self.snapshot.sequence = sequence;
271
272 self.write_record(&ManifestRecord::UpdateSequence { sequence })?;
273
274 Ok(())
275 }
276
277 pub fn sequence(&self) -> u64 {
279 self.snapshot.sequence
280 }
281
282 pub fn sstables_at_level(&self, level: u32) -> Vec<&ManifestSSTable> {
284 self.snapshot
285 .sstables
286 .iter()
287 .filter(|s| s.level == level)
288 .collect()
289 }
290
291 pub fn all_sstables(&self) -> &[ManifestSSTable] {
293 &self.snapshot.sstables
294 }
295
296 pub fn level_counts(&self) -> Vec<usize> {
298 let max_level = self
299 .snapshot
300 .sstables
301 .iter()
302 .map(|s| s.level)
303 .max()
304 .unwrap_or(0);
305
306 let mut counts = vec![0usize; (max_level + 1) as usize];
307 for sst in &self.snapshot.sstables {
308 counts[sst.level as usize] += 1;
309 }
310
311 counts
312 }
313
314 pub fn total_size(&self) -> u64 {
316 self.snapshot.sstables.iter().map(|s| s.file_size).sum()
317 }
318
319 pub fn record_compaction(
321 &mut self,
322 level: u32,
323 inputs: Vec<PathBuf>,
324 outputs: Vec<SSTableMeta>,
325 ) -> Result<()> {
326 for input in &inputs {
328 self.snapshot
329 .sstables
330 .retain(|s| s.path != input.to_string_lossy());
331 }
332
333 for output in &outputs {
335 let sstable = ManifestSSTable {
336 level: output.level,
337 path: output.path.to_string_lossy().to_string(),
338 min_key: output.min_key.clone(),
339 max_key: output.max_key.clone(),
340 entry_count: output.entry_count,
341 file_size: output.file_size,
342 sequence: output.sequence,
343 };
344 self.snapshot.sstables.push(sstable);
345 }
346
347 self.write_record(&ManifestRecord::CompactionDone {
349 level,
350 inputs: inputs
351 .iter()
352 .map(|p| p.to_string_lossy().to_string())
353 .collect(),
354 outputs: outputs
355 .iter()
356 .map(|p| p.path.to_string_lossy().to_string())
357 .collect(),
358 })?;
359
360 Ok(())
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Metadata for a populated level-0 table named `test.sst`.
    fn sample_meta() -> SSTableMeta {
        SSTableMeta {
            path: PathBuf::from("test.sst"),
            min_key: b"a".to_vec(),
            max_key: b"z".to_vec(),
            entry_count: 100,
            file_size: 1024,
            level: 0,
            sequence: 1,
        }
    }

    /// Metadata for an empty table at `level` stored under `name`.
    fn empty_meta(name: String, level: u32) -> SSTableMeta {
        SSTableMeta {
            path: PathBuf::from(name),
            min_key: vec![],
            max_key: vec![],
            entry_count: 0,
            file_size: 0,
            level,
            sequence: 0,
        }
    }

    #[test]
    fn test_manifest_create() {
        let tmp = tempdir().unwrap();
        let manifest = Manifest::open(tmp.path()).unwrap();

        assert_eq!(manifest.sequence(), 0);
        assert!(manifest.all_sstables().is_empty());
    }

    #[test]
    fn test_manifest_add_sstable() {
        let tmp = tempdir().unwrap();
        let mut manifest = Manifest::open(tmp.path()).unwrap();

        manifest.add_sstable(&sample_meta()).unwrap();

        assert_eq!(manifest.all_sstables().len(), 1);
        assert_eq!(manifest.sstables_at_level(0).len(), 1);
        assert_eq!(manifest.sstables_at_level(1).len(), 0);
    }

    #[test]
    fn test_manifest_remove_sstable() {
        let tmp = tempdir().unwrap();
        let mut manifest = Manifest::open(tmp.path()).unwrap();

        manifest.add_sstable(&sample_meta()).unwrap();
        assert_eq!(manifest.all_sstables().len(), 1);

        manifest.remove_sstable(Path::new("test.sst")).unwrap();
        assert!(manifest.all_sstables().is_empty());
    }

    #[test]
    fn test_manifest_sequence() {
        let tmp = tempdir().unwrap();
        let mut manifest = Manifest::open(tmp.path()).unwrap();

        manifest.update_sequence(100).unwrap();
        assert_eq!(manifest.sequence(), 100);
    }

    #[test]
    fn test_manifest_level_counts() {
        let tmp = tempdir().unwrap();
        let mut manifest = Manifest::open(tmp.path()).unwrap();

        // Three tables at level 0, then two at level 1.
        for i in 0..3 {
            manifest
                .add_sstable(&empty_meta(format!("l0_{}.sst", i), 0))
                .unwrap();
        }
        for i in 0..2 {
            manifest
                .add_sstable(&empty_meta(format!("l1_{}.sst", i), 1))
                .unwrap();
        }

        let counts = manifest.level_counts();
        assert_eq!(counts[0], 3);
        assert_eq!(counts[1], 2);
    }
}