#![allow(dead_code)]
use std::collections::{BTreeMap, HashSet};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use crate::storage::segment::SegmentMetadata;
use crate::storage::{Manifest, Result, StorageError};
/// Manifest format version: stamped into every freshly created state and
/// required to match exactly when an existing manifest file is opened.
const MANIFEST_VERSION: u32 = 1;
/// Tier label under which the manifest groups its segments.
///
/// Serialized by serde using the lowercase variant name (`"hot"`, `"warm"`,
/// `"cold"`). The derived ordering follows declaration order
/// (`Hot < Warm < Cold`), which is therefore also the key order of the
/// `BTreeMap` that holds the per-tier segment lists.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(rename_all = "lowercase")]
pub enum SegmentTier {
// NOTE(review): the variant names suggest access temperature (most to least
// frequently used) — confirm against the tiering policy before relying on it.
Hot,
Warm,
Cold,
}
impl SegmentTier {
pub fn all() -> [SegmentTier; 3] {
[SegmentTier::Hot, SegmentTier::Warm, SegmentTier::Cold]
}
}
/// Record of a single compaction, kept in the manifest's compaction history.
///
/// The manifest stores this value exactly as given; nothing in this file
/// validates or interprets its fields, so the per-field notes below are
/// assumptions drawn from the field names and should be confirmed against the
/// code that produces these records.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompactionMetadata {
// Presumably a unique identifier of the compaction job — TODO confirm.
pub job_id: String,
// Presumably the ids of the segments that were read as compaction input.
pub inputs: Vec<String>,
// Presumably the id of the segment produced by the compaction.
pub output_id: String,
// Presumably a content digest of the output segment; algorithm not visible here.
pub output_digest: String,
// Tier associated with the compaction output.
pub target_tier: SegmentTier,
// NOTE(review): `_ms` suggests milliseconds (likely since an epoch) — confirm.
pub started_at_ms: u128,
// NOTE(review): same unit assumption as `started_at_ms` — confirm.
pub completed_at_ms: u128,
// Presumably the number of bytes written while producing the output.
pub bytes_rewritten: u64,
// Presumably the ids of the segments retired by this compaction. Note that
// `record_compaction` removes segments based on its own `retired` argument,
// not on this field.
pub retired: Vec<String>,
}
/// Serializable contents of the manifest: this is the value that is written
/// to and read from the manifest file as JSON.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ManifestState {
// Format version; compared against `MANIFEST_VERSION` when a file is opened.
version: u32,
// Segment metadata grouped by tier, in registration order within each tier.
tiers: BTreeMap<SegmentTier, Vec<SegmentMetadata>>,
// Compaction records in the order they were recorded.
compactions: Vec<CompactionMetadata>,
}
impl ManifestState {
fn new() -> Self {
let mut tiers = BTreeMap::new();
for tier in SegmentTier::all() {
tiers.insert(tier, Vec::new());
}
Self {
version: MANIFEST_VERSION,
tiers,
compactions: Vec::new(),
}
}
}
/// Manifest whose state is kept in memory and stored as a JSON file on disk.
pub struct FileManifest {
// Location of the manifest file: read by `open`, overwritten by `persist`.
path: PathBuf,
// In-memory manifest contents; changes reach disk only via `persist`.
state: ManifestState,
}
impl FileManifest {
    /// Opens the manifest stored at `path`, creating it when it does not exist.
    ///
    /// * Existing file: the JSON content is deserialized and its version must
    ///   equal `MANIFEST_VERSION`.
    /// * Missing file: the parent directory is created if needed, an empty
    ///   state is built and immediately persisted to `path`.
    ///
    /// # Errors
    ///
    /// Propagates I/O and JSON deserialization failures, and returns
    /// `StorageError::InvalidFormat` when the stored version differs from
    /// `MANIFEST_VERSION`.
    pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self> {
        let path = path.into();

        if path.exists() {
            // Load path: parse the existing file and gate on the version.
            let source = BufReader::new(File::open(&path)?);
            let state: ManifestState = serde_json::from_reader(source)?;
            return if state.version == MANIFEST_VERSION {
                Ok(Self { path, state })
            } else {
                Err(StorageError::InvalidFormat(
                    "unsupported manifest version".into(),
                ))
            };
        }

        // Create path: make sure the directory exists, then write a fresh manifest.
        if let Some(dir) = path.parent() {
            fs::create_dir_all(dir)?;
        }
        let fresh = Self {
            path,
            state: ManifestState::new(),
        };
        fresh.persist()?;
        Ok(fresh)
    }

    /// Returns the segments registered under `tier`, in registration order.
    ///
    /// Yields an empty slice when the tier has no entry in the map (possible
    /// for a manifest file that was loaded without that key).
    pub fn segments_in(&self, tier: &SegmentTier) -> &[SegmentMetadata] {
        match self.state.tiers.get(tier) {
            Some(segments) => segments.as_slice(),
            None => &[],
        }
    }

    /// Returns all recorded compactions, oldest first.
    pub fn compaction_history(&self) -> &[CompactionMetadata] {
        self.state.compactions.as_slice()
    }

    /// Drops, from every tier, each segment whose id is contained in `ids`.
    fn remove_segments(&mut self, ids: &HashSet<String>) {
        self.state
            .tiers
            .values_mut()
            .for_each(|segments| segments.retain(|candidate| !ids.contains(&candidate.id)));
    }
}
impl Manifest for FileManifest {
    /// Appends `metadata` to the segment list of `tier`.
    ///
    /// The tier's list is created on demand if the map has no entry for it.
    /// Only the in-memory state changes; call `persist` to write it to disk.
    /// No duplicate-id check is performed.
    fn register_segment(&mut self, tier: SegmentTier, metadata: SegmentMetadata) -> Result<()> {
        self.state.tiers.entry(tier).or_default().push(metadata);
        Ok(())
    }

    /// Applies the outcome of a compaction to the in-memory state.
    ///
    /// In order: every segment whose id appears in `retired` is removed from
    /// all tiers, `output` is registered under `output_tier`, and `metadata`
    /// is appended to the compaction history. Nothing is written to disk;
    /// call `persist` afterwards.
    fn record_compaction(
        &mut self,
        output_tier: SegmentTier,
        output: SegmentMetadata,
        retired: Vec<String>,
        metadata: CompactionMetadata,
    ) -> Result<()> {
        // A set makes the per-segment membership test O(1) during `retain`.
        let retired_set: HashSet<String> = retired.into_iter().collect();
        self.remove_segments(&retired_set);
        // `SegmentTier` is `Copy`; no clone is needed to pass it on.
        self.register_segment(output_tier, output)?;
        self.state.compactions.push(metadata);
        Ok(())
    }

    /// Writes the current state to the manifest path as pretty-printed JSON.
    ///
    /// The parent directory is created if it does not exist and any existing
    /// file is replaced.
    ///
    /// # Errors
    ///
    /// Propagates directory-creation, file-creation, serialization and write
    /// failures, including a failure to flush the final buffered bytes.
    fn persist(&self) -> Result<()> {
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent)?;
        }
        // NOTE(review): `File::create` truncates the existing file, so a crash
        // in the middle of the write can leave a partial manifest behind.
        let mut writer = BufWriter::new(File::create(&self.path)?);
        serde_json::to_writer_pretty(&mut writer, &self.state)?;
        // Flush explicitly: dropping a `BufWriter` ignores write errors, which
        // would let a failed final write be reported as success.
        std::io::Write::flush(&mut writer)?;
        Ok(())
    }
}
impl FileManifest {
    /// Returns the filesystem location this manifest is read from and
    /// persisted to.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}