use std::fs;
use std::sync::Arc;
use std::time::Duration;
use zerocopy::FromBytes;
use crate::Key;
use crate::disk_loc::DiskLoc;
use crate::entry::{EntryHeader, entry_size};
use crate::error::DbResult;
use crate::hint;
use crate::io::direct;
use crate::shard::{ImmutableFile, Shard};
type ReadFn = dyn Fn(&std::fs::File, u64, usize) -> DbResult<Vec<u8>>;
#[cfg(feature = "encryption")]
use crate::crypto::PageCipher;
#[cfg(feature = "encryption")]
use crate::io::tags::{self, TagFile};
/// Index operations that compaction relies on to relocate entries.
///
/// Implemented by whatever owns the key -> [`DiskLoc`] mapping of a shard.
pub trait CompactionIndex<K: Key>: Send + Sync {
    /// Attempts to repoint `key` from `old_loc` to `new_loc`.
    ///
    /// Presumably succeeds only while the index still maps `key` to `old_loc`
    /// (confirm against implementors). Compaction counts a `true` result as a
    /// retained entry and a `false` result as a dead copy in the new file.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool;

    /// Reports whether `key` currently has an entry in the index.
    ///
    /// Compaction asks this for tombstones: a tombstone whose key is present is
    /// accounted as dead bytes.
    fn contains_key(&self, key: &K) -> bool;

    /// Called once per retired source file after a compaction run, with that file's
    /// shard id, file id and total byte length.
    ///
    /// The default does nothing; presumably implementors drop cached blocks of the
    /// file here.
    fn invalidate_blocks(&self, _shard_id: u8, _file_id: u32, _total_bytes: u64) {
        // Intentionally empty.
    }
}
/// Supplies the replication floor that limits which files guarded compaction may rewrite.
#[cfg(feature = "replication")]
pub trait CompactionGuard: Send + Sync {
    /// GSN floor for `shard_id`: only files whose maximum GSN is strictly below this
    /// value are rewritten. Returning `u64::MAX` disables the check.
    fn min_replicated_gsn(&self, shard_id: u8) -> u64;
}

/// Guard for setups without replicas; it never holds compaction back.
#[cfg(feature = "replication")]
pub struct NoReplicationGuard;

