use crate::persistence::directory as vicinity_dir;
use crate::persistence::error::{PersistenceError, PersistenceResult};
use std::sync::Arc;
pub use crate::persistence::format::GraphWalEntry;
pub use durability::walog::{WalEntry, WalRecord, WalReplayMode};
fn to_durability_err(e: PersistenceError) -> durability::PersistenceError {
match e {
PersistenceError::Io(e) => durability::PersistenceError::Io(e),
PersistenceError::Format(s) => durability::PersistenceError::Format(s),
PersistenceError::Serialization(s) => durability::PersistenceError::Encode(s),
PersistenceError::Deserialization(s) => durability::PersistenceError::Decode(s),
PersistenceError::ChecksumMismatch { expected, actual } => {
durability::PersistenceError::CrcMismatch { expected, actual }
}
PersistenceError::LockFailed { resource, reason } => {
durability::PersistenceError::LockFailed { resource, reason }
}
PersistenceError::InvalidState(s) => durability::PersistenceError::InvalidState(s),
PersistenceError::NotFound(s) => durability::PersistenceError::NotFound(s),
PersistenceError::InvalidConfig(s) => durability::PersistenceError::InvalidConfig(s),
PersistenceError::NotSupported(s) => durability::PersistenceError::NotSupported(s),
}
}
#[derive(Clone)]
struct DirAdapter {
inner: Arc<dyn vicinity_dir::Directory>,
}
impl durability::Directory for DirAdapter {
fn create_file(
&self,
path: &str,
) -> durability::PersistenceResult<Box<dyn std::io::Write + Send>> {
self.inner.create_file(path).map_err(to_durability_err)
}
fn open_file(
&self,
path: &str,
) -> durability::PersistenceResult<Box<dyn std::io::Read + Send>> {
self.inner.open_file(path).map_err(to_durability_err)
}
fn exists(&self, path: &str) -> bool {
self.inner.exists(path)
}
fn delete(&self, path: &str) -> durability::PersistenceResult<()> {
self.inner.delete(path).map_err(to_durability_err)
}
fn atomic_rename(&self, from: &str, to: &str) -> durability::PersistenceResult<()> {
self.inner
.atomic_rename(from, to)
.map_err(to_durability_err)
}
fn create_dir_all(&self, path: &str) -> durability::PersistenceResult<()> {
self.inner.create_dir_all(path).map_err(to_durability_err)
}
fn list_dir(&self, path: &str) -> durability::PersistenceResult<Vec<String>> {
self.inner.list_dir(path).map_err(to_durability_err)
}
fn append_file(
&self,
path: &str,
) -> durability::PersistenceResult<Box<dyn std::io::Write + Send>> {
self.inner.append_file(path).map_err(to_durability_err)
}
fn atomic_write(&self, path: &str, data: &[u8]) -> durability::PersistenceResult<()> {
self.inner
.atomic_write(path, data)
.map_err(to_durability_err)
}
fn file_path(&self, path: &str) -> Option<std::path::PathBuf> {
self.inner.file_path(path)
}
}
/// Writer for the segment write-ahead log.
///
/// Wraps `durability::walog::WalWriter<WalEntry>`. It takes one of this
/// crate's directories and reports failures as this crate's
/// [`PersistenceError`].
pub struct WalWriter {
    inner: durability::walog::WalWriter<WalEntry>,
}

impl WalWriter {
    /// Builds a writer whose log files live in `directory`.
    pub fn new(directory: impl Into<Arc<dyn vicinity_dir::Directory>>) -> Self {
        let adapter = DirAdapter {
            inner: directory.into(),
        };
        let durable_dir: Arc<dyn durability::Directory> = Arc::new(adapter);
        Self {
            inner: durability::walog::WalWriter::new(durable_dir),
        }
    }

    /// Appends `entry` to the log.
    ///
    /// Returns the `u64` reported by the underlying `durability` writer.
    /// NOTE(review): presumably the record's sequence number — confirm
    /// against `durability::walog`.
    ///
    /// # Errors
    /// Any failure of the underlying writer, converted into [`PersistenceError`].
    pub fn append(&mut self, entry: WalEntry) -> PersistenceResult<u64> {
        self.inner.append(&entry).map_err(Into::into)
    }

