use std::fs;
use std::sync::Arc;
use std::time::Duration;
use zerocopy::FromBytes;
use crate::Key;
use crate::disk_loc::DiskLoc;
use crate::entry::{EntryHeader, entry_size};
use crate::error::DbResult;
use crate::hint;
use crate::io::direct;
use crate::shard::{ImmutableFile, Shard};
/// Reader used on compaction source files, called as `(file, byte offset, length)`
/// and returning the bytes read; compaction picks a plain or encrypted reader per file.
type ReadFn = dyn Fn(&std::fs::File, u64, usize) -> DbResult<Vec<u8>>;
#[cfg(feature = "encryption")]
use crate::crypto::PageCipher;
#[cfg(feature = "encryption")]
use crate::io::tags::{self, TagFile};
/// Opens the directory `dir` and `sync_all`s it so that preceding renames,
/// creations and removals inside it are flushed.
///
/// # Errors
/// Returns the I/O error from opening or syncing the directory.
fn dir_fsync(dir: &std::path::Path) -> DbResult<()> {
    Ok(fs::File::open(dir)?.sync_all()?)
}
/// Index operations compaction relies on while moving entries into new files.
pub trait CompactionIndex<K: Key>: Send + Sync {
/// Asks the index to repoint `key` from `old_loc` to `new_loc`. Compaction treats
/// `true` as "the copied entry is live" and `false` as "the copy is dead".
/// Presumably implementations only update when `key` still maps to `old_loc`
/// (compare-and-set) — confirm against implementors.
fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool;
/// Called once per compacted source file, after it has been dropped from the
/// shard's file list. The default does nothing.
/// NOTE(review): likely intended for evicting cached blocks of that file.
fn invalidate_blocks(&self, _shard_id: u8, _file_id: u32, _total_bytes: u64) {}
/// Whether `key` currently has an index entry. A copied tombstone is kept
/// only when this returns `false`.
fn contains_key(&self, key: &K) -> bool;
}
/// Supplies the GSN floor that restricts which files compaction may rewrite.
#[cfg(feature = "replication")]
pub trait CompactionGuard: Send + Sync {
/// Floor for `shard_id`: only files whose hint file reports a max GSN below
/// this value are compacted; `u64::MAX` disables the check.
fn min_replicated_gsn(&self, shard_id: u8) -> u64;
}
/// A guard that never restricts compaction.
#[cfg(feature = "replication")]
pub struct NoReplicationGuard;
#[cfg(feature = "replication")]
impl CompactionGuard for NoReplicationGuard {
/// Always `u64::MAX`, which disables the GSN filter for every shard.
fn min_replicated_gsn(&self, _shard_id: u8) -> u64 {
u64::MAX
}
}
/// Returns `true` when appending `needed` bytes at `write_offset` would end
/// past `max_file_size`, i.e. the current output must be finalized first.
///
/// The end offset is computed with `checked_add`: if it would overflow `u64`
/// the append certainly does not fit, so rotation is requested. A plain `+`
/// would panic in debug builds and wrap to a small value in release builds,
/// silently suppressing the rotation.
fn should_rotate_output(write_offset: u64, needed: u64, max_file_size: u64) -> bool {
    match write_offset.checked_add(needed) {
        Some(end) => end > max_file_size,
        None => true,
    }
}
/// Compacts `shard` with no replication restriction: every file whose
/// dead-byte ratio exceeds `threshold` is a candidate.
///
/// Returns the number of entries that stayed live after the rewrite.
pub fn compact_shard<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
) -> DbResult<usize> {
    // `u64::MAX` disables the GSN floor check inside `compact_shard_inner`.
    let no_gsn_floor = u64::MAX;
    compact_shard_inner::<K, I>(shard, index, threshold, no_gsn_floor)
}
/// Compacts `shard` like `compact_shard`, but only rewrites files whose max
/// GSN (from their hint file) is below `guard.min_replicated_gsn(shard.id)`.
///
/// Returns the number of entries that stayed live after the rewrite.
#[cfg(feature = "replication")]
pub fn compact_shard_guarded<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    guard: &dyn CompactionGuard,
) -> DbResult<usize> {
    compact_shard_inner::<K, I>(shard, index, threshold, guard.min_replicated_gsn(shard.id))
}
/// A compaction output being filled: entries are appended to `{id}.data.tmp`.
struct OutputState {
/// File id allocated for this output.
file_id: u32,
/// Handle to the temp data file created by `open_output`.
tmp_file: fs::File,
/// Logical (plaintext) bytes appended so far; the next entry starts here.
write_offset: u64,
/// Page-encryption state when the shard has a cipher; `None` writes plaintext.
#[cfg(feature = "encryption")]
enc_state: Option<EncryptionState>,
}
/// Buffers plaintext into 4096-byte pages and encrypts each page as it fills.
#[cfg(feature = "encryption")]
struct EncryptionState {
/// Cipher used by `flush_page`.
cipher: Arc<PageCipher>,
/// Plaintext of the page currently being filled; zeroed after each flush.
page_buf: Box<[u8; 4096]>,
/// Number of bytes of `page_buf` already filled.
page_offset: usize,
/// Pages encrypted and written so far; page `n` is written at byte `n * 4096`.
pages_written: u64,
/// Tag returned by `encrypt_page` for each written page, in page order.
tag_list: Vec<[u8; 16]>,
}
#[cfg(feature = "encryption")]
impl EncryptionState {
    /// Encrypts the staged page in place, writes it at its page slot in `file`,
    /// records its tag and resets the staging buffer to an empty, zeroed page.
    ///
    /// # Errors
    /// Propagates encryption and write failures.
    fn flush_page(&mut self, file: &fs::File, file_id: u32) -> DbResult<()> {
        let page_index = self.pages_written;
        let tag = self
            .cipher
            .encrypt_page(file_id, page_index, &mut *self.page_buf)?;
        direct::pwrite_at(file, &*self.page_buf, page_index * 4096)?;
        self.tag_list.push(tag);
        self.pages_written = page_index + 1;
        self.page_offset = 0;
        self.page_buf.fill(0);
        Ok(())
    }