#[cfg(feature = "replication")]
impl CompactionGuard for NoReplicationGuard {
    /// Always `u64::MAX`, so the GSN-based file filter is skipped entirely.
    fn min_replicated_gsn(&self, _shard_id: u8) -> u64 {
        u64::MAX
    }
}
/// Compacts `shard` with no replication gating.
///
/// Immutable files whose dead-byte ratio exceeds `threshold` are candidates; the
/// lowest-numbered few of them are merged into one new file per call. Returns the
/// number of entries retained in the rewritten file, or `0` when nothing qualified.
///
/// # Errors
/// Propagates I/O failures from reading, writing or renaming shard files.
pub fn compact_shard<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
) -> DbResult<usize> {
    // `u64::MAX` tells the inner routine there is no replication floor to respect.
    let no_replication_floor = u64::MAX;
    compact_shard_inner::<K, I>(shard, index, threshold, no_replication_floor)
}
#[cfg(feature = "replication")]
/// Compacts `shard` while honouring `guard`'s replication floor.
///
/// Behaves like `compact_shard`, except that only files whose hint-derived maximum
/// GSN is below `guard.min_replicated_gsn(shard.id)` are eligible.
///
/// # Errors
/// Propagates I/O failures from reading, writing or renaming shard files.
pub fn compact_shard_guarded<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    guard: &dyn CompactionGuard,
) -> DbResult<usize> {
    compact_shard_inner::<K, I>(shard, index, threshold, guard.min_replicated_gsn(shard.id))
}
fn compact_shard_inner<K: Key, I: CompactionIndex<K>>(
shard: &Shard,
index: &I,
threshold: f64,
min_replicated_gsn: u64,
) -> DbResult<usize> {
let mut files_to_compact = Vec::new();
#[cfg(feature = "encryption")]
let cipher_opt: Option<Arc<PageCipher>>;
{
let inner = shard.lock();
#[cfg(feature = "encryption")]
{
cipher_opt = inner.cipher.clone();
}
for file in &inner.immutable {
let total = file.total_bytes;
let dead = inner.dead_bytes.get(&file.file_id).copied().unwrap_or(0);
if total > 0 && (dead as f64 / total as f64) > threshold {
files_to_compact.push(file.clone());
}
}
}
if files_to_compact.is_empty() {
return Ok(0);
}
if min_replicated_gsn < u64::MAX {
files_to_compact.retain(|file| {
let hint_path = shard.dir().join(format!("{:06}.hint", file.file_id));
match file_max_gsn(&hint_path, file.file_id, size_of::<K>()) {
Some(max_gsn) => max_gsn < min_replicated_gsn,
None => false, }
});
if files_to_compact.is_empty() {
return Ok(0);
}
}
files_to_compact.sort_by_key(|f| f.file_id);
files_to_compact.truncate(4);
let compact_start = std::time::Instant::now();
let new_file_id = {
let mut inner = shard.lock();
let id = inner.next_file_id;
inner.next_file_id += 1;
id
};
let old_file_ids: Vec<u32> = files_to_compact.iter().map(|f| f.file_id).collect();
let tmp_path = shard.dir().join(format!("{new_file_id:06}.data.tmp"));
let tmp_file = direct::open_write(&tmp_path)?;
let mut write_offset: u64 = 0;
const BATCH_SIZE: usize = 256;
struct BatchEntry<K> {
key: K,
gsn: u64,
old_loc: DiskLoc,
new_loc: DiskLoc,
is_tombstone: bool,
}
let mut batch: Vec<BatchEntry<K>> = Vec::with_capacity(BATCH_SIZE);
#[cfg(feature = "encryption")]
let mut plaintext_buf: Option<Vec<u8>> = if cipher_opt.is_some() {
Some(Vec::new())
} else {
None
};
for old_arc in &files_to_compact {
let file = &old_arc.file;
let file_len = old_arc.total_bytes;
let mut offset: u64 = 0;
#[cfg(feature = "encryption")]
let read_fn: Box<ReadFn> =
if let (Some(cipher), Some(_tag_file)) = (&cipher_opt, &old_arc.tag_file) {
let c = cipher.clone();
let fid = old_arc.file_id;
let tp = tags::tags_path_for_data(&old_arc.path);
let tf = Arc::new(TagFile::open_read(&tp)?);
Box::new(move |f, o, l| direct::pread_value_encrypted(f, &tf, &c, fid, o, l))
} else {
Box::new(direct::pread_value)
};
#[cfg(not(feature = "encryption"))]
let read_fn: Box<ReadFn> = Box::new(direct::pread_value);
while offset + size_of::<EntryHeader>() as u64 <= file_len {
let header_bytes = match read_fn(file, offset, size_of::<EntryHeader>()) {
Ok(b) => b,
Err(_) => break,
};
let header = match EntryHeader::read_from_bytes(&header_bytes) {
Ok(h) => h,
Err(_) => break,
};
let total = entry_size(size_of::<K>(), header.value_len);
if offset + total > file_len {
break;
}
let old_loc = DiskLoc::new(
shard.id,
old_arc.file_id as u16,
(offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
header.value_len,
);
let entry_bytes = read_fn(file, offset, total as usize)?;
let key_bytes = &entry_bytes[16..16 + size_of::<K>()];
let key: K = K::from_bytes(key_bytes);
#[cfg(feature = "encryption")]
if let Some(ref mut buf) = plaintext_buf {
if buf.len() < (write_offset + total) as usize {
buf.resize((write_offset + total) as usize, 0);
}
buf[write_offset as usize..(write_offset + total) as usize]
.copy_from_slice(&entry_bytes);
} else {
direct::pwrite_at(&tmp_file, &entry_bytes, write_offset)?;
}
#[cfg(not(feature = "encryption"))]
direct::pwrite_at(&tmp_file, &entry_bytes, write_offset)?;
let new_loc = DiskLoc::new(
shard.id,
new_file_id as u16,
(write_offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
header.value_len,
);
batch.push(BatchEntry {
key,
gsn: header.gsn,
old_loc,
new_loc,
is_tombstone: header.is_tombstone(),
});
write_offset += total;
offset += total;
}
}
#[cfg(feature = "encryption")]
let tmp_tags_path = if let (Some(cipher), Some(mut buf)) = (&cipher_opt, plaintext_buf.take()) {
let padded_len = (buf.len() + 4095) & !4095;
buf.resize(padded_len, 0);
let num_pages = padded_len / 4096;
let mut tag_list = Vec::with_capacity(num_pages);
for i in 0..num_pages {
let page = &mut buf[i * 4096..(i + 1) * 4096];
let tag = cipher.encrypt_page(new_file_id, i as u64, page)?;
tag_list.push(tag);
}
direct::pwrite_at(&tmp_file, &buf, 0)?;
let tp = shard.dir().join(format!("{new_file_id:06}.tags.tmp"));
let tf = TagFile::open_write(&tp)?;
tf.write_tags(0, &tag_list)?;
tf.sync()?;
Some(tp)
} else {
None
};
direct::fsync(&tmp_file)?;
let final_data_path = shard.dir().join(format!("{new_file_id:06}.data"));
{
let mut inner = shard.lock();
fs::rename(&tmp_path, &final_data_path)?;
#[cfg(feature = "encryption")]
let final_tag_file = if let Some(ref tp) = tmp_tags_path {
let final_tags_path = shard.dir().join(format!("{new_file_id:06}.tags"));
fs::rename(tp, &final_tags_path)?;
Some(TagFile::open_read(&final_tags_path)?)
} else {
None
};
let final_file = direct::open_read(&final_data_path)?;
inner.immutable.push(Arc::new(ImmutableFile {
file: final_file,
file_id: new_file_id,
#[cfg(feature = "encryption")]
path: final_data_path,
total_bytes: write_offset,
#[cfg(feature = "encryption")]
tag_file: final_tag_file,
}));
inner.immutable.sort_by_key(|f| f.file_id);
}
let mut compacted_entries = 0;
let mut live_hint_data: Vec<u8> = Vec::new();
let key_len = size_of::<K>();
for chunk in batch.chunks(BATCH_SIZE) {
let mut inner = shard.lock();
for entry in chunk {
if entry.is_tombstone {
if index.contains_key(&entry.key) {
inner.add_dead_bytes(
entry.new_loc.file_id as u32,
entry_size(size_of::<K>(), entry.new_loc.len),
);
} else {
compacted_entries += 1;
append_hint_entry(
&mut live_hint_data,
entry.gsn,
&entry.key,
entry.new_loc.offset as u64,
entry.new_loc.len,
key_len,
);
}
} else {
if index.update_if_match(&entry.key, entry.old_loc, entry.new_loc) {
compacted_entries += 1;
append_hint_entry(
&mut live_hint_data,
entry.gsn,
&entry.key,
entry.new_loc.offset as u64,
entry.new_loc.len,
key_len,
);
} else {
inner.add_dead_bytes(
entry.new_loc.file_id as u32,
entry_size(size_of::<K>(), entry.new_loc.len),
);
}
}
}
}
{
let mut inner = shard.lock();
inner
.immutable
.retain(|f| !old_file_ids.contains(&f.file_id));
for fid in &old_file_ids {
inner.dead_bytes.remove(fid);
}
}
let hint_data = live_hint_data;
let tmp_hint_path = shard.dir().join(format!("{new_file_id:06}.hint.tmp"));
hint::write_hint_file(&tmp_hint_path, &hint_data)?;
let final_hint_path = shard.dir().join(format!("{new_file_id:06}.hint"));
fs::rename(&tmp_hint_path, &final_hint_path)?;
for old_arc in &files_to_compact {
index.invalidate_blocks(shard.id, old_arc.file_id, old_arc.total_bytes);
}
for fid in &old_file_ids {
let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.data")));
let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.hint")));
#[cfg(feature = "encryption")]
let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.tags")));
}
let elapsed = compact_start.elapsed().as_secs_f64();
metrics::counter!("armdb.compaction.runs").increment(1);
metrics::counter!("armdb.compaction.entries").increment(compacted_entries as u64);
metrics::histogram!("armdb.compaction.duration_seconds").record(elapsed);
tracing::info!(
entries = compacted_entries,
files = old_file_ids.len(),
elapsed_ms = (elapsed * 1000.0) as u64,
"compaction complete"
);
Ok(compacted_entries)
}
/// Background worker that runs a compaction callback on a fixed interval until it
/// is stopped, either explicitly through [`Compactor::stop`] or on drop.
pub struct Compactor {
    /// Shutdown signal shared with the worker thread.
    stop: crate::shutdown::ShutdownSignal,
    /// Worker handle; `None` once the thread has been joined.
    handle: Option<std::thread::JoinHandle<()>>,
}

impl Compactor {
    /// Spawns a compactor driven by a fresh shutdown signal of its own.
    pub fn start(
        compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
        interval: Duration,
    ) -> Self {
        let own_signal = crate::shutdown::ShutdownSignal::new();
        Self::start_with_signal(compact_fn, interval, own_signal)
    }

    /// Spawns a compactor that exits when `signal` is shut down.
    ///
    /// Each cycle waits up to `interval` on the signal, then calls `compact_fn`. Errors
    /// are logged and the loop keeps running; non-zero results are logged at info level.
    pub fn start_with_signal(
        compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
        interval: Duration,
        signal: crate::shutdown::ShutdownSignal,
    ) -> Self {
        let worker_signal = signal.clone();
        let worker = move || loop {
            if worker_signal.is_shutdown() {
                break;
            }
            // A `true` result ends the loop (presumably: shutdown requested while waiting).
            if worker_signal.wait_timeout(interval) {
                break;
            }
            match compact_fn() {
                Err(e) => tracing::error!(error = %e, "compaction error"),
                Ok(0) => {}
                Ok(n) => tracing::info!(entries = n, "compaction cycle"),
            }
        };
        Self {
            stop: signal,
            handle: Some(std::thread::spawn(worker)),
        }
    }

    /// Signals shutdown and waits for the worker thread to finish.
    ///
    /// Safe to call repeatedly; only the first call joins the thread.
    pub fn stop(&mut self) {
        self.stop.shutdown();
        let Some(worker) = self.handle.take() else {
            return;
        };
        // A panicked worker surfaces as `Err` here and is deliberately ignored.
        let _ = worker.join();
    }
}

impl Drop for Compactor {
    /// Stops and joins the worker so it never outlives the `Compactor`.
    fn drop(&mut self) {
        self.stop();
    }
}
/// Appends one hint record to `buf`.
///
/// The record layout is: `gsn: u64`, then the raw key bytes, then
/// `value_offset: u64`, then `value_len: u32`. All integers are in native byte order.
/// `_key_len` is unused; the key's `as_bytes()` determines how many key bytes are written.
fn append_hint_entry<K: Key>(
    buf: &mut Vec<u8>,
    gsn: u64,
    key: &K,
    value_offset: u64,
    value_len: u32,
    _key_len: usize,
) {
    let gsn_field = gsn.to_ne_bytes();
    let offset_field = value_offset.to_ne_bytes();
    let len_field = value_len.to_ne_bytes();
    let fields: [&[u8]; 4] = [&gsn_field, key.as_bytes(), &offset_field, &len_field];
    for field in fields {
        buf.extend_from_slice(field);
    }
}
/// Returns the largest GSN recorded in the hint file at `hint_path`, with the
/// tombstone flag masked off.
///
/// Returns `None` in any of these cases:
/// - the hint cannot be read or does not exist;
/// - the hint is empty;
/// - its length is not a whole number of `hint::hint_entry_size(key_len)` records.
///
/// Guarded compaction treats `None` as "unknown" and leaves the file alone.
///
/// Every record is inspected rather than trusting the last one. A compacted file
/// concatenates entries from several source files in file-id order, and a compacted
/// file gets a fresh, high id while holding older GSNs. So the final record is not
/// guaranteed to carry the maximum GSN, and underestimating it could let guarded
/// compaction rewrite unreplicated data.
///
/// `_file_id` is currently unused.
fn file_max_gsn(hint_path: &std::path::Path, _file_id: u32, key_len: usize) -> Option<u64> {
    let data = hint::read_hint_file(hint_path).ok()??;
    let entry_size = hint::hint_entry_size(key_len);
    // Each record starts with an 8-byte GSN; anything shorter cannot be parsed.
    if entry_size < 8 || data.is_empty() || data.len() % entry_size != 0 {
        return None;
    }
    data.chunks_exact(entry_size)
        .map(|record| {
            let mut gsn_bytes = [0u8; 8];
            gsn_bytes.copy_from_slice(&record[..8]);
            u64::from_ne_bytes(gsn_bytes) & !crate::entry::TOMBSTONE_BIT
        })
        .max()
}