    /// Flushes the underlying `durability` writer.
    ///
    /// # Errors
    /// Any failure of the underlying flush, converted into [`PersistenceError`].
    pub fn flush(&mut self) -> PersistenceResult<()> {
        self.inner.flush().map_err(Into::into)
    }
}
/// Reader for the segment write-ahead log.
///
/// Wraps `durability::walog::WalReader<WalEntry>` and reports failures as this
/// crate's [`PersistenceError`].
pub struct WalReader {
    inner: durability::walog::WalReader<WalEntry>,
}

impl WalReader {
    /// Builds a reader over the log files stored in `directory`.
    pub fn new(directory: impl Into<Arc<dyn vicinity_dir::Directory>>) -> Self {
        let adapter = DirAdapter {
            inner: directory.into(),
        };
        let durable_dir: Arc<dyn durability::Directory> = Arc::new(adapter);
        Self {
            inner: durability::walog::WalReader::new(durable_dir),
        }
    }

    /// Replays the log and returns the records produced by the underlying
    /// `durability` reader.
    ///
    /// # Errors
    /// Any replay failure, converted into [`PersistenceError`].
    pub fn replay(&self) -> PersistenceResult<Vec<WalRecord<WalEntry>>> {
        self.inner.replay().map_err(Into::into)
    }

    /// Best-effort variant of [`WalReader::replay`].
    ///
    /// NOTE(review): exactly which damaged or truncated records are tolerated
    /// is decided by `durability::walog::WalReader::replay_best_effort` —
    /// confirm there.
    ///
    /// # Errors
    /// Any failure the underlying best-effort replay still reports, converted
    /// into [`PersistenceError`].
    pub fn replay_best_effort(&self) -> PersistenceResult<Vec<WalRecord<WalEntry>>> {
        self.inner.replay_best_effort().map_err(Into::into)
    }
}
/// Writer for the graph write-ahead log.
///
/// Uses the same `durability` WAL machinery as the segment WAL. Every path is
/// routed through [`GraphDirAdapter`], so records land under `graph_wal`
/// rather than `wal` and the two logs can share one directory.
pub struct GraphWalWriter {
    inner: durability::walog::WalWriter<GraphWalEntry>,
}

impl GraphWalWriter {
    /// Builds a writer whose graph log files live in `directory`.
    pub fn new(directory: impl Into<Arc<dyn vicinity_dir::Directory>>) -> Self {
        let base = DirAdapter {
            inner: directory.into(),
        };
        let graph_dir: Arc<dyn durability::Directory> =
            Arc::new(GraphDirAdapter { inner: base });
        Self {
            inner: durability::walog::WalWriter::new(graph_dir),
        }
    }

    /// Appends `entry` to the graph log.
    ///
    /// Returns the `u64` reported by the underlying `durability` writer.
    /// NOTE(review): presumably the record's sequence number — confirm.
    ///
    /// # Errors
    /// Any failure of the underlying writer, converted into [`PersistenceError`].
    pub fn append(&mut self, entry: GraphWalEntry) -> PersistenceResult<u64> {
        self.inner.append(&entry).map_err(Into::into)
    }

    /// Flushes the underlying `durability` writer.
    ///
    /// # Errors
    /// Any failure of the underlying flush, converted into [`PersistenceError`].
    pub fn flush(&mut self) -> PersistenceResult<()> {
        self.inner.flush().map_err(Into::into)
    }
}
/// Reader for the graph write-ahead log.
///
/// Every path goes through [`GraphDirAdapter`], so only records stored under
/// `graph_wal` are seen, never the segment WAL's records under `wal`.
pub struct GraphWalReader {
    inner: durability::walog::WalReader<GraphWalEntry>,
}

impl GraphWalReader {
    /// Builds a reader over the graph log files stored in `directory`.
    pub fn new(directory: impl Into<Arc<dyn vicinity_dir::Directory>>) -> Self {
        let base = DirAdapter {
            inner: directory.into(),
        };
        let graph_dir: Arc<dyn durability::Directory> =
            Arc::new(GraphDirAdapter { inner: base });
        Self {
            inner: durability::walog::WalReader::new(graph_dir),
        }
    }

    /// Replays the graph log and returns the records produced by the
    /// underlying `durability` reader.
    ///
    /// # Errors
    /// Any replay failure, converted into [`PersistenceError`].
    pub fn replay(&self) -> PersistenceResult<Vec<WalRecord<GraphWalEntry>>> {
        self.inner.replay().map_err(Into::into)
    }

