use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use fjall::config::{BlockSizePolicy, PinningPolicy};
use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode};
use crate::artifact::{ExportManifest, ExportStage, verify_and_stage_import};
use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
use crate::snapshot::{SnapshotError, SnapshotStore};
use crate::snapshot_record::{decode_entry, encode_value_into};
/// Name of the keyspace that holds the encoded key/value entries.
const DATA_PARTITION: &str = "data";
/// Name of the keyspace that holds store-level metadata (the cursor record).
const META_PARTITION: &str = "meta";
/// Key inside the meta keyspace under which the cursor's version bytes are stored.
const CURSOR_KEY: &[u8] = b"cursor";
/// Upper bound applied to the detected parallelism before it is handed to the
/// database builder as its worker thread count.
const MAX_WORKER_THREADS: usize = 16;
/// `max_memtable_size` used when creating the data keyspace (256 MiB).
const DATA_MEMTABLE_BYTES: u64 = 256 * 1024 * 1024;
/// `max_memtable_size` used when creating the meta keyspace (8 MiB).
const META_MEMTABLE_BYTES: u64 = 8 * 1024 * 1024;
/// Data block size applied to every level of the data keyspace (16 KiB).
const DATA_BLOCK_SIZE: u32 = 16 << 10;
/// Tunables accepted by [`FjallSnapshot::open`].
#[derive(Clone, Copy, Debug)]
pub struct FjallConfig {
    /// When `true`, write batches are committed with `PersistMode::SyncAll`;
    /// otherwise they use `PersistMode::Buffer`.
    pub sync: bool,
    /// Cache size in bytes handed to the database builder. A value of `0`
    /// skips the `cache_size` call, leaving the builder's own setting in place.
    pub cache_size_bytes: u64,
}
impl Default for FjallConfig {
    /// Default configuration: `sync` off and a 1 GiB cache.
    fn default() -> Self {
        const DEFAULT_CACHE_BYTES: u64 = 1 << 30;
        Self {
            cache_size_bytes: DEFAULT_CACHE_BYTES,
            sync: false,
        }
    }
}
/// Snapshot store kept in a fjall database on disk.
///
/// Entries live in the `data` keyspace and the latest applied cursor lives in
/// the `meta` keyspace under `CURSOR_KEY`; both are written in the same batch
/// by `apply`.
///
/// NOTE(review): fields drop in declaration order (`db` first). Whether fjall
/// depends on that order is not visible from this file — confirm before
/// reordering fields.
pub struct FjallSnapshot {
/// Database handle: creates write batches, persists, and reports outstanding
/// flushes / active compactions.
db: Database,
/// Keyspace holding encoded entries, keyed by the entry key's bytes.
data: Keyspace,
/// Keyspace holding the cursor record.
meta: Keyspace,
/// Configuration this store was opened with; consulted for commit durability.
config: FjallConfig,
/// Cursor recovered at open, then replaced after every successful `apply`.
cursor: WatchCursor,
/// Directory the database was opened from; the source of export copies.
path: PathBuf,
}
/// Construction, read handles, compaction and artifact import.
impl FjallSnapshot {
    /// Opens (creating if needed) the fjall database rooted at `path`.
    ///
    /// Returns the cursor recovered from the meta keyspace together with the
    /// store; the cursor is `WatchCursor::none()` when no cursor record exists.
    ///
    /// # Errors
    ///
    /// * I/O errors from creating the directory.
    /// * fjall errors, mapped through `map_fjall`.
    /// * `SnapshotError::InvalidFormat` when the stored cursor bytes are
    ///   rejected by `VersionToken::from_raw`.
    pub fn open(path: &Path, config: FjallConfig) -> Result<(WatchCursor, Self), SnapshotError> {
        std::fs::create_dir_all(path)?;

        // Use the detected parallelism (4 if detection fails), capped.
        let detected = std::thread::available_parallelism().map_or(4, |n| n.get());
        let worker_count = detected.min(MAX_WORKER_THREADS);

        let base_builder = Database::builder(path).worker_threads(worker_count);
        // A zero cache size means "do not override the builder's cache setting".
        let builder = if config.cache_size_bytes > 0 {
            base_builder.cache_size(config.cache_size_bytes)
        } else {
            base_builder
        };
        let db: Database = builder.open().map_err(map_fjall)?;

        let data_options = || {
            KeyspaceCreateOptions::default()
                .max_memtable_size(DATA_MEMTABLE_BYTES)
                .data_block_size_policy(BlockSizePolicy::all(DATA_BLOCK_SIZE))
                .filter_block_pinning_policy(PinningPolicy::new([true, true, false]))
        };
        let data = db
            .keyspace(DATA_PARTITION, data_options)
            .map_err(map_fjall)?;

        let meta_options =
            || KeyspaceCreateOptions::default().max_memtable_size(META_MEMTABLE_BYTES);
        let meta = db
            .keyspace(META_PARTITION, meta_options)
            .map_err(map_fjall)?;

        // Recover the cursor written by the last committed batch, if any.
        let cursor = if let Some(raw) = meta.get(CURSOR_KEY).map_err(map_fjall)? {
            match VersionToken::from_raw(&raw) {
                Some(version) => WatchCursor::from_version(version),
                None => {
                    return Err(SnapshotError::InvalidFormat(format!(
                        "stored cursor is {} bytes, exceeds version token capacity",
                        raw.len()
                    )));
                }
            }
        } else {
            WatchCursor::none()
        };

        let store = Self {
            db,
            data,
            meta,
            config,
            cursor: cursor.clone(),
            path: path.to_path_buf(),
        };
        Ok((cursor, store))
    }

