use super::sharded_index::ShardedIndex;
use memmap2::MmapMut;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Releases or zeroes the byte range `[offset, offset + len)` of `file`.
///
/// Dispatches at compile time to the platform implementation: the Linux
/// path on Linux, the Windows path on Windows, and the zero-writing
/// fallback on every other target.
///
/// # Returns
///
/// Whatever the selected implementation reports. A zero-length request is
/// a no-op and yields `Ok(false)` without touching the file.
///
/// # Errors
///
/// Propagates any I/O error from the selected implementation.
#[allow(unused_variables)]
pub fn punch_hole(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    if len > 0 {
        // Exactly one of these `return`s survives cfg-stripping on any target.
        #[cfg(target_os = "linux")]
        return punch_hole_linux(file, offset, len);
        #[cfg(target_os = "windows")]
        return punch_hole_windows(file, offset, len);
        #[cfg(not(any(target_os = "linux", target_os = "windows")))]
        return punch_hole_fallback(file, offset, len);
    }
    Ok(false)
}
/// Punches a hole in `file` using `fallocate(2)`.
///
/// The call combines `FALLOC_FL_PUNCH_HOLE` with `FALLOC_FL_KEEP_SIZE`.
///
/// # Returns
///
/// * `Ok(true)` when `fallocate` succeeds.
/// * The result of `punch_hole_fallback` when the call fails with
///   `EOPNOTSUPP`.
///
/// # Errors
///
/// * `InvalidInput` if `offset` or `len` does not fit in `libc::off_t`.
/// * The OS error reported by `fallocate` for any failure other than
///   `EOPNOTSUPP`.
#[cfg(target_os = "linux")]
fn punch_hole_linux(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::os::unix::io::AsRawFd;

    const FALLOC_FL_KEEP_SIZE: i32 = 0x01;
    const FALLOC_FL_PUNCH_HOLE: i32 = 0x02;

    let invalid_input =
        |message: &'static str| io::Error::new(io::ErrorKind::InvalidInput, message);
    let start = libc::off_t::try_from(offset)
        .map_err(|_| invalid_input("offset does not fit in libc::off_t"))?;
    let span = libc::off_t::try_from(len)
        .map_err(|_| invalid_input("len does not fit in libc::off_t"))?;

    // SAFETY: the descriptor comes from a live `&File` borrowed for the whole
    // call, and `fallocate` only receives plain integer arguments.
    let status = unsafe {
        libc::fallocate(
            file.as_raw_fd(),
            FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
            start,
            span,
        )
    };
    if status == 0 {
        return Ok(true);
    }

    // Read errno immediately, before any other call can overwrite it.
    let os_error = io::Error::last_os_error();
    match os_error.raw_os_error() {
        Some(code) if code == libc::EOPNOTSUPP => punch_hole_fallback(file, offset, len),
        _ => Err(os_error),
    }
}
/// Zeroes a byte range of `file` through the `FSCTL_SET_ZERO_DATA` control
/// code.
///
/// The range is passed as a start offset and an exclusive end offset, both
/// `i64`. Values that do not fit are clamped to `i64::MAX`, and the end is
/// computed with a saturating add.
///
/// # Returns
///
/// * `Ok(true)` when `DeviceIoControl` reports success.
/// * The result of `punch_hole_fallback` when it reports failure.
#[cfg(target_os = "windows")]
fn punch_hole_windows(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Foundation::{FALSE, HANDLE};
    use windows_sys::Win32::System::Ioctl::FSCTL_SET_ZERO_DATA;
    use windows_sys::Win32::System::IO::DeviceIoControl;

    /// Input buffer for the control call: two consecutive `i64` values.
    #[repr(C)]
    struct ZeroDataRange {
        start: i64,
        end: i64,
    }

    let clamp_to_i64 = |value: u64| i64::try_from(value).unwrap_or(i64::MAX);
    let range = ZeroDataRange {
        start: clamp_to_i64(offset),
        end: clamp_to_i64(offset.saturating_add(len)),
    };
    // The struct is 16 bytes, so the narrowing cast cannot lose information.
    #[allow(clippy::cast_possible_truncation)]
    let range_size = std::mem::size_of::<ZeroDataRange>() as u32;
    let raw_handle = file.as_raw_handle() as HANDLE;
    let mut returned: u32 = 0;

    // SAFETY: `range` and `returned` are live locals for the duration of the
    // call, the input size matches the struct, and no output buffer or
    // OVERLAPPED structure is supplied.
    let status = unsafe {
        DeviceIoControl(
            raw_handle,
            FSCTL_SET_ZERO_DATA,
            std::ptr::addr_of!(range).cast(),
            range_size,
            std::ptr::null_mut(),
            0,
            std::ptr::addr_of_mut!(returned),
            std::ptr::null_mut(),
        )
    };

    if status != FALSE {
        Ok(true)
    } else {
        punch_hole_fallback(file, offset, len)
    }
}
/// Number of zero bytes written per iteration by [`punch_hole_fallback`].
const FALLBACK_CHUNK_SIZE: usize = 64 * 1024;