    /// Best-effort variant of [`GraphWalReader::replay`].
    ///
    /// NOTE(review): tolerance for damaged or truncated records is defined by
    /// `durability::walog::WalReader::replay_best_effort` — confirm there.
    ///
    /// # Errors
    /// Any failure the underlying best-effort replay still reports, converted
    /// into [`PersistenceError`].
    pub fn replay_best_effort(&self) -> PersistenceResult<Vec<WalRecord<GraphWalEntry>>> {
        self.inner.replay_best_effort().map_err(Into::into)
    }
}
/// A `durability::Directory` that relocates the WAL directory.
///
/// Every incoming path is rewritten with [`remap_graph_wal_path`] before it is
/// handed to the wrapped [`DirAdapter`]. Anything addressed under `wal` is
/// therefore stored under `graph_wal`, which keeps the graph WAL apart from
/// the segment WAL when both share a directory.
#[derive(Clone)]
struct GraphDirAdapter {
    /// Adapter over the shared crate-level directory; receives remapped paths.
    inner: DirAdapter,
}

impl durability::Directory for GraphDirAdapter {
    fn create_file(
        &self,
        path: &str,
    ) -> durability::PersistenceResult<Box<dyn std::io::Write + Send>> {
        let target = remap_graph_wal_path(path);
        self.inner.create_file(&target)
    }

    fn open_file(
        &self,
        path: &str,
    ) -> durability::PersistenceResult<Box<dyn std::io::Read + Send>> {
        let target = remap_graph_wal_path(path);
        self.inner.open_file(&target)
    }

    fn exists(&self, path: &str) -> bool {
        let target = remap_graph_wal_path(path);
        self.inner.exists(&target)
    }

    fn delete(&self, path: &str) -> durability::PersistenceResult<()> {
        let target = remap_graph_wal_path(path);
        self.inner.delete(&target)
    }

    fn atomic_rename(&self, from: &str, to: &str) -> durability::PersistenceResult<()> {
        // Both endpoints are rewritten so source and destination are resolved
        // with the same mapping.
        let src = remap_graph_wal_path(from);
        let dst = remap_graph_wal_path(to);
        self.inner.atomic_rename(&src, &dst)
    }

    fn create_dir_all(&self, path: &str) -> durability::PersistenceResult<()> {
        let target = remap_graph_wal_path(path);
        self.inner.create_dir_all(&target)
    }

    fn list_dir(&self, path: &str) -> durability::PersistenceResult<Vec<String>> {
        let target = remap_graph_wal_path(path);
        self.inner.list_dir(&target)
    }

    fn append_file(
        &self,
        path: &str,
    ) -> durability::PersistenceResult<Box<dyn std::io::Write + Send>> {
        let target = remap_graph_wal_path(path);
        self.inner.append_file(&target)
    }

    fn atomic_write(&self, path: &str, data: &[u8]) -> durability::PersistenceResult<()> {
        let target = remap_graph_wal_path(path);
        self.inner.atomic_write(&target, data)
    }