    /// Returns a read handle that owns its own clone of the data keyspace handle.
    pub fn reader(&self) -> FjallReader {
        let data = self.data.clone();
        FjallReader { data }
    }

    /// Runs a major compaction on the data keyspace.
    ///
    /// # Errors
    ///
    /// fjall errors, mapped through `map_fjall`.
    pub fn settle(&self) -> Result<(), SnapshotError> {
        match self.data.major_compact() {
            Ok(()) => Ok(()),
            Err(e) => Err(map_fjall(e)),
        }
    }

    /// Imports the artifact in `artifact_dir` into `dest_dir` and opens it.
    ///
    /// Steps, in order:
    /// 1. `verify_and_stage_import` checks and stages the artifact; the
    ///    callback rejects any backend version other than `BACKEND_VERSION`.
    /// 2. The staged payload is opened once to confirm that the cursor stored
    ///    inside it equals the manifest's cursor. That handle is dropped before
    ///    the stage is finalized.
    /// 3. The stage is finalized and `dest_dir` is opened with `config`.
    ///
    /// # Errors
    ///
    /// `SnapshotError::ArtifactInvalid` on a version or cursor mismatch, plus
    /// anything returned by staging, finalizing, or [`FjallSnapshot::open`].
    pub fn import(
        artifact_dir: &Path,
        dest_dir: &Path,
        config: FjallConfig,
    ) -> Result<(WatchCursor, Self), SnapshotError> {
        let (manifest, stage) =
            verify_and_stage_import(artifact_dir, dest_dir, Self::BACKEND, |v| {
                if v != Self::BACKEND_VERSION {
                    return Err(SnapshotError::ArtifactInvalid(format!(
                        "fjall artifact has on-disk format generation {v:?}, this build reads {:?}",
                        Self::BACKEND_VERSION
                    )));
                }
                Ok(())
            })?;

        // Verification open: synced commits, no cache override.
        let verify_config = FjallConfig {
            sync: true,
            cache_size_bytes: 0,
        };
        let (staged_cursor, verify_handle) = Self::open(&stage.payload(), verify_config)?;
        // Close the verification handle before anything else touches the stage.
        drop(verify_handle);
        if staged_cursor != manifest.cursor {
            return Err(SnapshotError::ArtifactInvalid(format!(
                "payload cursor {staged_cursor:?} disagrees with manifest cursor {:?}",
                manifest.cursor
            )));
        }

        stage.finalize_dir()?;
        Self::open(dest_dir, config)
    }
}
/// Artifact identity constants and the pre-export quiesce step.
impl FjallSnapshot {
    /// Backend identifier passed to `verify_and_stage_import` and
    /// `ExportStage::seal_and_finalize`.
    pub(crate) const BACKEND: &'static str = "fjall";
    /// On-disk format generation written into exports; imports must match it exactly.
    pub(crate) const BACKEND_VERSION: &'static str = "3";
    /// Longest time [`Self::quiesce`] waits for background flushes and compactions.
    const QUIESCE_TIMEOUT: Duration = Duration::from_secs(10);

