use grafeo_common::storage::{Section, SectionType};
use grafeo_common::utils::error::Result;
#[cfg(feature = "grafeo-file")]
use grafeo_storage::file::GrafeoFileManager;
/// Why a flush was requested.
///
/// `flush` writes every section it is given when the reason is `Explicit`; for any
/// other reason it writes only the sections that report themselves as dirty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum FlushReason {
/// Flush that persists only the sections currently marked dirty.
// NOTE(review): `dead_code` is presumably allowed because no caller constructs this
// variant in some feature configurations — confirm and record the reason here.
#[allow(dead_code)] Checkpoint,
/// Flush that persists every section, whether or not it is dirty.
Explicit,
}
/// Store-level metadata written alongside the section payloads.
///
/// `flush` forwards each field unchanged to the file manager's `write_sections` call.
pub(super) struct FlushContext {
/// Epoch recorded with the write; `build_context` reads it from the store's current
/// epoch, `build_context_minimal` uses 0.
pub epoch: u64,
/// Last assigned transaction id; both builders in this file use 0 when the
/// transaction manager reports none.
pub transaction_id: u64,
/// Node count recorded with the write (0 when built without an LPG store).
pub node_count: u64,
/// Edge count recorded with the write (0 when built without an LPG store).
pub edge_count: u64,
}
/// Outcome of a `flush` call.
pub(super) struct FlushResult {
/// Number of sections that were serialized and handed to the file manager;
/// 0 when no section was selected for writing.
pub sections_written: usize,
}
/// Serializes the selected sections and writes them through the file manager in one call.
///
/// With [`FlushReason::Explicit`] every entry of `sections` is written; for any other
/// reason only sections whose `is_dirty()` returns true are. When nothing is selected the
/// function returns early with `sections_written == 0` without calling `fm` or the WAL.
///
/// After `write_sections` succeeds, every section whose type was part of the write is
/// marked clean. When the `wal` feature is enabled and a WAL handle is supplied, the WAL
/// is synced last.
///
/// # Errors
///
/// Propagates the first error returned by a section's `serialize`, by
/// `GrafeoFileManager::write_sections`, or by the WAL `sync`. Sections are marked clean
/// only after the write succeeded, but they are already clean if the later WAL sync fails.
#[cfg(feature = "grafeo-file")]
pub(super) fn flush(
    fm: &GrafeoFileManager,
    sections: &[&dyn Section],
    context: &FlushContext,
    reason: FlushReason,
    #[cfg(feature = "wal")] wal: Option<&grafeo_storage::wal::LpgWal>,
) -> Result<FlushResult> {
    // NOTE(review): `maybe_crash` lives under `testing::crash`; presumably a named
    // crash-injection hook for recovery tests — confirm against its definition.
    use grafeo_common::testing::crash::maybe_crash;

    maybe_crash("flush:before_serialize");

    // Serialize up front so a serialization failure aborts before anything is written.
    let mut targets: Vec<(SectionType, Vec<u8>)> = Vec::new();
    for section in sections {
        if reason == FlushReason::Explicit || section.is_dirty() {
            targets.push((section.section_type(), section.serialize()?));
        }
    }

    if targets.is_empty() {
        return Ok(FlushResult {
            sections_written: 0,
        });
    }
    let sections_written = targets.len();

    maybe_crash("flush:after_serialize");

    // Borrowed view of the owned buffers, in the shape `write_sections` takes.
    let section_refs: Vec<(SectionType, &[u8])> = targets
        .iter()
        .map(|(section_type, bytes)| (*section_type, bytes.as_slice()))
        .collect();
    fm.write_sections(
        &section_refs,
        context.epoch,
        context.transaction_id,
        context.node_count,
        context.edge_count,
    )?;

    // Only sections whose type was actually written may be considered clean.
    for section in sections {
        if targets.iter().any(|(t, _)| *t == section.section_type()) {
            section.mark_clean();
        }
    }

    maybe_crash("flush:after_write");

    #[cfg(feature = "wal")]
    if let Some(wal) = wal {
        wal.sync()?;
    }

    Ok(FlushResult { sections_written })
}
/// Builds the [`FlushContext`] for an LPG-backed store.
///
/// The epoch and the node/edge counts come from `store`; the transaction id is the last
/// one assigned by `transaction_manager`, or 0 when it reports none.
#[cfg(feature = "lpg")]
pub(super) fn build_context(
    store: &grafeo_core::graph::lpg::LpgStore,
    transaction_manager: &crate::transaction::TransactionManager,
) -> FlushContext {
    // Values are read in field-declaration order, one binding per field.
    let epoch = store.current_epoch().0;
    let last_assigned = transaction_manager.last_assigned_transaction_id();
    let transaction_id = last_assigned.map_or(0, |id| id.0);
    let node_count = store.node_count() as u64;
    let edge_count = store.edge_count() as u64;

    FlushContext {
        epoch,
        transaction_id,
        node_count,
        edge_count,
    }
}
/// Builds a [`FlushContext`] when the `lpg` feature is disabled.
///
/// No store is available in this configuration, so the epoch and both counts are fixed
/// at 0; only the transaction id is looked up (0 when none has been assigned).
#[cfg(not(feature = "lpg"))]
pub(super) fn build_context_minimal(
    transaction_manager: &crate::transaction::TransactionManager,
) -> FlushContext {
    let last_assigned = transaction_manager.last_assigned_transaction_id();
    let transaction_id = last_assigned.map_or(0, |id| id.0);

    FlushContext {
        epoch: 0,
        transaction_id,
        node_count: 0,
        edge_count: 0,
    }
}