    fn file_path(&self, path: &str) -> Option<std::path::PathBuf> {
        let target = remap_graph_wal_path(path);
        self.inner.file_path(&target)
    }
}
/// Rewrites a path under the `wal` directory so it points into `graph_wal`.
///
/// - `"wal"` itself becomes `"graph_wal"`.
/// - `"wal/<rest>"` becomes `"graph_wal/<rest>"`.
/// - Every other path is returned unchanged. This includes names that merely
///   start with the letters `wal` (e.g. `"walrus"`) and `wal` appearing deeper
///   in a path.
fn remap_graph_wal_path(path: &str) -> String {
    const SEGMENT_WAL_DIR: &str = "wal";
    const GRAPH_WAL_DIR: &str = "graph_wal";

    match path.strip_prefix(SEGMENT_WAL_DIR) {
        // Exactly the directory name.
        Some("") => GRAPH_WAL_DIR.to_string(),
        // Something inside the directory; keep the separator and the rest.
        Some(rest) if rest.starts_with('/') => format!("{}{}", GRAPH_WAL_DIR, rest),
        // Unrelated path, or a sibling name that only shares the prefix.
        _ => path.to_string(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::persistence::directory::MemoryDirectory;

    /// Segment WAL entries survive a write/flush/replay cycle intact.
    #[test]
    fn wal_write_read_roundtrip() {
        let dir: Arc<dyn vicinity_dir::Directory> = Arc::new(MemoryDirectory::new());
        let mut w = WalWriter::new(dir.clone());
        w.append(WalEntry::AddSegment {
            segment_id: 1,
            doc_count: 10,
        })
        .unwrap();
        w.append(WalEntry::DeleteDocuments {
            deletes: vec![(1, 5)],
        })
        .unwrap();
        w.flush().unwrap();
        let r = WalReader::new(dir);
        let entries = r.replay().unwrap();
        assert_eq!(entries.len(), 2);
        // Check payloads as well as the count, so a writer that emitted the
        // right number of wrong records still fails.
        match &entries[0].payload {
            WalEntry::AddSegment {
                segment_id,
                doc_count,
            } => {
                assert_eq!(*segment_id, 1);
                assert_eq!(*doc_count, 10);
            }
            _ => panic!("expected AddSegment as the first replayed entry"),
        }
        match &entries[1].payload {
            WalEntry::DeleteDocuments { deletes } => assert_eq!(deletes.len(), 1),
            _ => panic!("expected DeleteDocuments as the second replayed entry"),
        }
    }

    /// Graph WAL entries round-trip and stay invisible to the segment WAL.
    #[test]
    fn graph_wal_write_read_roundtrip() {
        let dir: Arc<dyn vicinity_dir::Directory> = Arc::new(MemoryDirectory::new());
        let mut gw = GraphWalWriter::new(dir.clone());
        gw.append(GraphWalEntry::InsertNode {
            doc_id: 42,
            level: 1,
            vector: vec![1.0, 2.0, 3.0],
            neighbors_per_level: vec![vec![10, 20], vec![30]],
        })
        .unwrap();
        gw.append(GraphWalEntry::UpdateNeighbors {
            node_id: 10,
            level: 0,
            neighbors: vec![42, 99],
        })
        .unwrap();
        gw.append(GraphWalEntry::DeleteNode { doc_id: 99 }).unwrap();
        gw.flush().unwrap();
        let gr = GraphWalReader::new(dir.clone());
        let entries = gr.replay().unwrap();
        assert_eq!(entries.len(), 3);
        match &entries[0].payload {
            GraphWalEntry::InsertNode {
                doc_id,
                level,
                vector,
                ..
            } => {
                assert_eq!(*doc_id, 42);
                assert_eq!(*level, 1);
                assert_eq!(vector.len(), 3);
            }
            other => panic!("expected InsertNode, got {:?}", other),
        }
        let segment_reader = WalReader::new(dir);
        let segment_entries = segment_reader.replay().unwrap();
        assert_eq!(
            segment_entries.len(),
            0,
            "graph WAL should not pollute segment WAL"
        );
    }

    /// Isolation in the other direction: segment records never show up when
    /// replaying the graph WAL from the same directory.
    #[test]
    fn segment_wal_does_not_pollute_graph_wal() {
        let dir: Arc<dyn vicinity_dir::Directory> = Arc::new(MemoryDirectory::new());
        let mut w = WalWriter::new(dir.clone());
        w.append(WalEntry::AddSegment {
            segment_id: 7,
            doc_count: 3,
        })
        .unwrap();
        w.flush().unwrap();
        let graph_entries = GraphWalReader::new(dir).replay().unwrap();
        assert!(
            graph_entries.is_empty(),
            "segment WAL should not pollute graph WAL"
        );
    }

    /// The path rewrite that isolation relies on only touches the `wal`
    /// directory itself and paths inside it.
    #[test]
    fn remap_graph_wal_path_only_touches_wal_dir() {
        assert_eq!(remap_graph_wal_path("wal"), "graph_wal");
        assert_eq!(remap_graph_wal_path("wal/"), "graph_wal/");
        assert_eq!(
            remap_graph_wal_path("wal/00000001.log"),
            "graph_wal/00000001.log"
        );
        assert_eq!(remap_graph_wal_path("walrus"), "walrus");
        assert_eq!(remap_graph_wal_path("segments/wal/x"), "segments/wal/x");
        assert_eq!(remap_graph_wal_path(""), "");
    }
}