use super::WriteTransaction;
use crate::core::error::{Result, StorageError};
use crate::core::graph::{Edge, Node};
use crate::core::id::{EdgeId, NodeId, VersionId};
use crate::core::interning::InternedString;
use crate::core::property::PropertyMap;
use crate::core::temporal::{BiTemporalInterval, Timestamp};
use crate::core::version::VersionMetadata;
use crate::storage::historical::HistoricalStorage;
/// Builds the bi-temporal interval recorded for a new version.
///
/// The interval starts out as `BiTemporalInterval::with_valid_time(valid_from, tx_time)`.
/// When `is_tombstone` is `true`, its valid time is additionally closed at
/// `valid_from` through `close_valid_time`.
///
/// # Errors
///
/// Propagates any error returned by `close_valid_time` (tombstone case only).
#[inline]
pub(crate) fn create_temporal_interval(
    valid_from: Timestamp,
    tx_time: Timestamp,
    is_tombstone: bool,
) -> Result<BiTemporalInterval> {
    let interval = BiTemporalInterval::with_valid_time(valid_from, tx_time);
    Ok(if is_tombstone {
        // Tombstones close their valid-time range at the point they start.
        interval.close_valid_time(valid_from)?
    } else {
        interval
    })
}
/// Applies one buffered node create/update to current storage, historical
/// storage and the temporal index.
///
/// Steps, in order:
/// 1. Build the node tagged with `VersionMetadata::uncommitted(tx.tx_id)` and
///    insert it into (`is_create == true`) or update it in `tx.current`.
/// 2. For updates only: if `historical` reports a current version for
///    `node_id`, close that version's transaction time at `commit_timestamp`.
/// 3. Append the new (non-tombstone) version to `historical`.
/// 4. Register the new version's interval in `tx.temporal_indexes`.
///
/// # Errors
///
/// Returns the first error produced by any of the storage or index calls;
/// steps already performed are not undone by this function.
// The long parameter list carries the individual fields of the buffered write.
#[allow(clippy::too_many_arguments)]
pub(crate) fn apply_node_write(
    tx: &WriteTransaction,
    is_create: bool,
    node_id: NodeId,
    version_id: VersionId,
    label: InternedString,
    properties: PropertyMap,
    valid_from: Timestamp,
    commit_timestamp: Timestamp,
    historical: &mut HistoricalStorage,
) -> Result<()> {
    // `properties` is cloned here because the historical record below takes
    // ownership of the original map.
    let pending_node = Node::with_metadata(
        node_id,
        label,
        properties.clone(),
        version_id,
        VersionMetadata::uncommitted(tx.tx_id),
    );

    if is_create {
        tx.current.insert_node_direct(pending_node, commit_timestamp)?;
    } else {
        tx.current.update_node_direct(pending_node, commit_timestamp)?;

        // An update supersedes whatever version history currently considers live.
        let superseded = historical.get_current_node_version(node_id);
        if let Some(previous_version) = superseded {
            historical.close_node_version_transaction_time(previous_version, commit_timestamp)?;
        }
    }

    historical.add_node_version(
        node_id,
        version_id,
        valid_from,
        commit_timestamp,
        label,
        properties,
        false,
    )?;

    tx.temporal_indexes.insert_node_version(
        node_id,
        version_id,
        create_temporal_interval(valid_from, commit_timestamp, false)?,
    )?;
    Ok(())
}
/// Applies one buffered edge create/update to current storage, historical
/// storage and the temporal index.
///
/// Steps, in order:
/// 1. Build the edge tagged with `VersionMetadata::uncommitted(tx.tx_id)` and
///    insert it into (`is_create == true`) or update it in `tx.current`.
/// 2. For updates only: if `historical` reports a current version for
///    `edge_id`, close that version's transaction time at `commit_timestamp`.
/// 3. Append the new (non-tombstone) version to `historical`.
/// 4. Register the new version's interval in `tx.temporal_indexes`.
///
/// # Errors
///
/// Returns the first error produced by any of the storage or index calls;
/// steps already performed are not undone by this function.
// The long parameter list carries the individual fields of the buffered write.
#[allow(clippy::too_many_arguments)]
pub(crate) fn apply_edge_write(
    tx: &WriteTransaction,
    is_create: bool,
    edge_id: EdgeId,
    version_id: VersionId,
    source: NodeId,
    target: NodeId,
    label: InternedString,
    properties: PropertyMap,
    valid_from: Timestamp,
    commit_timestamp: Timestamp,
    historical: &mut HistoricalStorage,
) -> Result<()> {
    // `properties` is cloned here because the historical record below takes
    // ownership of the original map.
    let pending_edge = Edge::with_metadata(
        edge_id,
        label,
        source,
        target,
        properties.clone(),
        version_id,
        VersionMetadata::uncommitted(tx.tx_id),
    );

    if is_create {
        tx.current.insert_edge_direct(pending_edge)?;
    } else {
        tx.current.update_edge_direct(pending_edge)?;

        // An update supersedes whatever version history currently considers live.
        let superseded = historical.get_current_edge_version(edge_id);
        if let Some(previous_version) = superseded {
            historical.close_edge_version_transaction_time(previous_version, commit_timestamp)?;
        }
    }

    historical.add_edge_version(
        edge_id,
        version_id,
        valid_from,
        commit_timestamp,
        label,
        source,
        target,
        properties,
        false,
    )?;

    tx.temporal_indexes.insert_edge_version(
        edge_id,
        version_id,
        create_temporal_interval(valid_from, commit_timestamp, false)?,
    )?;
    Ok(())
}
/// Applies one buffered node deletion by recording a tombstone version.
///
/// Steps, in order:
/// 1. Read the node from `tx.current` (its label and properties are copied
///    into the tombstone).
/// 2. If `historical` reports a current version for `node_id`, close that
///    version's transaction time at `commit_timestamp`.
/// 3. Build the tombstone interval, append the tombstone version
///    (`tombstone_id`) to `historical`, and register it in
///    `tx.temporal_indexes`.
/// 4. Remove the node from `tx.current`.
///
/// # Errors
///
/// Returns the first error produced by any lookup, storage or index call;
/// steps already performed are not undone by this function.
pub(crate) fn apply_node_delete(
    tx: &WriteTransaction,
    node_id: NodeId,
    valid_from: Timestamp,
    commit_timestamp: Timestamp,
    tombstone_id: VersionId,
    historical: &mut HistoricalStorage,
) -> Result<()> {
    // Fetched before anything is mutated: the tombstone reuses its contents.
    let doomed = tx.current.get_node(node_id)?;

    let live_version = historical.get_current_node_version(node_id);
    if let Some(previous_version) = live_version {
        historical.close_node_version_transaction_time(previous_version, commit_timestamp)?;
    }

    let tombstone_interval = create_temporal_interval(valid_from, commit_timestamp, true)?;

    historical.add_node_version(
        node_id,
        tombstone_id,
        valid_from,
        commit_timestamp,
        doomed.label,
        doomed.properties.clone(),
        true,
    )?;
    tx.temporal_indexes
        .insert_node_version(node_id, tombstone_id, tombstone_interval)?;

    // Current storage is touched last, after history and the index are written.
    tx.current.delete_node_direct(node_id, commit_timestamp)?;
    Ok(())
}
/// Applies one buffered edge deletion by recording a tombstone version.
///
/// Steps, in order:
/// 1. Read the edge from `tx.current` (its label, endpoints and properties
///    are copied into the tombstone).
/// 2. If `historical` reports a current version for `edge_id`, close that
///    version's transaction time at `commit_timestamp`.
/// 3. Build the tombstone interval, append the tombstone version
///    (`tombstone_id`) to `historical`, and register it in
///    `tx.temporal_indexes`.
/// 4. Remove the edge from `tx.current`.
///
/// # Errors
///
/// Returns the first error produced by any lookup, storage or index call;
/// steps already performed are not undone by this function.
pub(crate) fn apply_edge_delete(
    tx: &WriteTransaction,
    edge_id: EdgeId,
    valid_from: Timestamp,
    commit_timestamp: Timestamp,
    tombstone_id: VersionId,
    historical: &mut HistoricalStorage,
) -> Result<()> {
    // Fetched before anything is mutated: the tombstone reuses its contents.
    let doomed = tx.current.get_edge(edge_id)?;

    let live_version = historical.get_current_edge_version(edge_id);
    if let Some(previous_version) = live_version {
        historical.close_edge_version_transaction_time(previous_version, commit_timestamp)?;
    }

    let tombstone_interval = create_temporal_interval(valid_from, commit_timestamp, true)?;

    historical.add_edge_version(
        edge_id,
        tombstone_id,
        valid_from,
        commit_timestamp,
        doomed.label,
        doomed.source,
        doomed.target,
        doomed.properties.clone(),
        true,
    )?;
    tx.temporal_indexes
        .insert_edge_version(edge_id, tombstone_id, tombstone_interval)?;

    // Current storage is touched last, after history and the index are written.
    tx.current.delete_edge_direct(edge_id)?;
    Ok(())
}
/// Dispatches a single buffered operation to the matching `apply_*` helper.
///
/// * `CreateNode` / `UpdateNode` go to `apply_node_write`.
/// * `CreateEdge` / `UpdateEdge` go to `apply_edge_write`.
/// * `DeleteNode` / `DeleteEdge` take the next value from `tombstone_ids`,
///   wrap it with `VersionId::new_unchecked`, and go to `apply_node_delete` /
///   `apply_edge_delete`.
///
/// `num_deletes` is only used to enrich the error message emitted when
/// `tombstone_ids` runs dry.
///
/// # Errors
///
/// * `StorageError::InconsistentState` if a delete is encountered and
///   `tombstone_ids` has no remaining value.
/// * Any error returned by the helper the operation is dispatched to.
pub(crate) fn apply_single_write(
    tx: &WriteTransaction,
    write: &crate::api::transaction::BufferedWrite,
    commit_timestamp: Timestamp,
    historical: &mut HistoricalStorage,
    tombstone_ids: &mut std::vec::IntoIter<u64>,
    num_deletes: usize,
) -> Result<()> {
    match write {
        crate::api::transaction::BufferedWrite::CreateNode {
            node_id,
            version_id,
            label,
            properties,
            valid_from,
        } => apply_node_write(
            tx,
            true,
            *node_id,
            *version_id,
            *label,
            properties.clone(),
            *valid_from,
            commit_timestamp,
            historical,
        ),
        crate::api::transaction::BufferedWrite::UpdateNode {
            node_id,
            version_id,
            label,
            properties,
            valid_from,
        } => apply_node_write(
            tx,
            false,
            *node_id,
            *version_id,
            *label,
            properties.clone(),
            *valid_from,
            commit_timestamp,
            historical,
        ),
        crate::api::transaction::BufferedWrite::CreateEdge {
            edge_id,
            version_id,
            source,
            target,
            label,
            properties,
            valid_from,
        } => apply_edge_write(
            tx,
            true,
            *edge_id,
            *version_id,
            *source,
            *target,
            *label,
            properties.clone(),
            *valid_from,
            commit_timestamp,
            historical,
        ),
        crate::api::transaction::BufferedWrite::UpdateEdge {
            edge_id,
            version_id,
            source,
            target,
            label,
            properties,
            valid_from,
        } => apply_edge_write(
            tx,
            false,
            *edge_id,
            *version_id,
            *source,
            *target,
            *label,
            properties.clone(),
            *valid_from,
            commit_timestamp,
            historical,
        ),
        crate::api::transaction::BufferedWrite::DeleteNode {
            node_id,
            valid_from,
        } => {
            // Running out of pre-allocated IDs means the caller's delete count
            // disagrees with the operations being applied.
            let raw_id = tombstone_ids.next().ok_or_else(|| {
                StorageError::InconsistentState {
                    reason: format!(
                        "Tombstone ID exhaustion for DeleteNode: expected {} deletes, iterator depleted at node_id {:?}",
                        num_deletes, node_id
                    ),
                }
            })?;
            apply_node_delete(
                tx,
                *node_id,
                *valid_from,
                commit_timestamp,
                VersionId::new_unchecked(raw_id),
                historical,
            )
        }
        crate::api::transaction::BufferedWrite::DeleteEdge {
            edge_id,
            valid_from,
        } => {
            // Running out of pre-allocated IDs means the caller's delete count
            // disagrees with the operations being applied.
            let raw_id = tombstone_ids.next().ok_or_else(|| {
                StorageError::InconsistentState {
                    reason: format!(
                        "Tombstone ID exhaustion for DeleteEdge: expected {} deletes, iterator depleted at edge_id {:?}",
                        num_deletes, edge_id
                    ),
                }
            })?;
            apply_edge_delete(
                tx,
                *edge_id,
                *valid_from,
                commit_timestamp,
                VersionId::new_unchecked(raw_id),
                historical,
            )
        }
    }
}
/// Applies every operation buffered in `tx` at `commit_timestamp`.
///
/// The write guard on `tx.historical` is acquired first and held until the
/// function returns. One version ID per buffered delete is then drawn from
/// `tx.version_id_gen` up front (these become the tombstone version IDs), and
/// the operations are applied in buffer order through `apply_single_write`.
///
/// # Errors
///
/// * Any error from `tx.version_id_gen.next()` while reserving tombstone IDs;
///   in that case no operation has been applied yet.
/// * The first error returned by `apply_single_write`; operations applied
///   before the failing one are not undone by this function.
pub(crate) fn apply_changes(tx: &WriteTransaction, commit_timestamp: Timestamp) -> Result<()> {
    let mut historical = tx.historical.write();

    let num_deletes = tx
        .buffer
        .operations()
        .iter()
        .filter(|op| {
            matches!(
                op,
                crate::api::transaction::BufferedWrite::DeleteNode { .. }
                    | crate::api::transaction::BufferedWrite::DeleteEdge { .. }
            )
        })
        .count();

    // Reserve all tombstone IDs before applying anything so an ID-generator
    // failure cannot leave a half-applied buffer. An empty range collects to
    // an empty `Vec`, so zero deletes needs no special case.
    let mut tombstone_ids = (0..num_deletes)
        .map(|_| tx.version_id_gen.next().map_err(Into::into))
        .collect::<Result<Vec<u64>>>()?
        .into_iter();

    for write in tx.buffer.operations() {
        apply_single_write(
            tx,
            write,
            commit_timestamp,
            &mut historical,
            &mut tombstone_ids,
            num_deletes,
        )?;
    }

    // Every reserved ID must have been consumed by exactly one delete.
    debug_assert!(
        tombstone_ids.next().is_none(),
        "Tombstone ID surplus: expected {} deletes, but iterator has remaining IDs",
        num_deletes
    );

    Ok(())
}
/// Stamps `commit_timestamp` onto every node and edge that `tx` created or
/// updated.
///
/// Walks the buffered operations in order and calls
/// `set_node_commit_timestamp` / `set_edge_commit_timestamp` on `tx.current`
/// for each create or update. Delete operations are skipped.
///
/// The match is deliberately exhaustive (no `_` arm) so that adding a new
/// `BufferedWrite` variant forces an explicit decision here at compile time.
pub(crate) fn finalize_current_commit_timestamps(
    tx: &WriteTransaction,
    commit_timestamp: Timestamp,
) {
    for write in tx.buffer.operations() {
        match write {
            crate::api::transaction::BufferedWrite::CreateNode { node_id, .. }
            | crate::api::transaction::BufferedWrite::UpdateNode { node_id, .. } => {
                tx.current
                    .set_node_commit_timestamp(*node_id, commit_timestamp);
            }
            crate::api::transaction::BufferedWrite::CreateEdge { edge_id, .. }
            | crate::api::transaction::BufferedWrite::UpdateEdge { edge_id, .. } => {
                tx.current
                    .set_edge_commit_timestamp(*edge_id, commit_timestamp);
            }
            // Deletes get no commit timestamp set here.
            crate::api::transaction::BufferedWrite::DeleteNode { .. }
            | crate::api::transaction::BufferedWrite::DeleteEdge { .. } => {}
        }
    }
}