1use serde::{Deserialize, Serialize};
32use std::collections::HashSet;
33use std::fs::{self, File, OpenOptions};
34use std::io::{BufRead, BufReader, BufWriter, Write};
35use std::path::{Path, PathBuf};
36use sochdb_core::Result;
37use tracing::{debug, info, warn};
38
/// Base name of manifest files.
///
/// Not referenced by the code visible in this file (hence the `dead_code`
/// allowance): the writer builds names with the literal format
/// `"MANIFEST-{:06}"` instead.
#[allow(dead_code)]
const MANIFEST_FILENAME: &str = "MANIFEST";

/// Name of the pointer file inside the data directory whose content is the
/// file name of the manifest currently in use.
const CURRENT_FILENAME: &str = "CURRENT";
45
/// A single change to the LSM state.
///
/// Edits are serialized with serde; `Manifest::log_edit` writes each one as a
/// JSON line and `LsmState::apply` folds it into the in-memory state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionEdit {
    /// Version the state carries once this edit has been applied.
    pub version: u64,
    /// Files that become active when the edit is applied.
    pub added_files: Vec<FileMetadata>,
    /// Files that stop being active when the edit is applied.
    pub removed_files: Vec<FileMetadata>,
    /// When `Some`, replaces the state's `log_number`; `None` leaves it as is.
    pub log_number: Option<u64>,
    /// When `Some`, replaces the state's `next_file_id`; `None` leaves it as is.
    pub next_file_id: Option<u64>,
}
60
/// Description of one on-disk file referenced by a [`VersionEdit`].
///
/// The state tracking in this file only consults `level` and `file_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct FileMetadata {
    /// Level the file belongs to; used as an index into
    /// `LsmState::active_files`.
    pub level: usize,
    /// Identifier of the file; this is what the per-level active sets store.
    pub file_id: u64,
    /// Size of the file (presumably in bytes -- TODO confirm with the writer
    /// of these records).
    pub file_size: u64,
    /// Presumably the smallest key stored in the file, if known -- not
    /// consulted by the code in this file.
    pub smallest_key: Option<String>,
    /// Presumably the largest key stored in the file, if known -- not
    /// consulted by the code in this file.
    pub largest_key: Option<String>,
}
75
76#[derive(Debug, Default)]
78pub struct LsmState {
79 pub version: u64,
81 pub active_files: Vec<HashSet<u64>>,
83 pub log_number: u64,
85 pub next_file_id: u64,
87}
88
89impl LsmState {
90 pub fn new() -> Self {
92 Self {
93 version: 0,
94 active_files: vec![HashSet::new(); 7], log_number: 0,
96 next_file_id: 1,
97 }
98 }
99
100 pub fn apply(&mut self, edit: &VersionEdit) {
102 self.version = edit.version;
103
104 for file in &edit.added_files {
106 if file.level < self.active_files.len() {
107 self.active_files[file.level].insert(file.file_id);
108 }
109 }
110
111 for file in &edit.removed_files {
113 if file.level < self.active_files.len() {
114 self.active_files[file.level].remove(&file.file_id);
115 }
116 }
117
118 if let Some(log_num) = edit.log_number {
120 self.log_number = log_num;
121 }
122 if let Some(next_id) = edit.next_file_id {
123 self.next_file_id = next_id;
124 }
125 }
126
127 pub fn is_file_active(&self, level: usize, file_id: u64) -> bool {
129 self.active_files
130 .get(level)
131 .map(|files| files.contains(&file_id))
132 .unwrap_or(false)
133 }
134}
135
/// Append-only log of [`VersionEdit`]s kept in a data directory, together
/// with the [`LsmState`] that results from applying them.
pub struct Manifest {
    /// Directory holding the `MANIFEST-*` files and the `CURRENT` pointer.
    data_dir: PathBuf,
    /// Number of the manifest file being written (the `NNNNNN` in
    /// `MANIFEST-NNNNNN`).
    manifest_number: u64,
    /// Buffered append handle on the current manifest file; `None` until a
    /// writer has been opened.
    writer: Option<BufWriter<File>>,
    /// State obtained from the replayed manifest plus every edit logged since.
    state: LsmState,
}
147
148impl Manifest {
149 pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
151 let data_dir = data_dir.as_ref().to_path_buf();
152 fs::create_dir_all(&data_dir)?;
153
154 let current_path = data_dir.join(CURRENT_FILENAME);
156 let (manifest_number, state) = if current_path.exists() {
157 let current_content = fs::read_to_string(¤t_path)?;
159 let manifest_name = current_content.trim();
160 let manifest_number = parse_manifest_number(manifest_name).unwrap_or(1);
161 let manifest_path = data_dir.join(manifest_name);
162
163 if manifest_path.exists() {
164 let state = Self::read_manifest(&manifest_path)?;
165 info!(
166 "Loaded MANIFEST-{:06} with version {}",
167 manifest_number, state.version
168 );
169 (manifest_number, state)
170 } else {
171 warn!("MANIFEST file {} not found, starting fresh", manifest_name);
172 (manifest_number + 1, LsmState::new())
173 }
174 } else {
175 debug!("No CURRENT file found, creating new MANIFEST");
177 (1, LsmState::new())
178 };
179
180 let mut manifest = Self {
181 data_dir,
182 manifest_number,
183 writer: None,
184 state,
185 };
186
187 manifest.open_writer()?;
189
190 Ok(manifest)
191 }
192
193 fn read_manifest(path: &Path) -> Result<LsmState> {
195 let file = File::open(path)?;
196 let reader = BufReader::new(file);
197 let mut state = LsmState::new();
198
199 for line in reader.lines() {
200 let line = line?;
201 if line.is_empty() {
202 continue;
203 }
204
205 match serde_json::from_str::<VersionEdit>(&line) {
206 Ok(edit) => state.apply(&edit),
207 Err(e) => {
208 warn!("Failed to parse MANIFEST line: {}", e);
209 }
211 }
212 }
213
214 Ok(state)
215 }
216
217 fn open_writer(&mut self) -> Result<()> {
219 let manifest_name = format!("MANIFEST-{:06}", self.manifest_number);
220 let manifest_path = self.data_dir.join(&manifest_name);
221
222 let file = OpenOptions::new()
223 .create(true)
224 .append(true)
225 .open(&manifest_path)?;
226
227 self.writer = Some(BufWriter::new(file));
228
229 let current_path = self.data_dir.join(CURRENT_FILENAME);
231 let temp_path = self.data_dir.join("CURRENT.tmp");
232 fs::write(&temp_path, &manifest_name)?;
233 fs::rename(&temp_path, ¤t_path)?;
234
235 Ok(())
236 }
237
238 pub fn log_edit(&mut self, edit: &VersionEdit) -> Result<()> {
240 self.state.apply(edit);
242
243 if let Some(ref mut writer) = self.writer {
245 let line = serde_json::to_string(edit)
246 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
247 writeln!(writer, "{}", line)?;
248 writer.flush()?;
249 debug!("Logged version edit: version={}", edit.version);
250 }
251
252 Ok(())
253 }
254
255 pub fn add_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
257 let version = self.state.version + 1;
258 let edit = VersionEdit {
259 version,
260 added_files: files,
261 removed_files: vec![],
262 log_number: None,
263 next_file_id: None,
264 };
265 self.log_edit(&edit)?;
266 Ok(version)
267 }
268
269 pub fn remove_files(&mut self, files: Vec<FileMetadata>) -> Result<u64> {
271 let version = self.state.version + 1;
272 let edit = VersionEdit {
273 version,
274 added_files: vec![],
275 removed_files: files,
276 log_number: None,
277 next_file_id: None,
278 };
279 self.log_edit(&edit)?;
280 Ok(version)
281 }
282
283 pub fn log_compaction(
285 &mut self,
286 added: Vec<FileMetadata>,
287 removed: Vec<FileMetadata>,
288 ) -> Result<u64> {
289 let version = self.state.version + 1;
290 let edit = VersionEdit {
291 version,
292 added_files: added,
293 removed_files: removed,
294 log_number: None,
295 next_file_id: None,
296 };
297 self.log_edit(&edit)?;
298 Ok(version)
299 }
300
301 pub fn set_next_file_id(&mut self, next_id: u64) -> Result<()> {
303 let version = self.state.version + 1;
304 let edit = VersionEdit {
305 version,
306 added_files: vec![],
307 removed_files: vec![],
308 log_number: None,
309 next_file_id: Some(next_id),
310 };
311 self.log_edit(&edit)
312 }
313
    /// Borrows the current in-memory state.
    pub fn state(&self) -> &LsmState {
        &self.state
    }