/// Overwrites `len` bytes of `file`, starting at `offset`, with zeros.
///
/// This is the portable substitute used when a real hole punch is not
/// available. It writes through the file's own cursor, so the cursor is left
/// just past the zeroed range.
///
/// # Returns
///
/// Always `Ok(false)` on success: the data was zeroed by ordinary writes.
///
/// # Errors
///
/// Propagates any error from seeking or writing.
fn punch_hole_fallback(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::io::{Seek, SeekFrom, Write};

    // `&File` implements `Seek` and `Write`, so no duplicated handle is
    // needed (and there is no extra `try_clone` failure mode).
    let mut handle: &File = file;
    handle.seek(SeekFrom::Start(offset))?;

    let zeros = vec![0u8; FALLBACK_CHUNK_SIZE];
    // Track the remainder as `u64`: narrowing `len` to `usize` up front would
    // silently truncate ranges of 4 GiB or more on 32-bit targets.
    let mut remaining = len;
    while remaining > 0 {
        let to_write = usize::try_from(remaining)
            .map_or(FALLBACK_CHUNK_SIZE, |left| left.min(FALLBACK_CHUNK_SIZE));
        handle.write_all(&zeros[..to_write])?;
        // `to_write` is at most FALLBACK_CHUNK_SIZE, so widening is lossless.
        remaining -= to_write as u64;
    }
    Ok(false)
}
/// Cleans up files left behind by an interrupted compaction.
///
/// Two sibling paths are derived from `data_path` by replacing its extension
/// with `dat.bak` and `dat.tmp`:
///
/// * If the backup exists and the data file also exists, the backup is
///   deleted.
/// * If the backup exists but the data file does not, the backup is renamed
///   to `data_path`.
/// * If the temporary file exists, it is deleted.
///
/// # Errors
///
/// Propagates any error from removing or renaming a file.
pub fn recover_compaction_artifacts(data_path: &Path) -> io::Result<()> {
    let backup = data_path.with_extension("dat.bak");
    match (backup.exists(), data_path.exists()) {
        (false, _) => {}
        // The data file is in place, so the backup is redundant.
        (true, true) => std::fs::remove_file(&backup)?,
        // Only the backup exists: put it back as the data file.
        (true, false) => std::fs::rename(&backup, data_path)?,
    }

    let leftover = data_path.with_extension("dat.tmp");
    if leftover.exists() {
        std::fs::remove_file(&leftover)?;
    }
    Ok(())
}
/// Moves `src` over `dst`.
///
/// On every target except Windows this is a single `std::fs::rename`.
///
/// On Windows the existing destination is first moved aside to a backup path
/// (the destination with its extension replaced by `dat.bak`), then `src` is
/// renamed into place. The backup is deleted on success and renamed back to
/// `dst` on failure; both of those steps are best-effort and ignore errors.
///
/// # Errors
///
/// Propagates the error from moving `dst` aside (Windows only) or from
/// renaming `src` to `dst`.
fn atomic_replace(src: &Path, dst: &Path) -> io::Result<()> {
    #[cfg(windows)]
    {
        let backup = dst.with_extension("dat.bak");
        // Clear any stale backup; it is fine if none exists.
        let _ = std::fs::remove_file(&backup);
        if dst.exists() {
            std::fs::rename(dst, &backup)?;
        }

        let outcome = std::fs::rename(src, dst);
        if outcome.is_ok() {
            let _ = std::fs::remove_file(&backup);
        } else if backup.exists() {
            // Roll back so the previous destination is not lost.
            let _ = std::fs::rename(&backup, dst);
        }
        outcome
    }
    #[cfg(not(windows))]
    {
        std::fs::rename(src, dst)
    }
}
/// Borrowed view of the storage state that `compact` and
/// `fragmentation_ratio` operate on.
pub(super) struct CompactionContext<'a> {
/// Directory containing the data file; `compact` joins `vectors.dat` and
/// `vectors.dat.tmp` onto it.
pub path: &'a Path,
/// Number of `f32` components per vector; multiplied by
/// `size_of::<f32>()` to obtain the per-vector byte size.
pub dimension: usize,
/// Index whose snapshot is iterated as id -> byte-offset pairs and which is
/// replaced wholesale once compaction succeeds.
pub index: &'a ShardedIndex,
/// Mapping that vectors are copied out of; swapped for a mapping of the
/// compacted file at the end of `compact`.
pub mmap: &'a RwLock<MmapMut>,
/// Byte offset treated as the end of used data; set to the compacted size
/// after a successful compaction.
pub next_offset: &'a AtomicUsize,
/// Buffered WAL writer. `compact` appends a single byte `4` and flushes it
/// (presumably a compaction marker; confirm against the WAL format).
pub wal: &'a RwLock<io::BufWriter<File>>,
/// Lower bound, in bytes, for the size of the compacted data file.
pub initial_size: u64,
}
impl CompactionContext<'_> {
    /// Rewrites the data file so that live vectors are stored back to back.
    ///
    /// Every vector referenced by the index is copied into
    /// `vectors.dat.tmp`, which then replaces `vectors.dat`. After the swap
    /// the new file is mapped, a single byte `4` is appended to the WAL and
    /// flushed, and the in-memory mapping, index and next-write offset are
    /// updated to describe the compacted layout.
    ///
    /// # Returns
    ///
    /// The number of bytes reclaimed (`next_offset` minus the bytes occupied
    /// by live vectors). Returns `Ok(0)` without touching any file when the
    /// index is empty or there is nothing to reclaim.
    ///
    /// # Errors
    ///
    /// * `InvalidData` if an index entry points outside the mapped data, or
    ///   the snapshot holds more vectors than the temporary file can take.
    /// * Any I/O error from creating, sizing, mapping, flushing or renaming
    ///   the files, or from writing to the WAL.
    ///
    /// The temporary file is removed (best-effort) when building or
    /// installing the compacted copy fails.
    ///
    /// NOTE(review): nothing here blocks writers between the index snapshot
    /// and the final swap; presumably the caller serializes compaction with
    /// writes. Confirm.
    pub fn compact(&self) -> io::Result<usize> {
        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let active_count = self.index.len();
        if active_count == 0 {
            return Ok(0);
        }
        let current_offset = self.next_offset.load(Ordering::Acquire);
        let active_size = active_count * vector_size;
        if current_offset <= active_size {
            return Ok(0);
        }
        let bytes_to_reclaim = current_offset - active_size;

        let temp_path = self.path.join("vectors.dat.tmp");
        let (new_index, new_offset) =
            match self.write_compacted_copy(&temp_path, vector_size, active_count, active_size) {
                Ok(copied) => copied,
                Err(err) => {
                    // Do not leave a partially written copy on disk.
                    let _ = std::fs::remove_file(&temp_path);
                    return Err(err);
                }
            };

        let data_path = self.path.join("vectors.dat");
        if let Err(err) = atomic_replace(&temp_path, &data_path) {
            let _ = std::fs::remove_file(&temp_path);
            return Err(err);
        }

        let new_data_file = OpenOptions::new().read(true).write(true).open(&data_path)?;
        // SAFETY: relies on the data file not being truncated or rewritten
        // from outside this mapping while it is mapped.
        let new_mmap = unsafe { MmapMut::map_mut(&new_data_file)? };
        {
            // Presumably the WAL record for "compaction happened"; confirm
            // against the WAL format.
            let mut wal = self.wal.write();
            wal.write_all(&[4u8])?;
            wal.flush()?;
        }
        *self.mmap.write() = new_mmap;
        self.index.replace_all(new_index);
        self.next_offset.store(new_offset, Ordering::Release);
        Ok(bytes_to_reclaim)
    }

    /// Copies every indexed vector into a fresh file at `temp_path`.
    ///
    /// Returns the id -> new-offset map and the number of bytes written.
    /// Bounds are checked on both sides of each copy so a bad offset is
    /// reported as `InvalidData` rather than panicking.
    fn write_compacted_copy(
        &self,
        temp_path: &Path,
        vector_size: usize,
        active_count: usize,
        active_size: usize,
    ) -> io::Result<(FxHashMap<u64, usize>, usize)> {
        let temp_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(temp_path)?;
        #[allow(clippy::cast_possible_truncation)]
        let new_size = (active_size as u64).max(self.initial_size);
        temp_file.set_len(new_size)?;
        // SAFETY: the file was just created and sized above; relies on
        // nothing else truncating it while it is mapped.
        let mut temp_mmap = unsafe { MmapMut::map_mut(&temp_file)? };

        let old_index = self.index.to_hashmap();
        let source = self.mmap.read();
        let mut new_index: FxHashMap<u64, usize> = FxHashMap::default();
        new_index.reserve(active_count);
        let mut new_offset = 0usize;

        let invalid_data =
            |message: &'static str| io::Error::new(io::ErrorKind::InvalidData, message);
        for (&id, &old_offset) in &old_index {
            let src_end = old_offset
                .checked_add(vector_size)
                .ok_or_else(|| invalid_data("compaction: vector offset overflows usize"))?;
            let src = source.get(old_offset..src_end).ok_or_else(|| {
                invalid_data("compaction: index entry points outside the mapped vector data")
            })?;
            let dst_end = new_offset
                .checked_add(vector_size)
                .ok_or_else(|| invalid_data("compaction: compacted size overflows usize"))?;
            let dst = temp_mmap.get_mut(new_offset..dst_end).ok_or_else(|| {
                invalid_data("compaction: live vectors do not fit in the temporary file")
            })?;
            dst.copy_from_slice(src);
            new_index.insert(id, new_offset);
            new_offset = dst_end;
        }
        // Release the read lock before the (potentially slow) flush.
        drop(source);

        temp_mmap.flush()?;
        // Unmap and close before the caller renames the file.
        drop(temp_mmap);
        drop(temp_file);
        Ok((new_index, new_offset))
    }

    /// Fraction of the used data region not occupied by live vectors.
    ///
    /// Computed as `1 - active_bytes / next_offset` and clamped so it is
    /// never negative. Returns `0.0` when the index is empty or nothing has
    /// been written yet.
    #[must_use]
    pub fn fragmentation_ratio(&self) -> f64 {
        let active_count = self.index.len();
        if active_count == 0 {
            return 0.0;
        }
        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let active_size = active_count * vector_size;
        let current_offset = self.next_offset.load(Ordering::Acquire);
        if current_offset == 0 {
            return 0.0;
        }
        #[allow(clippy::cast_precision_loss)]
        let ratio = 1.0 - (active_size as f64 / current_offset as f64);
        ratio.max(0.0)
    }
}