    /// Best-effort attempt to bring the database to rest before its directory
    /// is copied for export.
    ///
    /// Rotates the memtables of the data and meta keyspaces, then polls every
    /// 50 ms until the database reports no outstanding flushes and no active
    /// compactions, or until `QUIESCE_TIMEOUT` elapses.
    ///
    /// This never fails. A rotation error is logged and ends the quiesce early
    /// (the remaining keyspace is not rotated and the wait is skipped). A
    /// timeout is logged as well, so an export that proceeds while background
    /// work is still running leaves a trace in the logs.
    fn quiesce(&self) {
        const POLL_INTERVAL: Duration = Duration::from_millis(50);

        for ks in [&self.data, &self.meta] {
            if let Err(e) = ks.rotate_memtable_and_wait() {
                tracing::warn!(error = %e, "fjall export quiesce: memtable rotation failed; proceeding");
                return;
            }
        }

        let deadline = Instant::now() + Self::QUIESCE_TIMEOUT;
        loop {
            let outstanding_flushes = self.db.outstanding_flushes();
            let active_compactions = self.db.active_compactions();
            if outstanding_flushes == 0 && active_compactions == 0 {
                return;
            }
            if Instant::now() >= deadline {
                // Previously this case returned silently; the caller still
                // proceeds, but the operator can now see why a copy may race.
                tracing::warn!(
                    outstanding_flushes,
                    active_compactions,
                    "fjall export quiesce: timed out waiting for background work; proceeding"
                );
                return;
            }
            std::thread::sleep(POLL_INTERVAL);
        }
    }
}
/// Returns `true` when any normal component of `rel` is named exactly
/// `tables` or `blobs`.
///
/// `copy_db_dir` uses this to decide which files it may hard-link instead of
/// copying.
fn is_immutable_payload(rel: &Path) -> bool {
    for part in rel.components() {
        if let std::path::Component::Normal(name) = part {
            if name == "tables" || name == "blobs" {
                return true;
            }
        }
    }
    false
}
/// Recursively replicates the directory tree under `src` into `dst`.
///
/// Directories are recreated. Regular files whose relative path passes
/// `is_immutable_payload` are hard-linked, falling back to a byte copy if the
/// link fails; every other regular file is copied. Entries that are neither a
/// directory nor a regular file are skipped.
///
/// # Errors
///
/// Any I/O error from listing, creating or copying, and
/// `SnapshotError::Backend` if an entry's path is not under `src`.
fn copy_db_dir(src: &Path, dst: &Path) -> Result<(), SnapshotError> {
    std::fs::create_dir_all(dst)?;

    // Explicit work list instead of recursion.
    let mut pending = vec![src.to_path_buf()];
    while let Some(current) = pending.pop() {
        for item in std::fs::read_dir(&current)? {
            let item = item?;
            let kind = item.file_type()?;
            let from = item.path();
            let rel = match from.strip_prefix(src) {
                Ok(stripped) => stripped.to_path_buf(),
                Err(_) => {
                    return Err(SnapshotError::Backend(
                        "fjall copy escaped the DB root".into(),
                    ));
                }
            };
            let to = dst.join(&rel);

            if kind.is_dir() {
                std::fs::create_dir_all(&to)?;
                pending.push(from);
                continue;
            }
            if !kind.is_file() {
                continue;
            }

            // Only attempt a hard link for immutable payload files; anything
            // not linked (by policy or because linking failed) is copied.
            let linked = is_immutable_payload(&rel) && std::fs::hard_link(&from, &to).is_ok();
            if !linked {
                std::fs::copy(&from, &to)?;
            }
        }
    }
    Ok(())
}
/// Cloneable read-only handle over the data keyspace, obtained from
/// `FjallSnapshot::reader`.
#[derive(Clone)]
pub struct FjallReader {
/// Handle to the keyspace holding the encoded entries.
data: Keyspace,
}
/// Point and prefix reads against the data keyspace.
impl FjallReader {
    /// Looks up `key` and decodes the stored value.
    ///
    /// Returns `Ok(None)` when the key is absent.
    ///
    /// # Errors
    ///
    /// fjall errors (via `map_fjall`) and decode errors from `decode_entry`.
    pub fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
        let Some(raw) = self.data.get(key.as_bytes()).map_err(map_fjall)? else {
            return Ok(None);
        };
        let entry = decode_entry(key, &raw)?;
        Ok(Some(entry))
    }

    /// Calls `f` with every decoded entry whose key starts with `prefix`, in
    /// the order the keyspace's prefix iterator yields them.
    ///
    /// Iteration stops at the first error, whether it comes from fjall, from a
    /// key that is not valid UTF-8, from decoding, or from `f` itself.
    pub fn for_each_in_range(
        &self,
        prefix: &str,
        mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
    ) -> Result<(), SnapshotError> {
        for item in self.data.prefix(prefix.as_bytes()) {
            let (key_bytes, value_bytes) = item.into_inner().map_err(map_fjall)?;
            let key = match std::str::from_utf8(&key_bytes) {
                Ok(text) => text,
                Err(e) => {
                    return Err(SnapshotError::InvalidFormat(format!(
                        "non-UTF-8 key in fjall store: {e}"
                    )));
                }
            };
            let entry = decode_entry(key, &value_bytes)?;
            f(entry)?;
        }
        Ok(())
    }

    /// Collects every entry whose key starts with `prefix` into a `Vec`.
    pub fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
        let mut collected = Vec::new();
        let visit = |entry| {
            collected.push(entry);
            Ok(())
        };
        self.for_each_in_range(prefix, visit)?;
        Ok(collected)
    }
}
impl SnapshotStore for FjallSnapshot {
fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
Self::open(path, FjallConfig::default())
}
fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
let mut wb = self.db.batch().durability(self.durability());
let mut scratch = Vec::new();
for update in batch {
match update {
KvUpdate::Put(entry) => {
encode_value_into(&mut scratch, &entry.value, &entry.version)?;
wb.insert(&self.data, entry.key.as_bytes(), scratch.as_slice());
}
KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => {
wb.remove(&self.data, key.as_bytes());
}
}
}
wb.insert(&self.meta, CURSOR_KEY, cursor.version().as_bytes());
wb.commit().map_err(map_fjall)?;
self.cursor = cursor.clone();
Ok(())
}
fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
match self.data.get(key.as_bytes()).map_err(map_fjall)? {
Some(raw) => Ok(Some(decode_entry(key, &raw)?)),
None => Ok(None),
}
}
fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
let mut out = Vec::new();
self.for_each_in_range(prefix, |entry| {
out.push(entry);
Ok(())
})?;
Ok(out)
}
fn for_each_in_range(
&self,
prefix: &str,
mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
) -> Result<(), SnapshotError> {
for guard in self.data.prefix(prefix.as_bytes()) {
let (raw_key, raw_val) = guard.into_inner().map_err(map_fjall)?;
let key = std::str::from_utf8(&raw_key).map_err(|e| {
SnapshotError::InvalidFormat(format!("non-UTF-8 key in fjall store: {e}"))
})?;
f(decode_entry(key, &raw_val)?)?;
}
Ok(())
}
fn cursor(&self) -> WatchCursor {
self.cursor.clone()
}
fn export_to(&mut self, dest_dir: &Path) -> Result<ExportManifest, SnapshotError> {
let stage = ExportStage::new(dest_dir)?;
let payload = stage.payload();
self.db.persist(PersistMode::SyncAll).map_err(map_fjall)?;
self.quiesce();
let mut attempt = 0;
loop {
attempt += 1;
match copy_db_dir(&self.path, &payload) {
Ok(()) => break,
Err(SnapshotError::Io(e))
if e.kind() == std::io::ErrorKind::NotFound && attempt < 3 =>
{
tracing::warn!(attempt, "fjall export copy raced background GC; retrying");
std::fs::remove_dir_all(&payload)?;
}
Err(e) => return Err(e),
}
}
{
let (staged_cursor, _verify) = Self::open(
&payload,
FjallConfig {
sync: true,
cache_size_bytes: 0,
},
)?;
if staged_cursor != self.cursor {
return Err(SnapshotError::ArtifactInvalid(format!(
"exported copy recovered cursor {staged_cursor:?}, live fold is at {:?}",
self.cursor
)));
}
}
stage.seal_and_finalize(Self::BACKEND, Self::BACKEND_VERSION, &self.cursor)
}
}
impl FjallSnapshot {
    /// Persist mode applied to write batches: `SyncAll` when `config.sync` is
    /// set, `Buffer` otherwise. Always returns `Some`.
    fn durability(&self) -> Option<PersistMode> {
        let mode = match self.config.sync {
            true => PersistMode::SyncAll,
            false => PersistMode::Buffer,
        };
        Some(mode)
    }
}
fn map_fjall(e: fjall::Error) -> SnapshotError {
match e {
fjall::Error::Io(io) => SnapshotError::Io(io),
other => SnapshotError::Backend(other.to_string()),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Writes an 11-byte cursor record directly into the meta keyspace and
    /// expects the next `open` to fail with `SnapshotError::InvalidFormat`.
    #[test]
    fn open_rejects_corrupted_cursor() {
        let tmp = TempDir::new().unwrap();
        let store_path = tmp.path().join("store");

        // Seed the bad record; the scope ends so the handle is dropped before reopening.
        {
            let (_cursor, seeded) =
                FjallSnapshot::open(&store_path, FjallConfig::default()).expect("initial open");
            seeded
                .meta
                .insert(CURSOR_KEY, [0u8; 11])
                .expect("insert oversized cursor");
            seeded.db.persist(PersistMode::SyncAll).expect("persist");
        }

        let reopened = FjallSnapshot::open(&store_path, FjallConfig::default());
        match reopened {
            Ok(_) => panic!("expected open to reject the oversized cursor"),
            Err(SnapshotError::InvalidFormat(_)) => {}
            Err(other) => panic!("expected InvalidFormat, got {other:?}"),
        }
    }
}