318
319 pub fn version(&self) -> u64 {
321 self.state.version
322 }
323
324 pub fn active_files(&self, level: usize) -> Option<&HashSet<u64>> {
326 self.state.active_files.get(level)
327 }
328
329 pub fn sync(&mut self) -> Result<()> {
331 if let Some(ref mut writer) = self.writer {
332 writer.flush()?;
333 writer.get_ref().sync_all()?;
334 }
335 Ok(())
336 }
337
338 pub fn rotate(&mut self) -> Result<()> {
340 self.sync()?;
342
343 self.manifest_number += 1;
345 self.open_writer()?;
346
347 let snapshot = VersionEdit {
349 version: self.state.version,
350 added_files: self.collect_all_files(),
351 removed_files: vec![],
352 log_number: Some(self.state.log_number),
353 next_file_id: Some(self.state.next_file_id),
354 };
355 self.log_edit(&snapshot)?;
356
357 Ok(())
358 }
359
360 fn collect_all_files(&self) -> Vec<FileMetadata> {
362 let mut files = Vec::new();
363 for (level, file_ids) in self.state.active_files.iter().enumerate() {
364 for &file_id in file_ids {
365 files.push(FileMetadata {
366 level,
367 file_id,
368 file_size: 0, smallest_key: None,
370 largest_key: None,
371 });
372 }
373 }
374 files
375 }
376}
377
/// Extracts the number from a manifest file name of the form
/// `MANIFEST-<number>`.
///
/// Returns `None` when the prefix is missing or the remainder does not parse
/// as a `u64`.
fn parse_manifest_number(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("MANIFEST-")?;
    digits.parse::<u64>().ok()
}
382
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `FileMetadata` without a key range.
    fn meta(level: usize, file_id: u64, file_size: u64) -> FileMetadata {
        FileMetadata {
            level,
            file_id,
            file_size,
            smallest_key: None,
            largest_key: None,
        }
    }

    /// Adding files bumps the version and marks exactly those files active.
    #[test]
    fn test_manifest_basic() {
        let dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        manifest
            .add_files(vec![meta(0, 1, 1000), meta(0, 2, 2000)])
            .unwrap();

        assert_eq!(manifest.version(), 1);
        assert!(manifest.state.is_file_active(0, 1));
        assert!(manifest.state.is_file_active(0, 2));
        assert!(!manifest.state.is_file_active(0, 3));
    }

    /// Files logged and synced by one instance are visible after reopening.
    #[test]
    fn test_manifest_recovery() {
        let dir = TempDir::new().unwrap();

        {
            let mut writer_side = Manifest::open(dir.path()).unwrap();
            writer_side.add_files(vec![meta(0, 1, 1000)]).unwrap();
            writer_side.sync().unwrap();
        }

        {
            let reopened = Manifest::open(dir.path()).unwrap();
            assert!(reopened.state.is_file_active(0, 1));
        }
    }

    /// A compaction edit swaps its inputs for its output in one step.
    #[test]
    fn test_compaction_tracking() {
        let dir = TempDir::new().unwrap();
        let mut manifest = Manifest::open(dir.path()).unwrap();

        manifest
            .add_files(vec![meta(0, 1, 1000), meta(0, 2, 1000)])
            .unwrap();

        manifest
            .log_compaction(
                vec![meta(1, 3, 2000)],
                vec![meta(0, 1, 1000), meta(0, 2, 1000)],
            )
            .unwrap();

        assert!(!manifest.state.is_file_active(0, 1));
        assert!(!manifest.state.is_file_active(0, 2));
        assert!(manifest.state.is_file_active(1, 3));
    }
}