1use serde::{Deserialize, Serialize};
29use std::collections::HashSet;
30use std::fs::{self, File, OpenOptions};
31use std::io::{BufRead, BufReader, BufWriter, Write};
32use std::path::{Path, PathBuf};
33use sochdb_core::Result;
34use tracing::{debug, info, warn};
35
/// Base name shared by manifest files, which are named `MANIFEST-<number>`.
// Currently not referenced by name in this module, so the lint is silenced.
#[allow(dead_code)]
const MANIFEST_FILENAME: &str = "MANIFEST";

/// Pointer file whose contents name the manifest that is currently live.
const CURRENT_FILENAME: &str = "CURRENT";
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct VersionEdit {
46 pub version: u64,
48 pub added_files: Vec<FileMetadata>,
50 pub removed_files: Vec<FileMetadata>,
52 pub log_number: Option<u64>,
54 pub next_file_id: Option<u64>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
60pub struct FileMetadata {
61 pub level: usize,
63 pub file_id: u64,
65 pub file_size: u64,
67 pub smallest_key: Option<String>,
69 pub largest_key: Option<String>,
71}
72
73#[derive(Debug, Default)]
75pub struct LsmState {
76 pub version: u64,
78 pub active_files: Vec<HashSet<u64>>,
80 pub log_number: u64,
82 pub next_file_id: u64,
84}
85
86impl LsmState {
87 pub fn new() -> Self {
89 Self {
90 version: 0,
91 active_files: vec![HashSet::new(); 7], log_number: 0,
93 next_file_id: 1,
94 }
95 }
96
97 pub fn apply(&mut self, edit: &VersionEdit) {
99 self.version = edit.version;
100
101 for file in &edit.added_files {
103 if file.level < self.active_files.len() {
104 self.active_files[file.level].insert(file.file_id);
105 }
106 }
107
108 for file in &edit.removed_files {
110 if file.level < self.active_files.len() {
111 self.active_files[file.level].remove(&file.file_id);
112 }
113 }
114
115 if let Some(log_num) = edit.log_number {
117 self.log_number = log_num;
118 }
119 if let Some(next_id) = edit.next_file_id {
120 self.next_file_id = next_id;
121 }
122 }
123
124 pub fn is_file_active(&self, level: usize, file_id: u64) -> bool {
126 self.active_files
127 .get(level)
128 .map(|files| files.contains(&file_id))
129 .unwrap_or(false)
130 }
131}
132
133pub struct Manifest {
135 data_dir: PathBuf,
137 manifest_number: u64,
139 writer: Option<BufWriter<File>>,
141 state: LsmState,
143}
144
145impl Manifest {
146 pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
148 let data_dir = data_dir.as_ref().to_path_buf();
149 fs::create_dir_all(&data_dir)?;
150
151 let current_path = data_dir.join(CURRENT_FILENAME);
153 let (manifest_number, state) = if current_path.exists() {
154 let current_content = fs::read_to_string(¤t_path)?;
156 let manifest_name = current_content.trim();
157 let manifest_number = parse_manifest_number(manifest_name).unwrap_or(1);
158 let manifest_path = data_dir.join(manifest_name);
159
160 if manifest_path.exists() {
161 let state = Self::read_manifest(&manifest_path)?;
162 info!(
163 "Loaded MANIFEST-{:06} with version {}",
164 manifest_number, state.version
165 );
166 (manifest_number, state)
167 } else {
168 warn!("MANIFEST file {} not found, starting fresh", manifest_name);
169 (manifest_number + 1, LsmState::new())
170 }
171 } else {
172 debug!("No CURRENT file found, creating new MANIFEST");
174 (1, LsmState::new())
175 };
176
177 let mut manifest = Self {
178 data_dir,
179 manifest_number,
180 writer: None,
181 state,
182 };
183
184 manifest.open_writer()?;
186
187 Ok(manifest)
188 }
189
190 fn read_manifest(path: &Path) -> Result<LsmState> {
192 let file = File::open(path)?;
193 let reader = BufReader::new(file);
194 let mut state = LsmState::new();
195
196 for line in reader.lines() {
197 let line = line?;
198 if line.is_empty() {
199 continue;
200 }
201
202 match serde_json::from_str::<VersionEdit>(&line) {
203 Ok(edit) => state.apply(&edit),
204 Err(e) => {
205 warn!("Failed to parse MANIFEST line: {}", e);
206 }
208 }
209 }
210
211 Ok(state)
212 }
213
214 fn open_writer(&mut self) -> Result<()> {
216 let manifest_name = format!("MANIFEST-{:06}", self.manifest_number);
217 let manifest_path = self.data_dir.join(&manifest_name);
218
219 let file = OpenOptions::new()
220 .create(true)
221 .append(true)
222 .open(&manifest_path)?;
223
224 self.writer = Some(BufWriter::new(file));
225
226 let current_path = self.data_dir.join(CURRENT_FILENAME);
228 let temp_path = self.data_dir.join("CURRENT.tmp");
229 fs::write(&temp_path, &manifest_name)?;
230 fs::rename(&temp_path, ¤t_path)?;
231
232 Ok(())
233 }
234
235 pub fn log_edit(&mut self, edit: &VersionEdit) -> Result<()> {
237 self.state.apply(edit);
239
240 if let Some(ref mut writer) = self.writer {
242 let line = serde_json::to_string(edit)
243 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
244 writeln!(writer, "{}", line)?;
245 writer.flush()?;
246 debug!("Logged version edit: version={}", edit.version);
247 }
248
249 Ok(())
250 }
251
252 pub fn add_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
254 let version = self.state.version + 1;
255 let edit = VersionEdit {
256 version,
257 added_files: files,
258 removed_files: vec![],
259 log_number: None,
260 next_file_id: None,
261 };
262 self.log_edit(&edit)?;
263 Ok(version)
264 }
265
266 pub fn remove_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
268 let version = self.state.version + 1;
269 let edit = VersionEdit {
270 version,
271 added_files: vec![],
272 removed_files: files,
273 log_number: None,
274 next_file_id: None,
275 };
276 self.log_edit(&edit)?;
277 Ok(version)
278 }
279
280 pub fn log_compaction(
282 &mut self,
283 added: Vec<FileMetadata>,
284 removed: Vec<FileMetadata>,
285 ) -> Result<u64> {
286 let version = self.state.version + 1;
287 let edit = VersionEdit {
288 version,
289 added_files: added,
290 removed_files: removed,
291 log_number: None,
292 next_file_id: None,
293 };
294 self.log_edit(&edit)?;
295 Ok(version)
296 }
297
298 pub fn set_next_file_id(&mut self, next_id: u64) -> Result<()> {
300 let version = self.state.version + 1;
301 let edit = VersionEdit {
302 version,
303 added_files: vec![],
304 removed_files: vec![],
305 log_number: None,
306 next_file_id: Some(next_id),
307 };
308 self.log_edit(&edit)
309 }
310
311 pub fn state(&self) -> &LsmState {
313 &self.state
314 }
315
316 pub fn version(&self) -> u64 {
318 self.state.version
319 }
320
321 pub fn active_files(&self, level: usize) -> Option<&HashSet<u64>> {
323 self.state.active_files.get(level)
324 }
325
326 pub fn sync(&mut self) -> Result<()> {
328 if let Some(ref mut writer) = self.writer {
329 writer.flush()?;
330 writer.get_ref().sync_all()?;
331 }
332 Ok(())
333 }
334
335 pub fn rotate(&mut self) -> Result<()> {
337 self.sync()?;
339
340 self.manifest_number += 1;
342 self.open_writer()?;
343
344 let snapshot = VersionEdit {
346 version: self.state.version,
347 added_files: self.collect_all_files(),
348 removed_files: vec![],
349 log_number: Some(self.state.log_number),
350 next_file_id: Some(self.state.next_file_id),
351 };
352 self.log_edit(&snapshot)?;
353
354 Ok(())
355 }
356
357 fn collect_all_files(&self) -> Vec<FileMetadata> {
359 let mut files = Vec::new();
360 for (level, file_ids) in self.state.active_files.iter().enumerate() {
361 for &file_id in file_ids {
362 files.push(FileMetadata {
363 level,
364 file_id,
365 file_size: 0, smallest_key: None,
367 largest_key: None,
368 });
369 }
370 }
371 files
372 }
373}
374
/// Extracts `N` from a manifest file name of the form `MANIFEST-N`.
///
/// Returns `None` when the `MANIFEST-` prefix is missing or the remainder does not
/// parse as a `u64`.
fn parse_manifest_number(name: &str) -> Option<u64> {
    let suffix = name.strip_prefix("MANIFEST-")?;
    suffix.parse::<u64>().ok()
}
379
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `FileMetadata` without a key range.
    fn meta(level: usize, file_id: u64, file_size: u64) -> FileMetadata {
        FileMetadata {
            level,
            file_id,
            file_size,
            smallest_key: None,
            largest_key: None,
        }
    }

    #[test]
    fn test_manifest_basic() {
        let temp_dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(temp_dir.path()).unwrap();

        manifest
            .add_files(vec![meta(0, 1, 1000), meta(0, 2, 2000)])
            .unwrap();

        assert_eq!(manifest.version(), 1);
        assert!(manifest.state.is_file_active(0, 1));
        assert!(manifest.state.is_file_active(0, 2));
        assert!(!manifest.state.is_file_active(0, 3));
    }

    #[test]
    fn test_manifest_recovery() {
        let temp_dir = TempDir::new().unwrap();

        {
            let mut manifest = Manifest::open(temp_dir.path()).unwrap();
            manifest.add_files(vec![meta(0, 1, 1000)]).unwrap();
            manifest.sync().unwrap();
        }

        let manifest = Manifest::open(temp_dir.path()).unwrap();
        assert!(manifest.state.is_file_active(0, 1));
    }

    #[test]
    fn test_compaction_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(temp_dir.path()).unwrap();

        manifest
            .add_files(vec![meta(0, 1, 1000), meta(0, 2, 1000)])
            .unwrap();
        manifest
            .log_compaction(
                vec![meta(1, 3, 2000)],
                vec![meta(0, 1, 1000), meta(0, 2, 1000)],
            )
            .unwrap();

        assert!(!manifest.state.is_file_active(0, 1));
        assert!(!manifest.state.is_file_active(0, 2));
        assert!(manifest.state.is_file_active(1, 3));
    }

    #[test]
    fn test_rotate_preserves_state() {
        let temp_dir = TempDir::new().unwrap();

        {
            let mut manifest = Manifest::open(temp_dir.path()).unwrap();
            manifest.add_files(vec![meta(0, 1, 1000)]).unwrap(); // version 1
            manifest.set_next_file_id(42).unwrap(); // version 2
            manifest.rotate().unwrap(); // snapshot keeps version 2
            manifest.add_files(vec![meta(1, 2, 2000)]).unwrap(); // version 3
            manifest.sync().unwrap();
        }

        let manifest = Manifest::open(temp_dir.path()).unwrap();
        assert_eq!(manifest.version(), 3);
        assert!(manifest.state.is_file_active(0, 1));
        assert!(manifest.state.is_file_active(1, 2));
        assert_eq!(manifest.state.next_file_id, 42);
    }

    #[test]
    fn test_missing_manifest_starts_fresh() {
        let temp_dir = TempDir::new().unwrap();
        let current_path = temp_dir.path().join(CURRENT_FILENAME);
        fs::write(&current_path, "MANIFEST-000003\n").unwrap();

        let manifest = Manifest::open(temp_dir.path()).unwrap();

        assert_eq!(manifest.version(), 0);
        assert_eq!(fs::read_to_string(&current_path).unwrap(), "MANIFEST-000004");
    }
}