use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::RwLock;
use crate::index::temporal::TemporalIndexes;
use crate::storage::current::CurrentStorage;
use crate::storage::historical::HistoricalStorage;
use crate::storage::index_persistence::IndexPersistenceManager;
use crate::storage::wal::concurrent_system::ConcurrentWalSystem;
use super::formats::{
GraphIndexManifestEntry, IndexManifest, PersistencePolicies, StringInternerManifestEntry,
TemporalAdjacencyIndexManifestEntry,
};
use super::operations::{
persist_all_indexes, persist_graph_index, persist_string_interner, persist_temporal_index,
persist_vector_indexes,
};
use super::tracker::PersistenceTracker;
pub(crate) fn update_manifest(
manager: &Arc<IndexPersistenceManager>,
historical: &Arc<RwLock<HistoricalStorage>>,
tracker: &Arc<PersistenceTracker>,
node_count: u64,
edge_count: u64,
string_count: u64,
) {
let safe_lsn = tracker.get_safe_manifest_lsn();
let mut manifest = IndexManifest::new(safe_lsn);
if string_count > 0 {
manifest.string_interner = Some(StringInternerManifestEntry {
interner_file: "strings/interner.idx".to_string(),
string_count,
});
}
if node_count > 0 || edge_count > 0 {
manifest.graph_index = Some(GraphIndexManifestEntry {
adjacency_file: "graph/adjacency.idx".to_string(),
node_count,
edge_count,
});
}
let hist_read = historical.read();
if let Some(adj_index) = hist_read.get_temporal_adjacency_index() {
let total_entries: usize = adj_index
.outgoing
.iter()
.map(|entry| entry.value().len())
.sum();
let node_count = adj_index.outgoing.len();
if total_entries > 0 {
manifest.temporal_adjacency_index = Some(TemporalAdjacencyIndexManifestEntry {
adjacency_file: "temporal_adjacency/adjacency.idx".to_string(),
entry_count: total_entries as u64,
node_count: node_count as u64,
});
}
}
drop(hist_read);
if let Err(e) = manager.save_manifest(&manifest) {
eprintln!("Background persistence: Failed to save manifest: {}", e);
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn_background_persistence_thread(
current: Arc<CurrentStorage>,
historical: Arc<RwLock<HistoricalStorage>>,
temporal_indexes: Arc<TemporalIndexes>,
wal: Arc<ConcurrentWalSystem>,
manager: Arc<IndexPersistenceManager>,
tracker: Arc<PersistenceTracker>,
policies: PersistencePolicies,
stopped_flag: Arc<AtomicBool>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let check_interval = std::time::Duration::from_secs(1);
while !tracker.is_shutdown() {
std::thread::sleep(check_interval);
if tracker.is_shutdown() {
break;
}
let mut any_index_persisted = false;
let snapshot_lsn = wal.current_lsn().0;
let vector_mutations = tracker.get_vector_mutations();
let vector_seconds = tracker.seconds_since_vector_persist();
if vector_mutations >= policies.vector.mutation_threshold as u64
|| vector_seconds >= policies.vector.time_interval_secs as u64
{
match persist_vector_indexes(¤t, &manager, Some(&tracker), snapshot_lsn) {
Ok(()) => any_index_persisted = true,
Err(e) => {
eprintln!(
"Background persistence: Failed to persist vector indexes: {}",
e
);
}
}
}
let graph_mutations = tracker.get_graph_mutations();
let graph_seconds = tracker.seconds_since_graph_persist();
if graph_mutations >= policies.graph.mutation_threshold as u64
|| graph_seconds >= policies.graph.time_interval_secs as u64
{
match persist_graph_index(¤t, &manager, Some(&tracker), snapshot_lsn) {
Ok(_) => any_index_persisted = true,
Err(e) => {
eprintln!(
"Background persistence: Failed to persist graph index: {}",
e
);
}
}
}
let temporal_mutations = tracker.get_temporal_mutations();
let temporal_seconds = tracker.seconds_since_temporal_persist();
if temporal_mutations >= policies.temporal.version_threshold as u64
|| temporal_seconds >= policies.temporal.time_interval_secs as u64
{
match persist_temporal_index(
&historical,
&temporal_indexes,
&manager,
&tracker,
snapshot_lsn,
) {
Ok(()) => any_index_persisted = true,
Err(e) => {
eprintln!(
"Background persistence: Failed to persist temporal index: {}",
e
);
}
}
}
let string_mutations = tracker.get_string_mutations();
let string_seconds = tracker.seconds_since_string_persist();
if string_mutations >= policies.strings.new_strings_threshold as u64
|| string_seconds >= policies.strings.time_interval_secs as u64
{
match persist_string_interner(&manager, &tracker, snapshot_lsn) {
Ok(_) => any_index_persisted = true,
Err(e) => {
eprintln!(
"Background persistence: Failed to persist string interner: {}",
e
);
}
}
}
if any_index_persisted {
update_manifest(
&manager,
&historical,
&tracker,
tracker.get_last_persisted_node_count(),
tracker.get_last_persisted_edge_count(),
tracker.get_last_persisted_string_count(),
);
}
}
let _ = persist_all_indexes(
¤t,
&historical,
&temporal_indexes,
&wal,
&manager,
&tracker,
);
}));
stopped_flag.store(true, Ordering::Release);
match result {
Ok(()) => {
}
Err(e) => {
eprintln!("CRITICAL: Background persistence thread panicked: {:?}", e);
eprintln!(
"Database will continue running but NO FURTHER INDEX PERSISTENCE will occur."
);
eprintln!("You MUST restart the database to restore automatic persistence.");
}
}
})
}