    /// Appends `data` to the page stream, flushing every page that becomes full.
    /// A trailing partial page stays buffered until the next call or finalize.
    ///
    /// # Errors
    /// Propagates failures from `flush_page`.
    fn write_bytes(&mut self, file: &fs::File, file_id: u32, data: &[u8]) -> DbResult<()> {
        let mut rest = data;
        while !rest.is_empty() {
            let room = 4096 - self.page_offset;
            let (head, tail) = rest.split_at(rest.len().min(room));
            let start = self.page_offset;
            self.page_buf[start..start + head.len()].copy_from_slice(head);
            self.page_offset = start + head.len();
            rest = tail;
            if self.page_offset == 4096 {
                self.flush_page(file, file_id)?;
            }
        }
        Ok(())
    }
}
/// A compaction output whose temp file(s) have been synced and renamed into place.
struct FinalizedOutput {
/// File id of the output.
file_id: u32,
/// Size reported to the shard: encrypted outputs use written pages * 4096,
/// plaintext outputs use the logical write offset.
total_bytes: u64,
/// Final `{id}.data` path.
final_data_path: std::path::PathBuf,
/// Final `{id}.tags` path when the output was encrypted.
#[cfg(feature = "encryption")]
final_tag_path: Option<std::path::PathBuf>,
}
fn compact_shard_inner<K: Key, I: CompactionIndex<K>>(
shard: &Shard,
index: &I,
threshold: f64,
min_replicated_gsn: u64,
) -> DbResult<usize> {
let mut files_to_compact = Vec::new();
#[cfg(feature = "encryption")]
let cipher_opt: Option<Arc<PageCipher>>;
let max_file_size: u64;
let cooldown_ids: Vec<u32>;
{
let mut inner = shard.lock();
max_file_size = inner.max_file_size;
cooldown_ids = std::mem::take(&mut inner.last_compaction_output_ids);
#[cfg(feature = "encryption")]
{
cipher_opt = inner.cipher.clone();
}
for file in &inner.immutable {
let total = file.total_bytes;
let dead = inner.dead_bytes.get(&file.file_id).copied().unwrap_or(0);
if total > 0
&& (dead as f64 / total as f64) > threshold
&& !cooldown_ids.contains(&file.file_id)
{
files_to_compact.push(file.clone());
}
}
}
if files_to_compact.is_empty() {
return Ok(0);
}
if min_replicated_gsn < u64::MAX {
files_to_compact.retain(|file| {
let hint_path = shard.dir().join(format!("{:06}.hint", file.file_id));
match file_max_gsn(&hint_path, file.file_id, size_of::<K>()) {
Some(max_gsn) => max_gsn < min_replicated_gsn,
None => false, }
});
if files_to_compact.is_empty() {
return Ok(0);
}
}
files_to_compact.sort_by_key(|f| f.file_id);
files_to_compact.truncate(4);
let compact_start = std::time::Instant::now();
let old_file_ids: Vec<u32> = files_to_compact.iter().map(|f| f.file_id).collect();
let first_file_id = {
let mut inner = shard.lock();
inner.allocate_file_id()?
};
let mut output = open_output(shard, first_file_id)?;
const BATCH_SIZE: usize = 256;
struct BatchEntry<K> {
key: K,
gsn: u64,
old_loc: DiskLoc,
new_loc: DiskLoc,
is_tombstone: bool,
}
let mut batch: Vec<BatchEntry<K>> = Vec::with_capacity(BATCH_SIZE);
let mut pending_outputs: Vec<FinalizedOutput> = Vec::new();
for old_arc in &files_to_compact {
let file = &old_arc.file;
let file_len = old_arc.total_bytes;
let mut offset: u64 = 0;
#[cfg(feature = "encryption")]
let read_fn: Box<ReadFn> =
if let (Some(cipher), Some(_tag_file)) = (&cipher_opt, &old_arc.tag_file) {
let c = cipher.clone();
let fid = old_arc.file_id;
let tp = tags::tags_path_for_data(&old_arc.path);
let tf = Arc::new(TagFile::open_read(&tp)?);
Box::new(move |f, o, l| direct::pread_value_encrypted(f, &tf, &c, fid, o, l))
} else {
Box::new(direct::pread_value)
};
#[cfg(not(feature = "encryption"))]
let read_fn: Box<ReadFn> = Box::new(direct::pread_value);
while offset + size_of::<EntryHeader>() as u64 <= file_len {
let header_bytes = match read_fn(file, offset, size_of::<EntryHeader>()) {
Ok(b) => b,
Err(_) => break,
};
let header = match EntryHeader::read_from_bytes(&header_bytes) {
Ok(h) => h,
Err(_) => break,
};
if header.gsn == 0 && header.crc32 == 0 && header.value_len == 0 {
break;
}
let total = entry_size(size_of::<K>(), header.value_len);
if offset + total > file_len {
break;
}
if output.write_offset > 0
&& should_rotate_output(output.write_offset, total, max_file_size)
{
let next_file_id = {
let mut inner = shard.lock();
inner.allocate_file_id()?
};
let finished = finalize_output(shard, output)?;
pending_outputs.push(finished);
output = open_output(shard, next_file_id)?;
}
let old_loc = DiskLoc::new(
shard.id,
old_arc.file_id,
(offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
header.value_len,
);
let entry_bytes = read_fn(file, offset, total as usize)?;
let key_start = size_of::<EntryHeader>();
let key_end = key_start + size_of::<K>();
let val_end = key_end + header.value_len as usize;
let computed_crc = crate::entry::compute_crc32(
header.gsn,
header.value_len,
&entry_bytes[key_start..key_end],
&entry_bytes[key_end..val_end],
);
if computed_crc != header.crc32 {
for out in &pending_outputs {
let _ = fs::remove_file(&out.final_data_path);
#[cfg(feature = "encryption")]
if let Some(ref tp) = out.final_tag_path {
let _ = fs::remove_file(tp);
}
}
let cur_tmp = shard.dir().join(format!("{:06}.data.tmp", output.file_id));
let _ = fs::remove_file(&cur_tmp);
#[cfg(feature = "encryption")]
{
let cur_tags_tmp = shard.dir().join(format!("{:06}.tags.tmp", output.file_id));
let _ = fs::remove_file(&cur_tags_tmp);
}
return Err(crate::error::DbError::CrcMismatch {
expected: header.crc32,
actual: computed_crc,
});
}
#[cfg(feature = "encryption")]
if let Some(ref mut enc) = output.enc_state {
enc.write_bytes(&output.tmp_file, output.file_id, &entry_bytes)?;
} else {
direct::pwrite_at(&output.tmp_file, &entry_bytes, output.write_offset)?;
}
#[cfg(not(feature = "encryption"))]
direct::pwrite_at(&output.tmp_file, &entry_bytes, output.write_offset)?;
let value_offset =
output.write_offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64;
debug_assert!(value_offset <= u32::MAX as u64);
let new_loc = DiskLoc::new(
shard.id,
output.file_id,
value_offset as u32,
header.value_len,
);
let key_bytes = &entry_bytes[16..16 + size_of::<K>()];
let key: K = K::from_bytes(key_bytes);
batch.push(BatchEntry {
key,
gsn: header.gsn,
old_loc,
new_loc,
is_tombstone: header.is_tombstone(),
});
output.write_offset += total;
offset += total;
}
}
if output.write_offset > 0 {
let last_output = finalize_output(shard, output)?;
pending_outputs.push(last_output);
} else {
let tmp_data_path = shard.dir().join(format!("{:06}.data.tmp", output.file_id));
let _ = std::fs::remove_file(&tmp_data_path);
#[cfg(feature = "encryption")]
{
let tmp_tags_path = shard.dir().join(format!("{:06}.tags.tmp", output.file_id));
let _ = std::fs::remove_file(&tmp_tags_path);
}
}
#[cfg(feature = "encryption")]
let new_immutables: Vec<Arc<ImmutableFile>> = pending_outputs
.iter()
.map(|out| {
let final_file = direct::open_read(&out.final_data_path)?;
let final_tag_file = if let Some(ref tp) = out.final_tag_path {
Some(Arc::new(TagFile::open_read(tp)?))
} else {
None
};
Ok(Arc::new(ImmutableFile {
file: final_file,
file_id: out.file_id,
path: out.final_data_path.clone(),
total_bytes: out.total_bytes,
tag_file: final_tag_file,
}))
})
.collect::<DbResult<Vec<_>>>()?;
#[cfg(not(feature = "encryption"))]
let new_immutables: Vec<Arc<ImmutableFile>> = pending_outputs
.iter()
.map(|out| {
let final_file = direct::open_read(&out.final_data_path)?;
Ok(Arc::new(ImmutableFile {
file: final_file,
file_id: out.file_id,
total_bytes: out.total_bytes,
}))
})
.collect::<DbResult<Vec<_>>>()?;
{
let mut inner = shard.lock();
inner.immutable.extend(new_immutables);
inner.immutable.sort_by_key(|f| f.file_id);
inner.last_compaction_output_ids = pending_outputs.iter().map(|o| o.file_id).collect();
}
let mut compacted_entries = 0;
let key_len = size_of::<K>();
let mut live_hint_data: std::collections::HashMap<u32, Vec<u8>> = pending_outputs
.iter()
.map(|o| (o.file_id, Vec::new()))
.collect();
for chunk in batch.chunks(BATCH_SIZE) {
let mut inner = shard.lock();
for entry in chunk {
if entry.is_tombstone {
if index.contains_key(&entry.key) {
inner.add_dead_bytes(
entry.new_loc.file_id,
entry_size(size_of::<K>(), entry.new_loc.len),
);
} else {
compacted_entries += 1;
if let Some(buf) = live_hint_data.get_mut(&entry.new_loc.file_id) {
append_hint_entry(
buf,
entry.gsn,
&entry.key,
entry.new_loc.offset as u64,
entry.new_loc.len,
key_len,
);
}
}
} else if index.update_if_match(&entry.key, entry.old_loc, entry.new_loc) {
compacted_entries += 1;
if let Some(buf) = live_hint_data.get_mut(&entry.new_loc.file_id) {
append_hint_entry(
buf,
entry.gsn,
&entry.key,
entry.new_loc.offset as u64,
entry.new_loc.len,
key_len,
);
}
} else {
inner.add_dead_bytes(
entry.new_loc.file_id,
entry_size(size_of::<K>(), entry.new_loc.len),
);
}
}
}
{
let mut inner = shard.lock();
inner
.immutable
.retain(|f| !old_file_ids.contains(&f.file_id));
for fid in &old_file_ids {
inner.dead_bytes.remove(fid);
}
for old_arc in &files_to_compact {
index.invalidate_blocks(shard.id, old_arc.file_id, old_arc.total_bytes);
}
}
for out in &pending_outputs {
let hint_data = live_hint_data.remove(&out.file_id).unwrap_or_default();
let hint_path = shard.dir().join(format!("{:06}.hint", out.file_id));
hint::write_hint_file(&hint_path, &hint_data)?;
}
dir_fsync(shard.dir())?;
for fid in &old_file_ids {
let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.data")));
let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.hint")));
#[cfg(feature = "encryption")]
let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.tags")));
}
dir_fsync(shard.dir())?;
let elapsed = compact_start.elapsed().as_secs_f64();
metrics::counter!("armdb.compaction.runs").increment(1);
metrics::counter!("armdb.compaction.entries").increment(compacted_entries as u64);
metrics::histogram!("armdb.compaction.duration_seconds").record(elapsed);
tracing::info!(
entries = compacted_entries,
files = old_file_ids.len(),
elapsed_ms = (elapsed * 1000.0) as u64,
"compaction complete"
);
Ok(compacted_entries)
}
/// Starts a new compaction output for `file_id` by creating a fresh
/// `{file_id:06}.data.tmp` in the shard directory (replacing any leftover one).
/// When the shard has a cipher, the output also gets an empty page-encryption
/// state.
///
/// # Errors
/// Returns removal errors other than "not found", and failures creating the
/// temp file.
fn open_output(shard: &Shard, file_id: u32) -> DbResult<OutputState> {
    let tmp_path = shard.dir().join(format!("{file_id:06}.data.tmp"));
    // A leftover temp file from an earlier attempt is discarded; absence is fine.
    if let Err(err) = fs::remove_file(&tmp_path) {
        if err.kind() != std::io::ErrorKind::NotFound {
            return Err(err.into());
        }
    }
    let tmp_file = direct::open_write(&tmp_path)?;
    #[cfg(feature = "encryption")]
    let enc_state = shard.lock().cipher.clone().map(|cipher| EncryptionState {
        cipher,
        page_buf: Box::new([0u8; 4096]),
        page_offset: 0,
        pages_written: 0,
        tag_list: Vec::new(),
    });
    Ok(OutputState {
        file_id,
        tmp_file,
        write_offset: 0,
        #[cfg(feature = "encryption")]
        enc_state,
    })
}
/// Seals a compaction output: syncs its contents and renames its temp file(s)
/// to their final names, returning the final paths and the reported size.
///
/// Encrypted outputs:
/// - flush a trailing partial page;
/// - write all page tags to `{id}.tags.tmp` and sync it;
/// - sync the data temp file;
/// - rename both files and fsync the shard directory.
///
/// The reported size is written pages * 4096. Plaintext outputs are synced,
/// renamed and the directory fsynced; the reported size is `write_offset`.
///
/// # Errors
/// Propagates any I/O or encryption failure. This function does not remove
/// any files on error.
fn finalize_output(shard: &Shard, output: OutputState) -> DbResult<FinalizedOutput> {
let file_id = output.file_id;
let tmp_path = shard.dir().join(format!("{file_id:06}.data.tmp"));
let final_data_path = shard.dir().join(format!("{file_id:06}.data"));
#[cfg(feature = "encryption")]
let (total_bytes, final_tag_path) = match output.enc_state {
Some(mut enc) => {
// Encrypt and write the last, partially filled page (its unused tail is
// zero because the buffer is zero-filled after every flush).
if enc.page_offset > 0 {
enc.flush_page(&output.tmp_file, file_id)?;
}
let tp = shard.dir().join(format!("{file_id:06}.tags.tmp"));
// Discard a stale tags temp file; a missing one is fine.
match fs::remove_file(&tp) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e.into()),
}
let tf = TagFile::open_write(&tp)?;
tf.write_tags(0, &enc.tag_list)?;
tf.sync()?;
let final_tags_path = shard.dir().join(format!("{file_id:06}.tags"));
// Both temp files are synced before either rename, so the final names
// only ever refer to synced contents.
direct::fsync(&output.tmp_file)?;
fs::rename(&tmp_path, &final_data_path)?;
fs::rename(&tp, &final_tags_path)?;
dir_fsync(shard.dir())?;
// The on-disk size covers whole pages, including padding in the last one.
(enc.pages_written * 4096, Some(final_tags_path))
}
None => {
direct::fsync(&output.tmp_file)?;
fs::rename(&tmp_path, &final_data_path)?;
dir_fsync(shard.dir())?;
(output.write_offset, None)
}
};
#[cfg(not(feature = "encryption"))]
{
direct::fsync(&output.tmp_file)?;
fs::rename(&tmp_path, &final_data_path)?;
dir_fsync(shard.dir())?;
}
Ok(FinalizedOutput {
file_id,
#[cfg(feature = "encryption")]
total_bytes,
#[cfg(not(feature = "encryption"))]
total_bytes: output.write_offset,
final_data_path,
#[cfg(feature = "encryption")]
final_tag_path,
})
}
/// Background thread that repeatedly runs a compaction callback until its
/// shutdown signal fires.
pub struct Compactor {
/// Shutdown signal shared with the worker thread; `stop` triggers it.
stop: crate::shutdown::ShutdownSignal,
/// Worker thread handle; taken and joined by `stop`.
handle: Option<std::thread::JoinHandle<()>>,
}
impl Compactor {
    /// Spawns the compaction worker with a fresh, private shutdown signal.
    ///
    /// `compact_fn` runs after each `interval` wait; see `start_with_signal`.
    pub fn start(
        compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
        interval: Duration,
    ) -> Self {
        let signal = crate::shutdown::ShutdownSignal::new();
        Self::start_with_signal(compact_fn, interval, signal)
    }

