use std::sync::mpsc::SyncSender;
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use std::time::Instant;
use arc_swap::ArcSwap;
use parking_lot::{MutexGuard, RwLockWriteGuard};
use selene_core::{Change, HlcTimestamp, metrics};
use crate::committer::Committer;
use crate::durable_provider::DurableProvider;
use crate::error::{GraphError, GraphResult};
use crate::graph::SeleneGraph;
use crate::id_allocator::IdAllocator;
use crate::index_provider::IndexProvider;
use crate::mutator::Mutator;
use crate::type_validator::TypeWarning;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CommitWarning {
pub warning: TypeWarning,
}
#[derive(Clone, Debug, PartialEq)]
pub struct CommitOutcome {
pub generation: u64,
pub changes: Vec<Change>,
pub principal: Option<Arc<[u8]>>,
pub durable_at: Option<u64>,
pub next_node_id: u64,
pub next_edge_id: u64,
pub warnings: Vec<CommitWarning>,
}
pub(crate) struct SealedCommit {
pub(crate) seal_seq: u64,
pub(crate) next_snapshot: Arc<SeleneGraph>,
pub(crate) changes: Vec<Change>,
pub(crate) fanout_changes: Option<Vec<Change>>,
pub(crate) principal: Option<Arc<[u8]>>,
pub(crate) schema_changed: bool,
pub(crate) generation: u64,
pub(crate) next_node_id: u64,
pub(crate) next_edge_id: u64,
pub(crate) warnings: Vec<CommitWarning>,
}
const _: fn() = || {
fn assert_send_static<T: Send + 'static>() {}
assert_send_static::<SealedCommit>();
};
pub struct WriteTxn<'g> {
pub(crate) guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
pub(crate) committer: Committer,
pub(crate) pre_txn: Option<Arc<SeleneGraph>>,
pub(crate) allocator: MutexGuard<'g, IdAllocator>,
pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
pub(crate) changes: Vec<Change>,
pub(crate) truncate_expansions: Vec<(usize, Vec<Change>)>,
pub(crate) warnings: Vec<CommitWarning>,
}
impl<'g> WriteTxn<'g> {
pub(crate) fn new(
guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
committer: Committer,
allocator: MutexGuard<'g, IdAllocator>,
providers: Arc<[Arc<dyn IndexProvider>]>,
) -> Self {
let pre_txn = Some(Arc::clone(&*guard));
Self {
guard,
committer,
pre_txn,
allocator,
providers,
changes: Vec::new(),
truncate_expansions: Vec::new(),
warnings: Vec::new(),
}
}
#[must_use]
pub fn mutator(&mut self) -> Mutator<'_, 'g> {
Mutator::new(self)
}
#[must_use]
pub fn read(&self) -> &SeleneGraph {
self.guard.as_ref()
}
pub(crate) fn guard_mut(&mut self) -> &mut SeleneGraph {
Arc::make_mut(&mut *self.guard)
}
pub fn commit(self) -> GraphResult<CommitOutcome> {
self.commit_with_principal(None)
}
#[tracing::instrument(
name = "selene.graph.commit",
skip(self, principal),
fields(change_count = self.change_count())
)]
pub fn commit_with_principal(self, principal: Option<Arc<[u8]>>) -> GraphResult<CommitOutcome> {
let committer = self.committer.clone();
let sealed = self.seal(principal, None)?;
committer.submit_commit(sealed)
}
pub(crate) fn seal(
mut self,
principal: Option<Arc<[u8]>>,
cancel: Option<&AtomicBool>,
) -> GraphResult<SealedCommit> {
debug_assert!(
self.pre_txn.is_some(),
"pre_txn must be present at seal entry"
);
let schema_changed = self
.changes
.iter()
.any(|change| matches!(change, Change::SchemaChanged { .. }));
let next_node_id = self.allocator.peek_next_node();
let next_edge_id = self.allocator.peek_next_edge();
{
let graph = self.guard_mut();
graph.meta.generation = graph
.meta
.generation
.checked_add(1)
.expect("graph generation exhausted");
graph.meta.next_node_id = next_node_id;
graph.meta.next_edge_id = next_edge_id;
}
let generation = self.read().meta.generation;
let mut validation_warnings = Vec::new();
if let Some(type_def) = self.read().meta.bound_type.as_deref() {
for change in &self.changes {
validation_warnings.extend(
crate::type_validator::validate_change(change, self.read(), type_def)?
.into_iter()
.map(|warning| CommitWarning { warning }),
);
}
if schema_changed {
validation_warnings.extend(
crate::type_validator::validate_entity_state(self.read(), type_def)?
.into_iter()
.map(|warning| CommitWarning { warning }),
);
} else {
crate::type_validator::validate_unique_property_changes(
&self.changes,
self.read(),
type_def,
)?;
}
}
for warning in validation_warnings {
if !self.warnings.contains(&warning) {
self.warnings.push(warning);
}
}
if let Some(flag) = cancel
&& flag.load(Ordering::Acquire)
{
return Err(GraphError::Cancelled);
}
let seal_seq = self.committer.next_seal_seq();
self.pre_txn = None;
let next_snapshot = Arc::clone(&*self.guard);
let changes = std::mem::take(&mut self.changes);
let truncate_expansions = std::mem::take(&mut self.truncate_expansions);
let warnings = std::mem::take(&mut self.warnings);
let fanout_changes = expand_truncates_for_fanout(&changes, &truncate_expansions);
Ok(SealedCommit {
seal_seq,
next_snapshot,
changes,
fanout_changes,
principal,
schema_changed,
generation,
next_node_id,
next_edge_id,
warnings,
})
}
pub fn rollback(self) {}
#[must_use]
pub fn change_count(&self) -> usize {
self.changes.len()
}
#[must_use]
pub fn has_schema_changes(&self) -> bool {
self.changes
.iter()
.any(|change| matches!(change, Change::SchemaChanged { .. }))
}
}
impl Drop for WriteTxn<'_> {
fn drop(&mut self) {
if let Some(prior) = self.pre_txn.take() {
*self.guard = prior;
}
}
}
fn commit_timestamp(durable_providers: &[Arc<dyn DurableProvider>]) -> HlcTimestamp {
durable_providers
.first()
.map_or_else(HlcTimestamp::zero, |provider| provider.next_timestamp())
}
pub(crate) struct AppendedCommit {
pub(crate) next_snapshot: Arc<SeleneGraph>,
pub(crate) changes: Vec<Change>,
pub(crate) fanout_changes: Option<Vec<Change>>,
pub(crate) principal: Option<Arc<[u8]>>,
pub(crate) schema_changed: bool,
pub(crate) generation: u64,
pub(crate) next_node_id: u64,
pub(crate) next_edge_id: u64,
pub(crate) warnings: Vec<CommitWarning>,
pub(crate) durable_at: Option<u64>,
pub(crate) reply: Option<SyncSender<GraphResult<CommitOutcome>>>,
pub(crate) started: Instant,
}
const _: fn() = || {
fn assert_send_static<T: Send + 'static>() {}
assert_send_static::<AppendedCommit>();
};
pub(crate) fn append_sealed(
sealed: SealedCommit,
durable_providers: &[Arc<dyn DurableProvider>],
) -> GraphResult<AppendedCommit> {
let started = Instant::now();
let SealedCommit {
seal_seq: _,
next_snapshot,
changes,
fanout_changes,
principal,
schema_changed,
generation,
next_node_id,
next_edge_id,
warnings,
} = sealed;
#[cfg(debug_assertions)]
if let Err(reason) = next_snapshot.assert_indexes_consistent() {
panic!("selene-graph: pre-publish index consistency violation: {reason}");
}
let timestamp = commit_timestamp(durable_providers);
let mut durable_at: Option<u64> = None;
for durable in durable_providers {
let seq = durable
.write_commit(principal.as_ref(), &changes, timestamp)
.map_err(|error| GraphError::Durable {
reason: format!("{}: {error}", durable.provider_tag()),
})?;
durable_at = Some(durable_at.map_or(seq, |highest| highest.max(seq)));
}
Ok(AppendedCommit {
next_snapshot,
changes,
fanout_changes,
principal,
schema_changed,
generation,
next_node_id,
next_edge_id,
warnings,
durable_at,
reply: None,
started,
})
}
pub(crate) fn flush_durables(durable_providers: &[Arc<dyn DurableProvider>]) -> GraphResult<()> {
for durable in durable_providers {
durable.flush().map_err(|error| GraphError::Durable {
reason: format!("{}: {error}", durable.provider_tag()),
})?;
}
Ok(())
}
pub(crate) fn publish_appended(
appended: AppendedCommit,
snapshot: &ArcSwap<SeleneGraph>,
schema_version: &AtomicU64,
providers: &[Arc<dyn IndexProvider>],
) -> CommitOutcome {
let AppendedCommit {
next_snapshot,
changes,
fanout_changes,
principal,
schema_changed,
generation,
next_node_id,
next_edge_id,
warnings,
durable_at,
reply: _,
started,
} = appended;
#[cfg(test)]
publish_panic_inject::maybe_panic();
snapshot.store(Arc::clone(&next_snapshot));
if schema_changed {
schema_version.fetch_add(1, Ordering::AcqRel);
}
let fanout: &[Change] = fanout_changes.as_deref().unwrap_or(&changes);
{
let _fanout_guard = crate::reentry::FanoutGuard::enter();
crate::provider_fanout::notify_providers(providers, generation, fanout);
}
metrics::counter_inc(metrics::COMMITS_TOTAL);
metrics::histogram_record(
metrics::COMMIT_DURATION_SECONDS,
started.elapsed().as_secs_f64(),
);
metrics::gauge_set(metrics::GRAPH_NODES, next_snapshot.node_count() as f64);
metrics::gauge_set(metrics::GRAPH_EDGES, next_snapshot.edge_count() as f64);
CommitOutcome {
generation,
changes,
principal,
durable_at,
next_node_id,
next_edge_id,
warnings,
}
}
fn expand_truncates_for_fanout(
changes: &[Change],
expansions: &[(usize, Vec<Change>)],
) -> Option<Vec<Change>> {
if expansions.is_empty() {
return None;
}
let mut view = Vec::with_capacity(changes.len());
for (index, change) in changes.iter().enumerate() {
match change {
Change::NodesOfTypeTruncated { .. }
| Change::EdgesOfTypeTruncated { .. }
| Change::GraphReset { .. } => {
if let Some((_, expansion)) = expansions.iter().find(|(staged, _)| *staged == index)
{
view.extend(expansion.iter().cloned());
}
}
other => view.push(other.clone()),
}
}
Some(view)
}
#[cfg(test)]
pub(crate) mod publish_panic_inject {
use std::cell::Cell;
thread_local! {
static COUNTDOWN: Cell<Option<u32>> = const { Cell::new(None) };
}
pub(crate) fn arm(after: u32) {
COUNTDOWN.with(|cell| cell.set(Some(after)));
}
pub(crate) fn maybe_panic() {
COUNTDOWN.with(|cell| {
if let Some(remaining) = cell.get() {
let next = remaining.saturating_sub(1);
if next == 0 {
cell.set(None);
panic!("selene-graph test: injected Stage-3 publish_appended panic");
}
cell.set(Some(next));
}
});
}
}
#[cfg(test)]
mod tests;