pub mod stcs;
use std::sync::Arc;
use crate::engine::RangeTombstone;
pub use crate::engine::utils::MergeIterator;
use crate::engine::utils::Record;
use crate::sstable::{self, PointEntry, SSTable, SSTableError};
use crate::engine::{EngineConfig, SSTABLE_DIR};
use crate::manifest::{Manifest, ManifestError, ManifestSstEntry};
use tracing::{debug, info};
/// A compaction pass over a set of SSTables.
///
/// Implementations are handed the SSTables to consider together with mutable access to the
/// manifest, the engine's data directory and its configuration.
pub trait CompactionStrategy {
/// Runs this compaction strategy.
///
/// Returns `Ok(Some(result))` describing which SSTables were replaced and which (if any) new
/// SSTable was written. `Ok(None)` presumably means "nothing to compact"; confirm against the
/// implementations in the `stcs` module.
///
/// # Errors
/// Returns [`CompactionError`] when an SSTable, manifest or I/O operation fails.
fn compact(
&self,
sstables: &[Arc<SSTable>],
manifest: &mut Manifest,
data_dir: &str,
config: &EngineConfig,
) -> Result<Option<CompactionResult>, CompactionError>;
}
/// Selects a family of compaction strategies.
///
/// Each variant is mapped to concrete [`CompactionStrategy`] objects by the `minor`,
/// `tombstone` and `major` constructors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategyType {
/// Strategies implemented in the `stcs` module (presumably size-tiered compaction; confirm).
Stcs,
}
impl CompactionStrategyType {
pub fn minor(&self) -> Box<dyn CompactionStrategy> {
match self {
Self::Stcs => Box::new(stcs::MinorCompaction),
}
}
pub fn tombstone(&self) -> Box<dyn CompactionStrategy> {
match self {
Self::Stcs => Box::new(stcs::TombstoneCompaction),
}
}
pub fn major(&self) -> Box<dyn CompactionStrategy> {
match self {
Self::Stcs => Box::new(stcs::MajorCompaction),
}
}
}
/// Outcome of a completed compaction.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so callers can log results and compare them in
/// tests; every field is a plain std type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionResult {
    /// Ids of the input SSTables that this compaction replaced.
    pub removed_ids: Vec<u64>,
    /// Path of the newly written SSTable, or `None` when no output SSTable was produced
    /// (e.g. `finalize_compaction` when every entry was eliminated).
    pub new_sst_path: Option<String>,
    /// Id allocated for the newly written SSTable, or `None` when no output SSTable was produced.
    pub new_sst_id: Option<u64>,
}
/// Collapses a merged record stream into the entries to write to a compacted SSTable.
///
/// Point records (`Put` / `Delete`) are de-duplicated by key. Only the FIRST record seen for a
/// key is kept, and any immediately following records with the same key are dropped. This is
/// only correct if the input yields all records for a key contiguously, with the winning
/// version first. That is presumably the ordering [`MergeIterator`] provides; confirm against
/// its contract.
///
/// A winning `Delete` is kept as a `PointEntry` with `value: None`; it is not discarded here.
///
/// Every `RangeDelete` is collected unchanged into the second vector. Range deletes do not
/// participate in key de-duplication, and point entries they cover are still emitted.
///
/// Returns `(point_entries, range_tombstones)`, each in input order.
pub fn dedup_records(
    merge_iter: impl Iterator<Item = Record>,
) -> (Vec<PointEntry>, Vec<RangeTombstone>) {
    let mut point_entries: Vec<PointEntry> = Vec::new();
    let mut range_tombstones = Vec::new();
    for record in merge_iter {
        // Normalise Put/Delete into one shape; range deletes are stored as-is and skipped.
        let (key, value, lsn, timestamp) = match record {
            Record::RangeDelete {
                start,
                end,
                lsn,
                timestamp,
            } => {
                range_tombstones.push(RangeTombstone {
                    start,
                    end,
                    lsn,
                    timestamp,
                });
                continue;
            }
            Record::Put {
                key,
                value,
                lsn,
                timestamp,
            } => (key, Some(value), lsn, timestamp),
            Record::Delete {
                key,
                lsn,
                timestamp,
            } => (key, None, lsn, timestamp),
        };
        // The most recently emitted point entry always carries the last accepted key, so
        // compare against it instead of keeping a cloned copy of every key.
        if point_entries.last().map(|prev| &prev.key) == Some(&key) {
            continue;
        }
        point_entries.push(PointEntry {
            key,
            value,
            lsn,
            timestamp,
        });
    }
    (point_entries, range_tombstones)
}
pub fn full_range_scan_iters<'a>(
sstables: &'a [&'a SSTable],
) -> Result<Vec<Box<dyn Iterator<Item = Record> + 'a>>, SSTableError> {
if sstables.is_empty() {
return Ok(Vec::new());
}
let min_key = sstables
.iter()
.map(|s| &s.properties.min_key)
.min()
.ok_or_else(|| SSTableError::Internal("empty sstables in full_range_scan".into()))?
.clone();
let mut max_key = sstables
.iter()
.map(|s| &s.properties.max_key)
.max()
.ok_or_else(|| SSTableError::Internal("empty sstables in full_range_scan".into()))?
.clone();
max_key.push(0xFF);
let mut iters: Vec<Box<dyn Iterator<Item = Record> + 'a>> = Vec::new();
for sst in sstables {
let scan = sst.scan(&min_key, &max_key)?;
iters.push(Box::new(scan));
}
Ok(iters)
}
/// Errors that can abort a compaction.
#[derive(Debug, thiserror::Error)]
pub enum CompactionError {
/// An error from the `sstable` module (e.g. opening a scan or building a new SSTable).
#[error("SSTable error: {0}")]
SSTable(#[from] SSTableError),
/// An error from the manifest (e.g. allocating an id, applying an edit, or checkpointing).
#[error("Manifest error: {0}")]
Manifest(#[from] ManifestError),
/// A raw filesystem I/O error.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
}
pub(crate) fn finalize_compaction(
manifest: &mut Manifest,
data_dir: &str,
removed_ids: Vec<u64>,
point_entries: Vec<PointEntry>,
range_tombstones: Vec<RangeTombstone>,
) -> Result<CompactionResult, CompactionError> {
use std::fs;
use std::path::PathBuf;
if point_entries.is_empty() && range_tombstones.is_empty() {
info!(
removed_count = removed_ids.len(),
?removed_ids,
"finalize: all entries eliminated, removing old SSTables"
);
manifest.apply_compaction(Vec::new(), removed_ids.clone())?;
manifest.checkpoint()?;
for id in &removed_ids {
let path = format!("{}/{}/{:06}.sst", data_dir, SSTABLE_DIR, id);
if let Err(e) = fs::remove_file(&path) {
tracing::warn!(id, %e, "failed to remove old SSTable file during compaction");
}
}
return Ok(CompactionResult {
removed_ids,
new_sst_path: None,
new_sst_id: None,
});
}
let new_sst_id = manifest.allocate_sst_id()?;
let new_sst_path = format!("{}/{}/{:06}.sst", data_dir, SSTABLE_DIR, new_sst_id);
let point_count = point_entries.len();
let range_count = range_tombstones.len();
debug!(
new_sst_id,
point_count,
range_count,
removed_count = removed_ids.len(),
path = %new_sst_path,
"finalize: building new SSTable"
);
sstable::SstWriter::new(&new_sst_path).build(
point_entries.into_iter(),
point_count,
range_tombstones.into_iter(),
range_count,
)?;
let new_entry = ManifestSstEntry {
id: new_sst_id,
path: PathBuf::from(&new_sst_path),
};
manifest.apply_compaction(vec![new_entry], removed_ids.clone())?;
manifest.checkpoint()?;
for id in &removed_ids {
let path = format!("{}/{}/{:06}.sst", data_dir, SSTABLE_DIR, id);
if let Err(e) = fs::remove_file(&path) {
tracing::warn!(id, %e, "failed to remove old SSTable file during compaction");
}
}
Ok(CompactionResult {
removed_ids,
new_sst_path: Some(new_sst_path),
new_sst_id: Some(new_sst_id),
})
}