    /// Spawns the compaction worker driven by an externally owned `signal`.
    ///
    /// The worker exits once the signal reports shutdown, either before a wait
    /// or when `wait_timeout(interval)` returns `true`. Otherwise it calls
    /// `compact_fn`, logging an info line when entries were compacted and an
    /// error line when the call fails.
    pub fn start_with_signal(
        compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
        interval: Duration,
        signal: crate::shutdown::ShutdownSignal,
    ) -> Self {
        let worker_signal = signal.clone();
        let worker = move || loop {
            if worker_signal.is_shutdown() || worker_signal.wait_timeout(interval) {
                break;
            }
            match compact_fn() {
                Err(e) => tracing::error!(error = %e, "compaction error"),
                Ok(0) => {}
                Ok(n) => tracing::info!(entries = n, "compaction cycle"),
            }
        };
        let handle = std::thread::spawn(worker);
        Self {
            stop: signal,
            handle: Some(handle),
        }
    }

    /// Triggers shutdown and joins the worker; later calls only re-signal.
    /// A panic in the worker is ignored.
    pub fn stop(&mut self) {
        self.stop.shutdown();
        if let Some(worker) = self.handle.take() {
            let _ = worker.join();
        }
    }
}
impl Drop for Compactor {
fn drop(&mut self) {
self.stop();
}
}
/// Appends one hint record to `buf` with this layout, integers in native byte
/// order:
///
/// | field        | size              |
/// |--------------|-------------------|
/// | GSN          | 8 bytes           |
/// | key          | `key.as_bytes()`  |
/// | value offset | 8 bytes           |
/// | value length | 4 bytes           |
///
/// `_key_len` is accepted for call-site symmetry but not used.
fn append_hint_entry<K: Key>(
    buf: &mut Vec<u8>,
    gsn: u64,
    key: &K,
    value_offset: u64,
    value_len: u32,
    _key_len: usize,
) {
    let gsn_bytes = gsn.to_ne_bytes();
    let offset_bytes = value_offset.to_ne_bytes();
    let len_bytes = value_len.to_ne_bytes();
    buf.extend_from_slice(&gsn_bytes);
    buf.extend_from_slice(key.as_bytes());
    buf.extend_from_slice(&offset_bytes);
    buf.extend_from_slice(&len_bytes);
}
/// Returns the largest GSN recorded in the hint file at `hint_path`, with the
/// tombstone bit masked off.
///
/// Returns `None` when the hint file cannot be read (or yields no data), is
/// empty, or its length is not a whole number of hint records for `key_len`.
/// `_file_id` is unused.
fn file_max_gsn(hint_path: &std::path::Path, _file_id: u32, key_len: usize) -> Option<u64> {
    let bytes = hint::read_hint_file(hint_path).ok().flatten()?;
    let record_len = hint::hint_entry_size(key_len);
    let well_formed = !bytes.is_empty() && bytes.len() % record_len == 0;
    if !well_formed {
        return None;
    }
    bytes.chunks_exact(record_len).try_fold(0u64, |highest, record| {
        let raw = <[u8; 8]>::try_from(&record[..8]).ok()?;
        let gsn = u64::from_ne_bytes(raw) & !crate::entry::TOMBSTONE_BIT;
        Some(highest.max(gsn))
    })
}
#[cfg(test)]
mod compaction_output_rotation_tests {
    use super::*;

