use super::sharded_index::ShardedIndex;
use memmap2::MmapMut;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
#[allow(unused_variables)]
pub fn punch_hole(file: &File, offset: u64, len: u64) -> io::Result<bool> {
if len == 0 {
return Ok(false);
}
#[cfg(target_os = "linux")]
{
punch_hole_linux(file, offset, len)
}
#[cfg(target_os = "windows")]
{
punch_hole_windows(file, offset, len)
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
{
punch_hole_fallback(file, offset, len)
}
}
#[cfg(target_os = "linux")]
/// Punches a hole with `fallocate(2)`, keeping the file size unchanged.
///
/// Returns `Ok(true)` when the kernel deallocated the range. If the filesystem reports
/// `EOPNOTSUPP`, the range is zero-filled via `punch_hole_fallback` instead. That path
/// yields `Ok(false)`.
///
/// # Errors
/// * `InvalidInput` if `offset` or `len` does not fit in `libc::off_t`.
/// * Any other `fallocate` failure, returned as the OS error.
fn punch_hole_linux(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::os::unix::io::AsRawFd;

    // Flag values from <linux/falloc.h>; PUNCH_HOLE must be combined with KEEP_SIZE.
    const FALLOC_FL_KEEP_SIZE: i32 = 0x01;
    const FALLOC_FL_PUNCH_HOLE: i32 = 0x02;

    let to_off_t = |value: u64, what: &'static str| -> io::Result<libc::off_t> {
        libc::off_t::try_from(value)
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, what))
    };

    let raw_fd = file.as_raw_fd();
    let start = to_off_t(offset, "offset does not fit in libc::off_t")?;
    let span = to_off_t(len, "len does not fit in libc::off_t")?;

    // SAFETY: `raw_fd` is borrowed from `file`, which stays open for the duration of the
    // call, and `fallocate` takes only plain integer arguments (no pointers).
    let rc = unsafe {
        libc::fallocate(raw_fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, start, span)
    };
    if rc == 0 {
        return Ok(true);
    }

    let os_err = io::Error::last_os_error();
    match os_err.raw_os_error() {
        // The filesystem cannot deallocate; degrade to writing zeros.
        Some(libc::EOPNOTSUPP) => punch_hole_fallback(file, offset, len),
        _ => Err(os_err),
    }
}
#[cfg(target_os = "windows")]
/// Asks NTFS to zero the byte range `[offset, offset + len)` via `FSCTL_SET_ZERO_DATA`.
///
/// Returns `Ok(true)` when `DeviceIoControl` succeeds. Per the Win32 docs, space is only
/// actually deallocated when the file is sparse. If the control call fails, the range is
/// zero-filled through `punch_hole_fallback`, which returns `Ok(false)`.
///
/// Offsets that do not fit in `i64` are clamped to `i64::MAX`.
///
/// # Errors
/// Only errors from the fallback path are surfaced; a failed `DeviceIoControl` is not
/// reported directly.
fn punch_hole_windows(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Foundation::{FALSE, HANDLE};
    use windows_sys::Win32::System::Ioctl::FSCTL_SET_ZERO_DATA;
    use windows_sys::Win32::System::IO::DeviceIoControl;

    /// Layout-compatible mirror of Win32 `FILE_ZERO_DATA_INFORMATION`.
    #[repr(C)]
    struct FileZeroDataInformation {
        file_offset: i64,
        beyond_final_zero: i64,
    }

    let clamp_to_i64 = |value: u64| i64::try_from(value).unwrap_or(i64::MAX);
    let request = FileZeroDataInformation {
        file_offset: clamp_to_i64(offset),
        beyond_final_zero: clamp_to_i64(offset.saturating_add(len)),
    };
    // The struct is 16 bytes, so the narrowing cast cannot truncate.
    #[allow(clippy::cast_possible_truncation)]
    let request_len = std::mem::size_of::<FileZeroDataInformation>() as u32;
    let raw_handle = file.as_raw_handle() as HANDLE;
    let mut bytes_returned: u32 = 0;

    // SAFETY: `raw_handle` is borrowed from `file`, which outlives the call. The input
    // pointer/length describe `request`, which lives on this stack frame. No output
    // buffer is passed (null, 0). `bytes_returned` is a valid `u32` out-pointer. The call
    // is synchronous (null OVERLAPPED).
    let status = unsafe {
        DeviceIoControl(
            raw_handle,
            FSCTL_SET_ZERO_DATA,
            std::ptr::addr_of!(request).cast(),
            request_len,
            std::ptr::null_mut(),
            0,
            std::ptr::addr_of_mut!(bytes_returned),
            std::ptr::null_mut(),
        )
    };

    match status {
        FALSE => punch_hole_fallback(file, offset, len),
        _ => Ok(true),
    }
}
/// Size of the zero buffer written per iteration by [`punch_hole_fallback`].
const FALLBACK_CHUNK_SIZE: usize = 64 * 1024;

