use super::{BatchGet, BatchPut, GetResult, Pager};
use crate::constants::CATALOG_ROOT_PKEY;
use crate::types::PhysicalKey;
use llkv_result::{Error, Result};
use simd_r_drive::{
DataStore,
traits::{DataStoreReader, DataStoreWriter},
};
use simd_r_drive_entry_handle::EntryHandle;
use std::fmt;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
pub struct SimdRDrivePager {
ds: DataStore,
next_key: AtomicU64,
}
impl fmt::Debug for SimdRDrivePager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let next = self.next_key.load(Ordering::Relaxed);
f.debug_struct("SimdRDrivePager")
.field("next_key", &next)
.finish()
}
}
impl SimdRDrivePager {
pub fn open(path: &Path) -> Result<Self> {
let ds = DataStore::open(path)
.map_err(|e| Error::Io(std::io::Error::other(format!("open DataStore failed: {e}"))))?;
let mut max_key = CATALOG_ROOT_PKEY;
for eh in ds.iter_entries() {
let k = eh.key_hash();
if k > max_key {
max_key = k;
}
}
Ok(Self {
ds,
next_key: AtomicU64::new(max_key.saturating_add(1)),
})
}
pub fn datastore(&self) -> &DataStore {
&self.ds
}
}
impl Pager for SimdRDrivePager {
type Blob = EntryHandle;
fn alloc_many(&self, n: usize) -> Result<Vec<PhysicalKey>> {
let n = u64::try_from(n)
.map_err(|_| Error::Internal("alloc_many: n does not fit in u64".into()))?;
let start = self
.next_key
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
cur.checked_add(n)
})
.map_err(|_| Error::Internal("physical key space overflow".into()))?;
Ok((start..start + n).collect())
}
fn batch_put(&self, puts: &[BatchPut]) -> Result<()> {
let mut entries: Vec<(u64, &[u8])> = Vec::with_capacity(puts.len());
for p in puts {
match p {
BatchPut::Raw { key, bytes } => {
if bytes.is_empty() {
return Err(Error::Internal("empty payload not allowed".into()));
}
entries.push((*key, bytes.as_slice()));
}
}
}
self.ds
.batch_write_with_key_hashes(entries, false)
.map_err(|e| {
Error::Io(std::io::Error::other(format!(
"DataStore write failed: {e}"
)))
})?;
Ok(())
}
fn batch_get(&self, gets: &[BatchGet]) -> Result<Vec<GetResult<Self::Blob>>> {
let mut out = Vec::with_capacity(gets.len());
for g in gets {
match *g {
BatchGet::Raw { key } => {
match self.ds.read_with_key_hash(key).map_err(|e| {
Error::Io(std::io::Error::other(format!("DataStore read failed: {e}")))
})? {
Some(h) => out.push(GetResult::Raw { key, bytes: h }),
None => out.push(GetResult::Missing { key }),
}
}
}
}
Ok(out)
}
fn free_many(&self, keys: &[PhysicalKey]) -> Result<()> {
self.ds.batch_delete_key_hashes(keys).map_err(|e| {
Error::Io(std::io::Error::other(format!(
"DataStore delete failed: {e}"
)))
})?;
Ok(())
}
}