    /// Table of (write_offset, needed, max_file_size, expected rotation).
    #[test]
    fn output_should_rotate_when_offset_plus_needed_exceeds_limit() {
        const PAGE: u64 = 4096;
        let cases: [(u64, u64, u64, bool); 4] = [
            (0, PAGE, 64 * PAGE, false),
            (60 * PAGE, PAGE, 64 * PAGE, false),
            (63 * PAGE, PAGE + 1, 64 * PAGE, true),
            (u32::MAX as u64 - 10, 100, u32::MAX as u64, true),
        ];
        for (write_offset, needed, limit, expected) in cases {
            assert_eq!(should_rotate_output(write_offset, needed, limit), expected);
        }
    }
}
// Tests for `file_max_gsn` using hand-built hint files: only the leading
// 8-byte GSN of each record is filled in; the rest of each record stays zero.
#[cfg(test)]
mod compaction_file_max_gsn_tests {
use super::*;
use tempfile::tempdir;
// The maximum GSN must be found even when it is not in the last record.
#[test]
fn file_max_gsn_returns_true_max_not_last_entry() {
let dir = tempdir().unwrap();
let hint_path = dir.path().join("000001.hint");
let key_len = 8;
let entry_size = crate::hint::hint_entry_size(key_len);
let mut data = Vec::new();
for &gsn in &[100u64, 300u64, 150u64] {
let start = data.len();
data.resize(start + entry_size, 0);
data[start..start + 8].copy_from_slice(&gsn.to_ne_bytes());
}
std::fs::write(&hint_path, &data).unwrap();
let result = file_max_gsn(&hint_path, 1, key_len);
assert_eq!(result, Some(300));
}
// A tombstone record's GSN counts without its TOMBSTONE_BIT.
#[test]
fn file_max_gsn_strips_tombstone_bit() {
let dir = tempdir().unwrap();
let hint_path = dir.path().join("000002.hint");
let key_len = 8;
let entry_size = crate::hint::hint_entry_size(key_len);
let tombstone_gsn = 200u64 | crate::entry::TOMBSTONE_BIT;
let mut data = vec![0u8; entry_size * 2];
data[0..8].copy_from_slice(&100u64.to_ne_bytes());
data[entry_size..entry_size + 8].copy_from_slice(&tombstone_gsn.to_ne_bytes());
std::fs::write(&hint_path, &data).unwrap();
let result = file_max_gsn(&hint_path, 2, key_len);
assert_eq!(result, Some(200));
}
}