/// Portable fallback for hole punching: overwrites `len` bytes starting at `offset` with
/// zeros instead of deallocating them.
///
/// Always returns `Ok(false)` on success. This signals that the range was zero-filled
/// rather than released to the filesystem.
///
/// Side effects:
/// * Writing past the current end of file extends the file.
/// * Writes go through `file.try_clone()`. Per the `File::try_clone` docs, seeks on the
///   clone also move `file`'s cursor, so `file` ends up positioned at `offset + len`.
///
/// # Errors
/// Propagates errors from cloning the handle, seeking, or writing.
fn punch_hole_fallback(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::io::{Seek, SeekFrom, Write};

    let mut file = file.try_clone()?;
    file.seek(SeekFrom::Start(offset))?;

    let zeros = vec![0u8; FALLBACK_CHUNK_SIZE];
    let chunk = FALLBACK_CHUNK_SIZE as u64;
    // Track the remainder as u64: casting `len` to usize would silently truncate it on
    // 32-bit targets and zero only part of the requested range.
    let mut remaining = len;
    while remaining > 0 {
        let step = remaining.min(chunk);
        // `step <= FALLBACK_CHUNK_SIZE`, so this narrowing cast is lossless.
        #[allow(clippy::cast_possible_truncation)]
        let step_len = step as usize;
        file.write_all(&zeros[..step_len])?;
        remaining -= step;
    }
    Ok(false)
}
pub(super) fn persist_flat_index(path: &Path, index: &FxHashMap<u64, usize>) -> io::Result<()> {
let bytes = postcard::to_allocvec(index).map_err(io::Error::other)?;
let mut writer = io::BufWriter::new(File::create(path)?);
writer.write_all(&bytes)?;
writer.flush()?;
writer
.into_inner()
.map_err(std::io::IntoInnerError::into_error)?
.sync_all()
}
pub(super) fn persist_flat_index_atomic(
path: &Path,
index: &FxHashMap<u64, usize>,
) -> io::Result<()> {
let mut staging = path.as_os_str().to_owned();
staging.push(".new");
let staging = std::path::PathBuf::from(staging);
persist_flat_index(&staging, index)?;
promote_index_sidecar(&staging, path)?;
match path.parent() {
Some(dir) if !dir.as_os_str().is_empty() => sync_dir(dir),
_ => Ok(()),
}
}
/// Moves the freshly written `sidecar` file over `idx_path`, replacing any existing index.
///
/// `std::fs::rename` replaces an existing destination on Unix (`rename(2)`). It also
/// does so on Windows, where it uses `MoveFileExW`/`SetFileInformationByHandle` with
/// replace semantics. On those platforms the swap is therefore a single rename.
///
/// Deleting `idx_path` first would open a crash window in which no index exists under
/// `idx_path`. The remove-then-rename sequence is kept only for other targets, where
/// replace-on-rename is not assumed.
///
/// # Errors
/// Any error from the rename (or, on other targets, from removing the old index).
fn promote_index_sidecar(sidecar: &Path, idx_path: &Path) -> io::Result<()> {
    #[cfg(not(any(unix, windows)))]
    if idx_path.exists() {
        std::fs::remove_file(idx_path)?;
    }
    std::fs::rename(sidecar, idx_path)
}
/// Makes directory-entry changes (renames, creations) under `dir` durable.
///
/// On Unix this opens the directory and fsyncs it. On all other targets it does nothing
/// and returns `Ok(())`.
///
/// # Errors
/// On Unix, any error from opening or syncing the directory.
fn sync_dir(dir: &Path) -> io::Result<()> {
    if cfg!(unix) {
        let handle = File::open(dir)?;
        handle.sync_all()
    } else {
        Ok(())
    }
}
/// Cleans up after an interrupted compaction of `data_path` (e.g. `.../vectors.dat`).
///
/// A leftover `<name>.dat.bak` is handled first:
/// * If `data_path` exists, the backup is stale and is deleted.
/// * If `data_path` is missing, the backup is renamed back into place.
///
/// Staged `.tmp` artifacts are then resolved by `recover_staged_compaction`.
///
/// # Errors
/// Any error from the delete/rename above or from staged-artifact recovery.
pub fn recover_compaction_artifacts(data_path: &Path) -> io::Result<()> {
    let backup = data_path.with_extension("dat.bak");
    if backup.exists() {
        match data_path.exists() {
            // The live file is present; the backup is no longer needed.
            true => std::fs::remove_file(&backup)?,
            // The live file is missing; restore it from the backup.
            false => std::fs::rename(&backup, data_path)?,
        }
    }
    recover_staged_compaction(data_path)
}
/// Resolves the staged files a compaction may leave next to `data_path`.
///
/// * `<name>.dat.tmp` still present: the data swap never happened. The staged data and
///   any `vectors.idx.tmp` are discarded.
/// * Only `vectors.idx.tmp` present: the data file was already swapped. The staged index
///   is promoted to `vectors.idx` to match it.
///
/// # Errors
/// Any error from removing or promoting the staged files.
fn recover_staged_compaction(data_path: &Path) -> io::Result<()> {
    let staged_data = data_path.with_extension("dat.tmp");
    let staged_index = data_path.with_file_name("vectors.idx.tmp");

    if !staged_data.exists() {
        if staged_index.exists() {
            let live_index = data_path.with_file_name("vectors.idx");
            promote_index_sidecar(&staged_index, &live_index)?;
        }
        return Ok(());
    }

    std::fs::remove_file(&staged_data)?;
    if staged_index.exists() {
        std::fs::remove_file(&staged_index)?;
    }
    Ok(())
}
/// Renames `src` over `dst`, replacing the existing file.
///
/// * Unix and targets other than Windows: a single `std::fs::rename`.
/// * Windows:
///   1. Any stale backup is deleted (best-effort).
///   2. An existing `dst` is moved aside to `dst.with_extension("dat.bak")`.
///   3. `src` is renamed into place.
///   4. On success, the backup is deleted (best-effort). On failure, the backup is
///      moved back to `dst` (best-effort) and the rename error is returned.
///
/// Despite the name, the Windows path is not atomic: between the two renames `dst` does
/// not exist. A leftover `.dat.bak` is handled by `recover_compaction_artifacts`.
///
/// NOTE(review): the move-aside sequence presumably exists because `dst` may still be
/// memory-mapped when it is replaced — confirm before simplifying it to a single rename.
///
/// # Errors
/// The error from the final rename, or on Windows from moving `dst` aside.
fn atomic_replace(src: &Path, dst: &Path) -> io::Result<()> {
    #[cfg(unix)]
    {
        std::fs::rename(src, dst)
    }
    #[cfg(windows)]
    {
        let backup = dst.with_extension("dat.bak");
        // Best-effort: clear out a backup left by an earlier run.
        let _ = std::fs::remove_file(&backup);
        if dst.exists() {
            std::fs::rename(dst, &backup)?;
        }
        match std::fs::rename(src, dst) {
            Ok(()) => {
                // Best-effort: a leftover backup is cleaned up by recovery on next open.
                let _ = std::fs::remove_file(&backup);
                Ok(())
            }
            Err(e) => {
                // Try to put the original back so `dst` is not left missing.
                if backup.exists() {
                    let _ = std::fs::rename(&backup, dst);
                }
                Err(e)
            }
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        std::fs::rename(src, dst)
    }
}
/// Borrowed view of a vector store's state, used to compact its data file.
///
/// Every field is borrowed from the owning store; `compact` reads it and publishes the
/// compacted state back through the shared locks and atomics.
pub(super) struct CompactionContext<'ctx> {
    /// Directory holding `vectors.dat`, `vectors.idx` and the compaction temp files.
    pub path: &'ctx Path,
    /// Number of `f32` components per vector; one vector occupies `dimension * 4` bytes.
    pub dimension: usize,
    /// Live mapping from vector id to byte offset in the data file.
    pub index: &'ctx ShardedIndex,
    /// Writable mapping of `vectors.dat`; swapped for a new mapping after compaction.
    pub mmap: &'ctx RwLock<MmapMut>,
    /// End of the used region of the data file, in bytes; reset to the end of the
    /// compacted data after a successful compaction.
    pub next_offset: &'ctx AtomicUsize,
    /// Write-ahead log; locked for the file swap and truncated once it succeeds.
    pub wal: &'ctx RwLock<io::BufWriter<File>>,
    /// Minimum size, in bytes, of the compacted data file.
    pub initial_size: u64,
}
impl CompactionContext<'_> {
    /// Rewrites the data file so every live vector is stored contiguously. Returns an
    /// estimate of the bytes reclaimed.
    ///
    /// Procedure:
    /// 1. Snapshot the index. Copy each live vector into a freshly truncated
    ///    `vectors.dat.tmp` sized to `max(snapshot bytes, initial_size)`.
    /// 2. Persist the remapped offsets to `vectors.idx.tmp`.
    /// 3. Under the WAL write lock, move both files into place, fsync the directory and
    ///    truncate the WAL (see `commit_compaction`).
    /// 4. Remap `vectors.dat` and publish the new mapping, index and `next_offset`.
    ///
    /// Entries whose offset overflows, or whose vector extends past the current mapping,
    /// are logged and dropped from the new index.
    ///
    /// Returns `Ok(0)` without touching disk in three cases: the index is empty,
    /// `next_offset` does not exceed the live byte count, or the snapshot turns out empty.
    /// Otherwise returns `next_offset - live bytes`, measured at entry.
    ///
    /// # Errors
    /// Any I/O error from creating, sizing, mapping, syncing or renaming the files, or from
    /// reopening the compacted data file.
    ///
    /// NOTE(review): the snapshot is taken without holding the WAL lock. Writes landing
    /// between the snapshot and the commit are not carried into the compacted file, and
    /// their WAL records are truncated. Confirm callers exclude concurrent writers.
    pub fn compact(&self) -> io::Result<usize> {
        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let active_count = self.index.len();
        if active_count == 0 {
            return Ok(0);
        }
        let current_offset = self.next_offset.load(Ordering::Acquire);
        let active_size = active_count * vector_size;
        if current_offset <= active_size {
            return Ok(0);
        }
        let bytes_to_reclaim = current_offset - active_size;

        // Take the snapshot before sizing the temp file. The index can change after the
        // `len()` call above, and sizing from that stale count could leave `temp_mmap` too
        // small. The copy loop below would then index out of bounds and panic.
        let old_index = self.index.to_hashmap();
        if old_index.is_empty() {
            return Ok(0);
        }
        let snapshot_size = old_index.len() * vector_size;

        let temp_path = self.path.join("vectors.dat.tmp");
        let temp_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&temp_path)?;
        // usize -> u64 is lossless on every supported (<= 64-bit) target.
        #[allow(clippy::cast_possible_truncation)]
        let new_size = (snapshot_size as u64).max(self.initial_size);
        temp_file.set_len(new_size)?;
        // SAFETY: `temp_file` was just created/truncated here. This function does not touch
        // it through any other handle while `temp_mmap` is alive.
        let mut temp_mmap = unsafe { MmapMut::map_mut(&temp_file)? };

        let mmap = self.mmap.read();
        let mut new_index: FxHashMap<u64, usize> = FxHashMap::default();
        new_index.reserve(old_index.len());
        let mut new_offset = 0usize;
        for (&id, &old_offset) in &old_index {
            let Some(src_end) = old_offset.checked_add(vector_size) else {
                tracing::warn!(id, old_offset, "compaction: skipping offset overflow");
                continue;
            };
            if src_end > mmap.len() {
                tracing::warn!(
                    id,
                    old_offset,
                    mmap_len = mmap.len(),
                    "compaction: skipping out-of-bounds vector offset"
                );
                continue;
            }
            // At most one vector is written per snapshot entry, so
            // `new_offset + vector_size <= snapshot_size <= temp_mmap.len()`.
            let src = &mmap[old_offset..src_end];
            temp_mmap[new_offset..new_offset + vector_size].copy_from_slice(src);
            new_index.insert(id, new_offset);
            new_offset += vector_size;
        }
        drop(mmap);

        temp_mmap.flush()?;
        drop(temp_mmap);
        temp_file.sync_all()?;
        drop(temp_file);

        let idx_tmp_path = self.path.join("vectors.idx.tmp");
        persist_flat_index(&idx_tmp_path, &new_index)?;
        let data_path = self.path.join("vectors.dat");
        self.commit_compaction(&temp_path, &idx_tmp_path, &data_path)?;

        let new_data_file = OpenOptions::new().read(true).write(true).open(&data_path)?;
        // SAFETY: `map_mut` is unsafe because the mapping becomes invalid if the file is
        // truncated or modified outside this mapping. NOTE(review): this relies on nothing
        // outside the store resizing `vectors.dat` while it is mapped — confirm.
        let new_mmap = unsafe { MmapMut::map_mut(&new_data_file)? };
        // NOTE(review): between the mmap swap and `replace_all`, a reader that consults the
        // index and then the mapping can pair old offsets with the new file. Confirm
        // readers are excluded during compaction.
        *self.mmap.write() = new_mmap;
        self.index.replace_all(new_index);
        self.next_offset.store(new_offset, Ordering::Release);
        Ok(bytes_to_reclaim)
    }

    /// Swaps the staged data and index files into place, then empties the WAL.
    ///
    /// The WAL write lock is held throughout. Steps, in order:
    /// 1. Replace `data_path` with `temp_path`.
    /// 2. Promote `idx_tmp_path` to `vectors.idx`.
    /// 3. Fsync the directory.
    /// 4. Flush buffered WAL bytes, so they cannot be written after the truncation.
    /// 5. Truncate the WAL to zero and rewind its cursor.
    /// 6. Fsync the WAL.
    ///
    /// # Errors
    /// The first I/O error from any step; later steps are skipped.
    fn commit_compaction(
        &self,
        temp_path: &Path,
        idx_tmp_path: &Path,
        data_path: &Path,
    ) -> io::Result<()> {
        use std::io::Seek;

        let mut wal = self.wal.write();
        atomic_replace(temp_path, data_path)?;
        promote_index_sidecar(idx_tmp_path, &self.path.join("vectors.idx"))?;
        sync_dir(self.path)?;
        wal.flush()?;
        wal.get_ref().set_len(0)?;
        // `File::set_len` does not move the cursor. If the WAL was not opened in append
        // mode, the next record would land at the old offset and leave a zero-filled gap
        // at the start of the log. Rewinding is harmless in append mode.
        wal.get_mut().rewind()?;
        wal.get_ref().sync_all()
    }

    /// Fraction of the used data region (`next_offset` bytes) not occupied by live vectors,
    /// clamped to `>= 0.0`.
    ///
    /// Returns `0.0` when the index is empty or nothing has been written yet.
    #[must_use]
    pub fn fragmentation_ratio(&self) -> f64 {
        let active_count = self.index.len();
        if active_count == 0 {
            return 0.0;
        }
        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let active_size = active_count * vector_size;
        let current_offset = self.next_offset.load(Ordering::Acquire);
        if current_offset == 0 {
            return 0.0;
        }
        // Precision loss only matters for sizes beyond 2^53 bytes.
        #[allow(clippy::cast_precision_loss)]
        let ratio = 1.0 - (active_size as f64 / current_offset as f64);
        ratio.max(0.0)
    }
}