use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::SyncSender;
use std::time::Instant;
use arc_swap::ArcSwap;
use selene_core::{Change, HlcTimestamp, metrics};
use crate::durable_provider::DurableProvider;
use crate::error::{GraphError, GraphResult};
use crate::graph::SeleneGraph;
use crate::index_provider::IndexProvider;
use crate::write_txn::{CommitOutcome, CommitWarning, SealedCommit};
/// Picks the timestamp stamped onto a commit.
///
/// The first durable provider in the slice is asked for its next timestamp;
/// the remaining providers are not consulted. With no durable providers the
/// result is `HlcTimestamp::zero()`.
fn commit_timestamp(durable_providers: &[Arc<dyn DurableProvider>]) -> HlcTimestamp {
    match durable_providers.first() {
        Some(primary) => primary.next_timestamp(),
        None => HlcTimestamp::zero(),
    }
}
/// A commit that has already been written to the durable providers and is
/// waiting to be published.
///
/// Produced by `append_sealed` and consumed by `publish_appended`.
pub(crate) struct AppendedCommit {
/// Graph snapshot that `publish_appended` stores into the shared `ArcSwap`.
pub(crate) next_snapshot: Arc<SeleneGraph>,
/// Changes passed to every durable provider and returned in the `CommitOutcome`.
pub(crate) changes: Vec<Change>,
/// When `Some`, this list is delivered to index providers instead of `changes`.
pub(crate) fanout_changes: Option<Vec<Change>>,
/// Optional principal bytes forwarded to `write_commit` and echoed in the outcome.
pub(crate) principal: Option<Arc<[u8]>>,
/// When `true`, publishing increments the schema version counter by one.
pub(crate) schema_changed: bool,
/// Generation passed to provider notification and reported in the outcome.
pub(crate) generation: u64,
/// Carried through unchanged into the `CommitOutcome`.
pub(crate) next_node_id: u64,
/// Carried through unchanged into the `CommitOutcome`.
pub(crate) next_edge_id: u64,
/// Warnings carried through unchanged into the `CommitOutcome`.
pub(crate) warnings: Vec<CommitWarning>,
/// Highest sequence number returned by any durable provider's `write_commit`;
/// `None` when there were no durable providers.
pub(crate) durable_at: Option<u64>,
/// `append_sealed` leaves this `None` and `publish_appended` ignores it.
/// NOTE(review): presumably set by the commit pipeline so the waiting caller
/// can be answered — confirm against the code that fills it in.
pub(crate) reply: Option<SyncSender<GraphResult<CommitOutcome>>>,
/// Instant captured at the start of `append_sealed`; used to record the
/// commit duration metric at publish time.
pub(crate) started: Instant,
}
// Compile-time guard: the build fails if `AppendedCommit` ever stops being
// `Send + 'static`. The closure is never called; it only has to type-check.
const _: fn() = || {
    fn require_send_static<T>()
    where
        T: Send + 'static,
    {
    }
    require_send_static::<AppendedCommit>();
};
pub(crate) fn append_sealed(
sealed: SealedCommit,
durable_providers: &[Arc<dyn DurableProvider>],
) -> GraphResult<AppendedCommit> {
let started = Instant::now();
let SealedCommit {
seal_seq: _,
next_snapshot,
changes,
fanout_changes,
principal,
schema_changed,
generation,
next_node_id,
next_edge_id,
warnings,
} = sealed;
#[cfg(debug_assertions)]
if let Err(reason) = next_snapshot.assert_indexes_consistent() {
panic!("selene-graph: pre-publish index consistency violation: {reason}");
}
let timestamp = commit_timestamp(durable_providers);
let mut durable_at: Option<u64> = None;
for durable in durable_providers {
let seq = durable
.write_commit(principal.as_ref(), &changes, timestamp)
.map_err(|error| GraphError::Durable {
reason: format!("{}: {error}", durable.provider_tag()),
})?;
durable_at = Some(durable_at.map_or(seq, |highest| highest.max(seq)));
}
Ok(AppendedCommit {
next_snapshot,
changes,
fanout_changes,
principal,
schema_changed,
generation,
next_node_id,
next_edge_id,
warnings,
durable_at,
reply: None,
started,
})
}
/// Flushes every durable provider in slice order.
///
/// # Errors
///
/// Stops at the first provider whose `flush` fails and returns
/// `GraphError::Durable`, with the provider tag prefixed to the underlying
/// error. Providers after the failing one are not flushed.
pub(crate) fn flush_durables(durable_providers: &[Arc<dyn DurableProvider>]) -> GraphResult<()> {
    for provider in durable_providers {
        if let Err(error) = provider.flush() {
            return Err(GraphError::Durable {
                reason: format!("{}: {error}", provider.provider_tag()),
            });
        }
    }
    Ok(())
}
/// Makes an appended commit visible and reports its outcome.
///
/// Steps, in order:
/// 1. store the new snapshot into `snapshot`;
/// 2. increment `schema_version` if the commit changed the schema;
/// 3. notify `providers` with the commit's generation and change list, while
///    a `FanoutGuard` is held;
/// 4. record commit count, commit duration, and node/edge gauges.
///
/// Providers receive `fanout_changes` when it is present and `changes`
/// otherwise. The returned `CommitOutcome` always carries `changes`.
/// The `reply` field of `appended` is not used here.
pub(crate) fn publish_appended(
    appended: AppendedCommit,
    snapshot: &ArcSwap<SeleneGraph>,
    schema_version: &AtomicU64,
    providers: &[Arc<dyn IndexProvider>],
) -> CommitOutcome {
    // Exhaustive destructure: adding a field to `AppendedCommit` must be handled here.
    let AppendedCommit {
        next_snapshot: published,
        changes: committed,
        fanout_changes: expanded,
        principal,
        schema_changed,
        generation,
        next_node_id,
        next_edge_id,
        warnings,
        durable_at,
        reply: _,
        started,
    } = appended;

    // Test-only hook: lets tests force a panic before anything is published.
    #[cfg(test)]
    publish_panic_inject::maybe_panic();

    snapshot.store(Arc::clone(&published));

    if schema_changed {
        schema_version.fetch_add(1, Ordering::AcqRel);
    }

    // Prefer the expanded view when one was prepared; fall back to the raw changes.
    let delivered: &[Change] = match expanded.as_deref() {
        Some(view) => view,
        None => &committed,
    };
    {
        // The guard lives only for the duration of the provider notification.
        let _fanout_guard = crate::reentry::FanoutGuard::enter();
        crate::provider_fanout::notify_providers(providers, generation, delivered);
    }

    metrics::counter_inc(metrics::COMMITS_TOTAL);
    let elapsed_secs = started.elapsed().as_secs_f64();
    metrics::histogram_record(metrics::COMMIT_DURATION_SECONDS, elapsed_secs);
    metrics::gauge_set(metrics::GRAPH_NODES, published.node_count() as f64);
    metrics::gauge_set(metrics::GRAPH_EDGES, published.edge_count() as f64);

    CommitOutcome {
        generation,
        changes: committed,
        principal,
        durable_at,
        next_node_id,
        next_edge_id,
        warnings,
    }
}
/// Builds the change list delivered to providers, with bulk markers replaced
/// by their recorded expansions.
///
/// `expansions` pairs an index into `changes` with the changes that should
/// stand in for the entry at that index. Only `NodesOfTypeTruncated`,
/// `EdgesOfTypeTruncated` and `GraphReset` entries are replaced; every other
/// change is cloned into the result as-is. If several expansions name the
/// same index, the first one in the slice is used.
///
/// Returns `None` when `expansions` is empty, meaning no separate view is
/// needed.
///
/// NOTE(review): once `expansions` is non-empty, a bulk marker that has no
/// expansion entry is omitted from the view entirely — confirm this is the
/// intended behavior for callers.
pub(super) fn expand_truncates_for_fanout(
    changes: &[Change],
    expansions: &[(usize, Vec<Change>)],
) -> Option<Vec<Change>> {
    if expansions.is_empty() {
        return None;
    }

    let mut view = Vec::with_capacity(changes.len());
    for (position, change) in changes.iter().enumerate() {
        let is_bulk_marker = matches!(
            change,
            Change::NodesOfTypeTruncated { .. }
                | Change::EdgesOfTypeTruncated { .. }
                | Change::GraphReset { .. }
        );
        if !is_bulk_marker {
            view.push(change.clone());
            continue;
        }

        // First expansion recorded for this position, if any.
        let replacement = expansions
            .iter()
            .find_map(|(staged, expanded)| (*staged == position).then_some(expanded));
        if let Some(expanded) = replacement {
            view.extend_from_slice(expanded);
        }
    }
    Some(view)
}
/// Test-only fault injection for `publish_appended`.
///
/// The countdown is thread-local, so arming it affects only calls made on
/// the arming thread.
#[cfg(test)]
pub(crate) mod publish_panic_inject {
    use std::cell::Cell;

    thread_local! {
        // `None` means disarmed; `Some(n)` is the number of calls left before firing.
        static COUNTDOWN: Cell<Option<u32>> = const { Cell::new(None) };
    }

    /// Arms the hook so that the `after`-th following call to `maybe_panic`
    /// on this thread panics. `0` behaves like `1`: the very next call panics.
    pub(crate) fn arm(after: u32) {
        COUNTDOWN.with(|slot| slot.set(Some(after)));
    }

    /// Counts down an armed hook and panics when it reaches zero.
    ///
    /// Does nothing when the hook is not armed. The hook disarms itself
    /// before panicking, so later calls are no-ops until `arm` is called again.
    pub(crate) fn maybe_panic() {
        let fire = COUNTDOWN.with(|slot| match slot.get() {
            None => false,
            Some(remaining) => {
                let left = remaining.saturating_sub(1);
                // Reaching zero disarms the hook; otherwise store the new count.
                slot.set((left != 0).then_some(left));
                left == 0
            }
        });
        if fire {
            panic!("selene-graph test: injected Stage-3 publish_appended panic");
        }
    }
}