use std::sync::Arc;
use selene_core::{Change, GraphId, NodeId};
use super::{append_wal, node_created, temp_dir};
use crate::{IndexProvider, ProviderError, ProviderTag, SharedGraph, SubTag};
struct NoopIndexProvider {
tag: ProviderTag,
}
impl NoopIndexProvider {
const fn new(tag: ProviderTag) -> Self {
Self { tag }
}
}
impl IndexProvider for NoopIndexProvider {
fn provider_tag(&self) -> ProviderTag {
self.tag
}
fn read_section(&self, _sub_tag: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
Ok(())
}
fn write_section(&self, _sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
Ok(Vec::new())
}
fn on_change(&self, _change: &Change) -> Result<(), ProviderError> {
Ok(())
}
fn declared_sub_tags(&self) -> &[SubTag] {
&[]
}
}
#[test]
fn recover_with_providers_replays_wal() {
let dir = temp_dir("provider-replay");
append_wal(&dir, 0, &[node_created(1)]);
let tag = ProviderTag(*b"NOOP");
let provider: Arc<dyn IndexProvider> = Arc::new(NoopIndexProvider::new(tag));
let recovered =
SharedGraph::recover_with_providers(&dir, GraphId::new(7), vec![provider]).unwrap();
assert!(recovered.read().is_node_alive(NodeId::new(1)));
let _ = std::fs::remove_dir_all(dir);
}
struct FailingIndexProvider {
tag: ProviderTag,
}
impl FailingIndexProvider {
const fn new(tag: ProviderTag) -> Self {
Self { tag }
}
}
impl IndexProvider for FailingIndexProvider {
fn provider_tag(&self) -> ProviderTag {
self.tag
}
fn read_section(&self, _sub_tag: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
Ok(())
}
fn write_section(&self, _sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
Ok(Vec::new())
}
fn on_change(&self, _change: &Change) -> Result<(), ProviderError> {
Err(ProviderError::Inconsistent {
reason: "synthetic index-provider on_change failure".to_owned(),
})
}
fn declared_sub_tags(&self) -> &[SubTag] {
&[]
}
}
#[test]
fn recover_with_providers_propagates_index_on_change_error() {
let dir = temp_dir("provider-replay-fail");
append_wal(&dir, 0, &[node_created(1)]);
let provider: Arc<dyn IndexProvider> =
Arc::new(FailingIndexProvider::new(ProviderTag(*b"FAIL")));
let err = match SharedGraph::recover_with_providers(&dir, GraphId::new(7), vec![provider]) {
Ok(_) => panic!("recovery must fail when an index provider's on_change errors"),
Err(error) => error,
};
let crate::GraphError::Persist(selene_persist::PersistError::ProviderFailed {
provider,
source,
..
}) = &err
else {
panic!("expected PersistError::ProviderFailed, got {err:?}");
};
assert_eq!(
*provider, *b"FAIL",
"the failing provider's tag is surfaced"
);
assert!(
format!("{source}").contains("synthetic index-provider on_change failure"),
"the boxed provider error must surface verbatim, got: {source}",
);
let _ = std::fs::remove_dir_all(dir);
}
struct FailingBatchIndexProvider {
tag: ProviderTag,
}
impl FailingBatchIndexProvider {
const fn new(tag: ProviderTag) -> Self {
Self { tag }
}
}
impl IndexProvider for FailingBatchIndexProvider {
fn provider_tag(&self) -> ProviderTag {
self.tag
}
fn read_section(&self, _sub_tag: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
Ok(())
}
fn write_section(&self, _sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
Ok(Vec::new())
}
fn on_change(&self, _change: &Change) -> Result<(), ProviderError> {
Ok(())
}
fn handles_change_batches(&self) -> bool {
true
}
fn on_changes(&self, changes: &[Change]) -> Result<(), ProviderError> {
if changes
.iter()
.any(|change| matches!(change, Change::GraphReset {}))
{
return Err(ProviderError::Inconsistent {
reason: format!(
"synthetic index-provider on_changes failure after {} replayed changes",
changes.len()
),
});
}
Ok(())
}
fn declared_sub_tags(&self) -> &[SubTag] {
&[]
}
}
#[test]
fn recover_with_providers_propagates_index_on_changes_error_for_reset() {
let dir = temp_dir("provider-replay-batch-fail");
append_wal(&dir, 0, &[node_created(1), Change::GraphReset {}]);
let provider: Arc<dyn IndexProvider> =
Arc::new(FailingBatchIndexProvider::new(ProviderTag(*b"BFL1")));
let err = match SharedGraph::recover_with_providers(&dir, GraphId::new(7), vec![provider]) {
Ok(_) => panic!("recovery must fail when an index provider's on_changes errors"),
Err(error) => error,
};
let crate::GraphError::Persist(selene_persist::PersistError::ProviderFailed {
provider,
source,
..
}) = &err
else {
panic!("expected PersistError::ProviderFailed, got {err:?}");
};
assert_eq!(
*provider, *b"BFL1",
"the failing provider's tag is surfaced"
);
let source = format!("{source}");
assert!(
source.contains("synthetic index-provider on_changes failure"),
"the boxed provider error must surface verbatim, got: {source}",
);
assert!(
source.contains("2 replayed changes"),
"the provider must see the whole WAL commit batch, got: {source}",
);
let _ = std::fs::remove_dir_all(dir);
}