#[cfg(feature = "grafeo-file")]
use std::sync::Arc;
#[cfg(feature = "grafeo-file")]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "grafeo-file")]
use std::time::Duration;
#[cfg(feature = "grafeo-file")]
use grafeo_common::utils::error::Result;
#[cfg(feature = "grafeo-file")]
use grafeo_core::graph::lpg::LpgStore;
#[cfg(feature = "grafeo-file")]
use grafeo_storage::file::GrafeoFileManager;
#[cfg(feature = "grafeo-file")]
use crate::catalog::Catalog;
#[cfg(feature = "grafeo-file")]
use crate::transaction::TransactionManager;
#[cfg(feature = "grafeo-file")]
const POLL_INTERVAL: Duration = Duration::from_millis(100);
#[cfg(feature = "grafeo-file")]
pub(super) struct CheckpointTimer {
shutdown: Arc<AtomicBool>,
handle: Option<std::thread::JoinHandle<()>>,
}
#[cfg(feature = "grafeo-file")]
impl CheckpointTimer {
pub(super) fn start(
interval: Duration,
file_manager: Arc<GrafeoFileManager>,
store: Arc<LpgStore>,
catalog: Arc<Catalog>,
transaction_manager: Arc<TransactionManager>,
#[cfg(feature = "triple-store")] rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
#[cfg(feature = "wal")] wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
) -> Self {
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = Arc::clone(&shutdown);
let handle = std::thread::Builder::new()
.name("grafeo-checkpoint".to_string())
.spawn(move || {
Self::run(
&shutdown_clone,
interval,
&file_manager,
&store,
&catalog,
&transaction_manager,
#[cfg(feature = "triple-store")]
&rdf_store,
#[cfg(feature = "wal")]
wal.as_deref(),
);
})
.expect("failed to spawn checkpoint timer thread");
Self {
shutdown,
handle: Some(handle),
}
}
pub(super) fn stop(&mut self) {
self.shutdown.store(true, Ordering::Release);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
#[allow(clippy::too_many_arguments)]
fn run(
shutdown: &AtomicBool,
interval: Duration,
file_manager: &GrafeoFileManager,
store: &Arc<LpgStore>,
catalog: &Arc<Catalog>,
transaction_manager: &TransactionManager,
#[cfg(feature = "triple-store")] rdf_store: &Arc<grafeo_core::graph::rdf::RdfStore>,
#[cfg(feature = "wal")] wal: Option<&grafeo_storage::wal::LpgWal>,
) {
let mut elapsed = Duration::ZERO;
loop {
std::thread::sleep(POLL_INTERVAL);
if shutdown.load(Ordering::Acquire) {
break;
}
elapsed += POLL_INTERVAL;
if elapsed < interval {
continue;
}
elapsed = Duration::ZERO;
if let Err(e) = Self::try_checkpoint(
file_manager,
store,
catalog,
transaction_manager,
#[cfg(feature = "triple-store")]
rdf_store,
#[cfg(feature = "wal")]
wal,
) {
eprintln!("periodic checkpoint failed: {e}");
}
}
}
fn try_checkpoint(
file_manager: &GrafeoFileManager,
store: &Arc<LpgStore>,
catalog: &Arc<Catalog>,
transaction_manager: &TransactionManager,
#[cfg(feature = "triple-store")] rdf_store: &Arc<grafeo_core::graph::rdf::RdfStore>,
#[cfg(feature = "wal")] wal: Option<&grafeo_storage::wal::LpgWal>,
) -> Result<()> {
use super::flush;
let sections = Self::build_sections(
store,
catalog,
#[cfg(feature = "triple-store")]
rdf_store,
);
let section_refs: Vec<&dyn grafeo_common::storage::Section> =
sections.iter().map(|s| s.as_ref()).collect();
let context = flush::build_context(store, transaction_manager);
flush::flush(
file_manager,
§ion_refs,
&context,
flush::FlushReason::Explicit,
#[cfg(feature = "wal")]
wal,
)
.map(|_| ())
}
fn build_sections(
store: &Arc<LpgStore>,
catalog: &Arc<Catalog>,
#[cfg(feature = "triple-store")] rdf_store: &Arc<grafeo_core::graph::rdf::RdfStore>,
) -> Vec<Box<dyn grafeo_common::storage::Section>> {
let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
let catalog_section = super::catalog_section::CatalogSection::new(
Arc::clone(catalog),
Arc::clone(store),
|| 0,
);
let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> =
vec![Box::new(catalog_section), Box::new(lpg)];
#[cfg(feature = "triple-store")]
if !rdf_store.is_empty() || rdf_store.graph_count() > 0 {
let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
sections.push(Box::new(rdf));
}
#[cfg(feature = "ring-index")]
if rdf_store.ring().is_some() {
let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
sections.push(Box::new(ring));
}
#[cfg(feature = "vector-index")]
{
let indexes = store.vector_index_entries();
if !indexes.is_empty() {
let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
sections.push(Box::new(vector));
}
}
#[cfg(feature = "text-index")]
{
let indexes = store.text_index_entries();
if !indexes.is_empty() {
let text = grafeo_core::index::text::TextIndexSection::new(indexes);
sections.push(Box::new(text));
}
}
sections
}
}
#[cfg(feature = "grafeo-file")]
impl Drop for CheckpointTimer {
fn drop(&mut self) {
self.stop();
}
}
#[cfg(test)]
#[cfg(feature = "grafeo-file")]
mod tests {
use super::*;
use std::time::Instant;
#[test]
fn timer_stops_promptly() {
let store = Arc::new(LpgStore::new().unwrap());
let catalog = Arc::new(Catalog::new());
let tm = Arc::new(TransactionManager::new());
let dir = tempfile::TempDir::new().unwrap();
let path = dir.path().join("timer_test.grafeo");
let fm = Arc::new(GrafeoFileManager::create(&path).unwrap());
let mut timer = CheckpointTimer::start(
Duration::from_secs(60), fm,
store,
catalog,
tm,
#[cfg(feature = "triple-store")]
Arc::new(grafeo_core::graph::rdf::RdfStore::new()),
#[cfg(feature = "wal")]
None,
);
let start = Instant::now();
timer.stop();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_secs(2),
"stop() took {elapsed:?}, expected < 2s"
);
}
#[test]
fn timer_checkpoints_on_interval() {
let store = Arc::new(LpgStore::new().unwrap());
let catalog = Arc::new(Catalog::new());
let tm = Arc::new(TransactionManager::new());
let dir = tempfile::TempDir::new().unwrap();
let path = dir.path().join("interval_test.grafeo");
let fm = Arc::new(GrafeoFileManager::create(&path).unwrap());
store.create_node(&["Test"]);
let mut timer = CheckpointTimer::start(
Duration::from_millis(200), Arc::clone(&fm),
Arc::clone(&store),
Arc::clone(&catalog),
Arc::clone(&tm),
#[cfg(feature = "triple-store")]
Arc::new(grafeo_core::graph::rdf::RdfStore::new()),
#[cfg(feature = "wal")]
None,
);
std::thread::sleep(Duration::from_millis(500));
timer.stop();
let header = fm.active_header();
assert!(
header.iteration > 0,
"expected at least one checkpoint, got iteration={}",
header.iteration
);
assert_eq!(header.node_count, 1);
}
#[test]
fn timer_skips_when_clean() {
let store = Arc::new(LpgStore::new().unwrap());
let catalog = Arc::new(Catalog::new());
let tm = Arc::new(TransactionManager::new());
let dir = tempfile::TempDir::new().unwrap();
let path = dir.path().join("clean_test.grafeo");
let fm = Arc::new(GrafeoFileManager::create(&path).unwrap());
let mut timer = CheckpointTimer::start(
Duration::from_millis(200),
Arc::clone(&fm),
Arc::clone(&store),
Arc::clone(&catalog),
Arc::clone(&tm),
#[cfg(feature = "triple-store")]
Arc::new(grafeo_core::graph::rdf::RdfStore::new()),
#[cfg(feature = "wal")]
None,
);
std::thread::sleep(Duration::from_millis(500));
timer.stop();
let header = fm.active_header();
assert!(header.iteration <= 5);
}
}