use crate::core::error::{Result, StorageError};
use crate::core::id::{EdgeId, EntityId, NodeId, VersionId};
use crate::core::temporal::{BiTemporalInterval, TimeRange, Timestamp};
use dashmap::DashMap;
use smallvec::SmallVec;
/// Controls how duplicate version entries are handled during batch insertion.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeduplicationPolicy {
    /// Keep the first entry (in sorted `(start, metadata_idx)` order) for each
    /// version and drop later duplicates.
    #[default]
    FirstOccurrence,
    /// Keep the last entry (in sorted `(start, metadata_idx)` order) for each
    /// version and drop earlier duplicates.
    LastOccurrence,
    /// Fail the whole batch with a `DuplicateId` error if any duplicate exists,
    /// either within the batch or against entries already in the timeline.
    Reject,
}
/// Tunable limits for [`TemporalIndexes`].
#[derive(Debug, Clone)]
pub struct TemporalIndexConfig {
    /// Maximum number of version metadata records allowed per entity;
    /// inserts that would exceed this fail with `CapacityExceeded`.
    pub max_versions_per_entity: usize,
}
impl Default for TemporalIndexConfig {
    /// Default configuration: up to one million versions per entity.
    fn default() -> Self {
        let max_versions_per_entity = 1_000_000;
        Self {
            max_versions_per_entity,
        }
    }
}
/// Compact (`u32`) index into an entity's version-metadata table.
pub type TimelineVersionMetadataIndex = u32;
/// Small vector of metadata indices; stays inline (no heap) for up to 16 results.
pub type IndexVec = SmallVec<[TimelineVersionMetadataIndex; 16]>;
/// Per-version metadata, stored once per version and referenced by compact
/// index from both the valid-time and transaction-time timelines.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimelineVersionMetadata {
    // The version this metadata record describes.
    version_id: VersionId,
}
impl TimelineVersionMetadata {
    /// Returns the version id this record refers to.
    #[inline]
    pub const fn version_id(&self) -> VersionId {
        self.version_id
    }

    /// Builds a metadata record for `version_id`.
    #[inline]
    pub const fn new(version_id: VersionId) -> Self {
        Self { version_id }
    }
}
/// One half-open interval `[start, end)` on a timeline. The interval contains
/// timestamp `t` iff `start <= t && t < end` (see the point/range queries on
/// `EntityTimeline`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TimelineEntry {
    start: Timestamp,
    end: Timestamp,
    // Index into `EntityTimelines::version_metadata` identifying the version.
    metadata_idx: TimelineVersionMetadataIndex,
}
impl TimelineEntry {
    /// Test-only accessor for the metadata index this entry points at.
    #[inline]
    #[cfg(test)]
    pub const fn metadata_index(&self) -> TimelineVersionMetadataIndex {
        self.metadata_idx
    }
}
/// A single timeline (valid-time or transaction-time) for one entity.
///
/// Invariants maintained by every mutation:
/// - `versions` is sorted by `(start, metadata_idx)`.
/// - `metadata_to_position` maps each entry's `metadata_idx` to its current
///   position in `versions`.
#[derive(Debug, Clone, Default)]
struct EntityTimeline {
    versions: Vec<TimelineEntry>,
    // Keys are plain u32 indices, so the crate's IdentityHasher is used to
    // skip real hashing work.
    metadata_to_position: std::collections::HashMap<
        TimelineVersionMetadataIndex,
        usize,
        std::hash::BuildHasherDefault<crate::core::hasher::IdentityHasher>,
    >,
}
impl EntityTimeline {
    /// Inserts a single entry, keeping `versions` sorted by
    /// `(start, metadata_idx)` and `metadata_to_position` in sync.
    fn insert(
        &mut self,
        start: Timestamp,
        end: Timestamp,
        metadata_idx: TimelineVersionMetadataIndex,
    ) {
        let entry = TimelineEntry {
            start,
            end,
            metadata_idx,
        };
        let new_key = (start, metadata_idx);
        // Fast path: the new key sorts at (or after) the current tail, so a
        // plain append preserves order and no existing positions move.
        if self
            .versions
            .last()
            .is_none_or(|last| (last.start, last.metadata_idx) <= new_key)
        {
            let position = self.versions.len();
            self.versions.push(entry);
            self.metadata_to_position.insert(metadata_idx, position);
            return;
        }
        // Slow path: binary-search the insertion point, shift the suffix,
        // then refresh the position map for every entry that moved.
        let idx = self
            .versions
            .partition_point(|e| (e.start, e.metadata_idx) < new_key);
        self.versions.insert(idx, entry);
        for pos in (idx + 1)..self.versions.len() {
            self.metadata_to_position
                .insert(self.versions[pos].metadata_idx, pos);
        }
        self.metadata_to_position.insert(metadata_idx, idx);
    }
    /// Rebuilds `metadata_to_position` from scratch; used after bulk mutations
    /// that can move many entries at once.
    fn rebuild_position_map(&mut self) {
        self.metadata_to_position.clear();
        for (pos, entry) in self.versions.iter().enumerate() {
            self.metadata_to_position.insert(entry.metadata_idx, pos);
        }
    }
    /// Sets a new end timestamp on the entry for `metadata_idx`.
    /// Returns `false` when the index is unknown to this timeline.
    ///
    /// NOTE(review): entries stay sorted by `start`, so changing only `end`
    /// cannot break the sort invariant.
    fn update_end_time(
        &mut self,
        metadata_idx: TimelineVersionMetadataIndex,
        new_end: Timestamp,
    ) -> bool {
        if let Some(&position) = self.metadata_to_position.get(&metadata_idx)
            && let Some(entry) = self.versions.get_mut(position)
        {
            entry.end = new_end;
            return true;
        }
        false
    }
    /// Inserts many entries at once: append, re-sort, then deduplicate by
    /// `metadata_idx` according to `policy`.
    ///
    /// # Errors
    /// With `DeduplicationPolicy::Reject`, returns `DuplicateId` if the batch
    /// contains a repeated `metadata_idx` or one already present in this
    /// timeline; in that case `self` is left unmodified.
    fn insert_batch(
        &mut self,
        mut entries: Vec<TimelineEntry>,
        policy: DeduplicationPolicy,
    ) -> Result<()> {
        if entries.is_empty() {
            return Ok(());
        }
        // Reject policy validates BEFORE mutating so a failed batch leaves
        // this timeline untouched.
        if policy == DeduplicationPolicy::Reject {
            let mut seen_in_batch = std::collections::HashSet::with_capacity(entries.len());
            for entry in &entries {
                if !seen_in_batch.insert(entry.metadata_idx) {
                    return Err(StorageError::DuplicateId {
                        id: format!("metadata_idx:{}", entry.metadata_idx),
                        kind: "version (duplicate in batch)".to_string(),
                    }
                    .into());
                }
            }
            for entry in &entries {
                if self.metadata_to_position.contains_key(&entry.metadata_idx) {
                    return Err(StorageError::DuplicateId {
                        id: format!("metadata_idx:{}", entry.metadata_idx),
                        kind: "version (already exists in timeline)".to_string(),
                    }
                    .into());
                }
            }
        }
        self.versions.reserve(entries.len());
        self.versions.append(&mut entries);
        self.versions.sort_by_key(|e| (e.start, e.metadata_idx));
        match policy {
            DeduplicationPolicy::FirstOccurrence => {
                // Keep the first entry per metadata_idx in sorted order.
                let mut seen = std::collections::HashSet::with_capacity(self.versions.len());
                self.versions
                    .retain(|entry| seen.insert(entry.metadata_idx));
            }
            DeduplicationPolicy::LastOccurrence => {
                // Walk backwards so the last entry per metadata_idx wins,
                // then restore ascending order.
                let mut seen = std::collections::HashSet::with_capacity(self.versions.len());
                let mut deduped = Vec::with_capacity(self.versions.len());
                for entry in self.versions.iter().rev() {
                    if seen.insert(entry.metadata_idx) {
                        deduped.push(*entry);
                    }
                }
                deduped.reverse();
                self.versions = deduped;
            }
            DeduplicationPolicy::Reject => {
                // Already validated above: nothing to deduplicate.
            }
        }
        // Positions may have shifted arbitrarily; rebuild the map wholesale.
        self.rebuild_position_map();
        Ok(())
    }
    /// Lazily yields the metadata indices of entries overlapping `range`
    /// (half-open interval semantics: overlap iff start < range.end and
    /// end > range.start).
    fn find_indices_in_range_iter(
        &self,
        range: TimeRange,
    ) -> impl Iterator<Item = TimelineVersionMetadataIndex> + '_ {
        // Entries at or beyond `cutoff` start after the range ends, so the
        // sorted order lets us skip them entirely.
        let cutoff = self.versions.partition_point(|e| e.start < range.end());
        let range_start = range.start();
        self.versions[..cutoff]
            .iter()
            .filter(move |entry| entry.end > range_start)
            .map(|entry| entry.metadata_idx)
    }
    /// Eager, small-vector variant of [`Self::find_indices_in_range_iter`].
    fn find_indices_in_range(&self, range: TimeRange) -> IndexVec {
        let mut results = IndexVec::new();
        results.extend(self.find_indices_in_range_iter(range));
        results
    }
    /// Lazily yields the metadata indices of entries whose interval contains
    /// `timestamp` (start <= timestamp < end).
    fn find_indices_at_point_iter(
        &self,
        timestamp: Timestamp,
    ) -> impl Iterator<Item = TimelineVersionMetadataIndex> + '_ {
        let cutoff = self.versions.partition_point(|e| e.start <= timestamp);
        self.versions[..cutoff]
            .iter()
            .filter(move |entry| entry.end > timestamp)
            .map(|entry| entry.metadata_idx)
    }
    /// Eager, small-vector variant of [`Self::find_indices_at_point_iter`].
    fn find_indices_at_point(&self, timestamp: Timestamp) -> IndexVec {
        let mut results = IndexVec::new();
        results.extend(self.find_indices_at_point_iter(timestamp));
        results
    }
}
/// All temporal state for one entity: the shared metadata table plus the two
/// bitemporal timelines that reference it by compact index.
#[derive(Debug, Clone, Default)]
struct EntityTimelines {
    // Append-only metadata table, addressed by TimelineVersionMetadataIndex.
    version_metadata: Vec<TimelineVersionMetadata>,
    // Reverse lookup from version id to its slot in `version_metadata`.
    version_id_to_idx: std::collections::HashMap<VersionId, TimelineVersionMetadataIndex>,
    // Valid-time timeline.
    valid: EntityTimeline,
    // Transaction-time timeline.
    tx: EntityTimeline,
}
impl EntityTimelines {
#[inline]
pub fn version_metadata_count(&self) -> usize {
self.version_metadata.len()
}
#[inline]
#[cfg(test)]
pub(crate) fn get_version_metadata(
&self,
index: TimelineVersionMetadataIndex,
) -> Option<&TimelineVersionMetadata> {
self.version_metadata.get(index as usize)
}
#[inline]
fn add_version_metadata(
&mut self,
metadata: TimelineVersionMetadata,
) -> Result<TimelineVersionMetadataIndex> {
let index = self.version_metadata.len();
if index > u32::MAX as usize {
return Err(StorageError::CapacityExceeded {
resource: "version metadata indices".to_string(),
current: index,
limit: u32::MAX as usize,
}
.into());
}
let idx = index as TimelineVersionMetadataIndex;
self.version_id_to_idx.insert(metadata.version_id(), idx);
self.version_metadata.push(metadata);
Ok(idx)
}
#[inline]
fn resolve_version_id(&self, index: TimelineVersionMetadataIndex) -> VersionId {
self.version_metadata
.get(index as usize)
.expect(
"internal error: invalid metadata index - this indicates a bug in temporal index",
)
.version_id()
}
#[inline]
fn resolve_version_ids_iter<'a>(
&'a self,
indices: &'a [TimelineVersionMetadataIndex],
) -> impl Iterator<Item = VersionId> + 'a {
indices.iter().map(|&idx| self.resolve_version_id(idx))
}
#[inline]
fn resolve_version_ids(&self, indices: &[TimelineVersionMetadataIndex]) -> Vec<VersionId> {
self.resolve_version_ids_iter(indices).collect()
}
#[inline]
fn find_metadata_index(&self, version_id: VersionId) -> Option<TimelineVersionMetadataIndex> {
self.version_id_to_idx.get(&version_id).copied()
}
fn update_valid_time_end(&mut self, version_id: VersionId, new_end: Timestamp) -> bool {
if let Some(metadata_idx) = self.find_metadata_index(version_id) {
let result = self.valid.update_end_time(metadata_idx, new_end);
debug_assert!(
result,
"Temporal index inconsistency: version {:?} exists in metadata but not in valid timeline",
version_id
);
result
} else {
debug_assert!(
false,
"Temporal index inconsistency: version {:?} not found in metadata",
version_id
);
false
}
}
fn update_transaction_time_end(&mut self, version_id: VersionId, new_end: Timestamp) -> bool {
if let Some(metadata_idx) = self.find_metadata_index(version_id) {
let result = self.tx.update_end_time(metadata_idx, new_end);
debug_assert!(
result,
"Temporal index inconsistency: version {:?} exists in metadata but not in tx timeline",
version_id
);
result
} else {
debug_assert!(
false,
"Temporal index inconsistency: version {:?} not found in metadata",
version_id
);
false
}
}
}
/// Concurrent bitemporal index mapping entities to their version timelines.
///
/// Backed by a `DashMap`, so operations on different entities can proceed in
/// parallel; operations on one entity serialize on its shard entry.
#[derive(Debug)]
pub struct TemporalIndexes {
    index: DashMap<EntityId, EntityTimelines>,
    config: TemporalIndexConfig,
}
impl Default for TemporalIndexes {
fn default() -> Self {
Self::new()
}
}
impl TemporalIndexes {
    /// Creates an empty index with the default configuration.
    pub fn new() -> Self {
        Self::with_config(TemporalIndexConfig::default())
    }

    /// Creates an empty index with an explicit configuration.
    pub fn with_config(config: TemporalIndexConfig) -> Self {
        Self {
            index: DashMap::new(),
            config,
        }
    }

    /// Indexes one node version over the given bitemporal interval.
    ///
    /// # Errors
    /// `CapacityExceeded` if the per-entity version limit is reached, or
    /// `DuplicateId` if `version_id` is already indexed for this node.
    pub fn insert_node_version(
        &self,
        node_id: NodeId,
        version_id: VersionId,
        temporal: BiTemporalInterval,
    ) -> Result<()> {
        self.insert_version(EntityId::Node(node_id), version_id, temporal)
    }

    /// Indexes one edge version over the given bitemporal interval.
    ///
    /// # Errors
    /// `CapacityExceeded` if the per-entity version limit is reached, or
    /// `DuplicateId` if `version_id` is already indexed for this edge.
    pub fn insert_edge_version(
        &self,
        edge_id: EdgeId,
        version_id: VersionId,
        temporal: BiTemporalInterval,
    ) -> Result<()> {
        self.insert_version(EntityId::Edge(edge_id), version_id, temporal)
    }

    /// Batch-inserts node versions with the default deduplication policy
    /// (`FirstOccurrence`).
    pub fn insert_node_versions_batch(
        &self,
        node_id: NodeId,
        versions: Vec<(VersionId, BiTemporalInterval)>,
    ) -> Result<()> {
        self.insert_node_versions_batch_with_policy(
            node_id,
            versions,
            DeduplicationPolicy::default(),
        )
    }

    /// Batch-inserts node versions with an explicit deduplication policy.
    pub fn insert_node_versions_batch_with_policy(
        &self,
        node_id: NodeId,
        versions: Vec<(VersionId, BiTemporalInterval)>,
        policy: DeduplicationPolicy,
    ) -> Result<()> {
        self.insert_versions_batch(EntityId::Node(node_id), versions, policy)
    }

    /// Batch-inserts edge versions with the default deduplication policy
    /// (`FirstOccurrence`).
    pub fn insert_edge_versions_batch(
        &self,
        edge_id: EdgeId,
        versions: Vec<(VersionId, BiTemporalInterval)>,
    ) -> Result<()> {
        self.insert_edge_versions_batch_with_policy(
            edge_id,
            versions,
            DeduplicationPolicy::default(),
        )
    }

    /// Batch-inserts edge versions with an explicit deduplication policy.
    pub fn insert_edge_versions_batch_with_policy(
        &self,
        edge_id: EdgeId,
        versions: Vec<(VersionId, BiTemporalInterval)>,
        policy: DeduplicationPolicy,
    ) -> Result<()> {
        self.insert_versions_batch(EntityId::Edge(edge_id), versions, policy)
    }

    /// Shared single-insert path: registers the version's metadata, then adds
    /// an entry to both the valid-time and transaction-time timelines.
    fn insert_version(
        &self,
        entity_id: EntityId,
        version_id: VersionId,
        temporal: BiTemporalInterval,
    ) -> Result<()> {
        // Holding the entry guard serializes all mutations for this entity.
        let mut timelines = self.index.entry(entity_id).or_default();
        let current_count = timelines.version_metadata_count();
        if current_count >= self.config.max_versions_per_entity {
            return Err(StorageError::CapacityExceeded {
                resource: format!("versions for entity {:?}", entity_id),
                current: current_count,
                limit: self.config.max_versions_per_entity,
            }
            .into());
        }
        if timelines.find_metadata_index(version_id).is_some() {
            return Err(StorageError::DuplicateId {
                id: format!("{}", version_id),
                kind: "version".to_string(),
            }
            .into());
        }
        let metadata = TimelineVersionMetadata::new(version_id);
        let metadata_idx = timelines.add_version_metadata(metadata)?;
        let valid = temporal.valid_time();
        timelines
            .valid
            .insert(valid.start(), valid.end(), metadata_idx);
        let tx = temporal.transaction_time();
        timelines.tx.insert(tx.start(), tx.end(), metadata_idx);
        Ok(())
    }

    /// Shared batch-insert path.
    ///
    /// With `DeduplicationPolicy::Reject`, all duplicate checks are performed
    /// BEFORE any mutation, so a rejected batch leaves the index unchanged.
    /// (Previously metadata records were registered first and a later timeline
    /// rejection left orphaned metadata and stale `version_id_to_idx` entries,
    /// causing spurious `DuplicateId` errors on subsequent inserts.)
    fn insert_versions_batch(
        &self,
        entity_id: EntityId,
        versions: Vec<(VersionId, BiTemporalInterval)>,
        policy: DeduplicationPolicy,
    ) -> Result<()> {
        if versions.is_empty() {
            return Ok(());
        }
        let mut timelines = self.index.entry(entity_id).or_default();
        let current_count = timelines.version_metadata_count();
        // NOTE: this is a conservative upper bound — duplicate ids in the
        // batch collapse to one metadata record, so fewer slots may be used.
        let new_count = current_count + versions.len();
        if new_count > self.config.max_versions_per_entity {
            return Err(StorageError::CapacityExceeded {
                resource: format!("versions for entity {:?}", entity_id),
                current: new_count,
                limit: self.config.max_versions_per_entity,
            }
            .into());
        }
        // Reject policy: validate against in-batch and pre-existing version
        // ids up front, before touching any state.
        if policy == DeduplicationPolicy::Reject {
            let mut seen_in_batch = std::collections::HashSet::with_capacity(versions.len());
            for (v_id, _) in &versions {
                if !seen_in_batch.insert(*v_id) {
                    return Err(StorageError::DuplicateId {
                        id: format!("{}", v_id),
                        kind: "version (duplicate in batch)".to_string(),
                    }
                    .into());
                }
                if timelines.find_metadata_index(*v_id).is_some() {
                    return Err(StorageError::DuplicateId {
                        id: format!("{}", v_id),
                        kind: "version".to_string(),
                    }
                    .into());
                }
            }
        }
        timelines.version_metadata.reserve(versions.len());
        timelines.version_id_to_idx.reserve(versions.len());
        let mut valid_entries = Vec::with_capacity(versions.len());
        let mut tx_entries = Vec::with_capacity(versions.len());
        // Batch-local cache so repeated ids (allowed under First/LastOccurrence)
        // reuse one metadata slot instead of allocating several.
        let mut v_id_to_idx = std::collections::HashMap::with_capacity(versions.len());
        for (v_id, temporal) in versions {
            let metadata_idx = if let Some(&idx) = v_id_to_idx.get(&v_id) {
                idx
            } else if let Some(idx) = timelines.find_metadata_index(v_id) {
                v_id_to_idx.insert(v_id, idx);
                idx
            } else {
                let metadata = TimelineVersionMetadata::new(v_id);
                let idx = timelines.add_version_metadata(metadata)?;
                v_id_to_idx.insert(v_id, idx);
                idx
            };
            let valid = temporal.valid_time();
            let tx = temporal.transaction_time();
            valid_entries.push(TimelineEntry {
                start: valid.start(),
                end: valid.end(),
                metadata_idx,
            });
            tx_entries.push(TimelineEntry {
                start: tx.start(),
                end: tx.end(),
                metadata_idx,
            });
        }
        // With Reject, the pre-validation above guarantees these cannot fail
        // on duplicates, so both timelines are updated or neither is.
        timelines.valid.insert_batch(valid_entries, policy)?;
        timelines.tx.insert_batch(tx_entries, policy)?;
        Ok(())
    }

    /// Sets a new valid-time end on one node version; no-op if the node or
    /// version is not indexed.
    pub fn update_node_valid_time_end(
        &self,
        node_id: NodeId,
        version_id: VersionId,
        new_end: Timestamp,
    ) {
        if let Some(mut timelines) = self.index.get_mut(&EntityId::Node(node_id)) {
            timelines.update_valid_time_end(version_id, new_end);
        }
    }

    /// Sets a new transaction-time end on one node version; no-op if the node
    /// or version is not indexed.
    pub fn update_node_transaction_time_end(
        &self,
        node_id: NodeId,
        version_id: VersionId,
        new_end: Timestamp,
    ) {
        if let Some(mut timelines) = self.index.get_mut(&EntityId::Node(node_id)) {
            timelines.update_transaction_time_end(version_id, new_end);
        }
    }

    /// Sets a new valid-time end on one edge version; no-op if the edge or
    /// version is not indexed.
    pub fn update_edge_valid_time_end(
        &self,
        edge_id: EdgeId,
        version_id: VersionId,
        new_end: Timestamp,
    ) {
        if let Some(mut timelines) = self.index.get_mut(&EntityId::Edge(edge_id)) {
            timelines.update_valid_time_end(version_id, new_end);
        }
    }

    /// Sets a new transaction-time end on one edge version; no-op if the edge
    /// or version is not indexed.
    pub fn update_edge_transaction_time_end(
        &self,
        edge_id: EdgeId,
        version_id: VersionId,
        new_end: Timestamp,
    ) {
        if let Some(mut timelines) = self.index.get_mut(&EntityId::Edge(edge_id)) {
            timelines.update_transaction_time_end(version_id, new_end);
        }
    }

    /// Returns all node versions whose valid-time interval overlaps `time_range`.
    pub fn find_node_versions_in_valid_time_range(
        &self,
        node_id: NodeId,
        time_range: TimeRange,
    ) -> Vec<VersionId> {
        self.index
            .get(&EntityId::Node(node_id))
            .map(|t| {
                let indices = t.valid.find_indices_in_range(time_range);
                t.resolve_version_ids(&indices)
            })
            .unwrap_or_default()
    }

    /// Returns all edge versions whose valid-time interval overlaps `time_range`.
    pub fn find_edge_versions_in_valid_time_range(
        &self,
        edge_id: EdgeId,
        time_range: TimeRange,
    ) -> Vec<VersionId> {
        self.index
            .get(&EntityId::Edge(edge_id))
            .map(|t| {
                let indices = t.valid.find_indices_in_range(time_range);
                t.resolve_version_ids(&indices)
            })
            .unwrap_or_default()
    }

    /// Returns all node versions whose transaction-time interval overlaps
    /// `time_range`.
    pub fn find_node_versions_in_transaction_time_range(
        &self,
        node_id: NodeId,
        time_range: TimeRange,
    ) -> Vec<VersionId> {
        self.index
            .get(&EntityId::Node(node_id))
            .map(|t| {
                let indices = t.tx.find_indices_in_range(time_range);
                t.resolve_version_ids(&indices)
            })
            .unwrap_or_default()
    }

    /// Returns all edge versions whose transaction-time interval overlaps
    /// `time_range`.
    pub fn find_edge_versions_in_transaction_time_range(
        &self,
        edge_id: EdgeId,
        time_range: TimeRange,
    ) -> Vec<VersionId> {
        self.index
            .get(&EntityId::Edge(edge_id))
            .map(|t| {
                let indices = t.tx.find_indices_in_range(time_range);
                t.resolve_version_ids(&indices)
            })
            .unwrap_or_default()
    }

    /// Returns the node versions live at (`valid_time`, `transaction_time`).
    pub fn find_node_version_at_point(
        &self,
        node_id: NodeId,
        valid_time: Timestamp,
        transaction_time: Timestamp,
    ) -> Vec<VersionId> {
        self.find_version_at_point_impl(EntityId::Node(node_id), valid_time, transaction_time)
    }

    /// Returns the edge versions live at (`valid_time`, `transaction_time`).
    pub fn find_edge_version_at_point(
        &self,
        edge_id: EdgeId,
        valid_time: Timestamp,
        transaction_time: Timestamp,
    ) -> Vec<VersionId> {
        self.find_version_at_point_impl(EntityId::Edge(edge_id), valid_time, transaction_time)
    }

    /// Iterator variant of [`Self::find_node_version_at_point`].
    pub fn find_node_version_at_point_iter(
        &self,
        node_id: NodeId,
        valid_time: Timestamp,
        transaction_time: Timestamp,
    ) -> impl Iterator<Item = VersionId> + '_ {
        self.find_version_at_point_iter_impl(EntityId::Node(node_id), valid_time, transaction_time)
    }

    /// Iterator variant of [`Self::find_edge_version_at_point`].
    pub fn find_edge_version_at_point_iter(
        &self,
        edge_id: EdgeId,
        valid_time: Timestamp,
        transaction_time: Timestamp,
    ) -> impl Iterator<Item = VersionId> + '_ {
        self.find_version_at_point_iter_impl(EntityId::Edge(edge_id), valid_time, transaction_time)
    }

    /// Eagerly computes the point query, then hands back an owning iterator so
    /// no DashMap guard is held while the caller iterates.
    fn find_version_at_point_iter_impl(
        &self,
        entity_id: EntityId,
        valid_time: Timestamp,
        transaction_time: Timestamp,
    ) -> impl Iterator<Item = VersionId> {
        self.find_version_at_point_impl(entity_id, valid_time, transaction_time)
            .into_iter()
    }

    /// Core bitemporal point query: intersect the versions live at
    /// `valid_time` on the valid timeline with those live at
    /// `transaction_time` on the transaction timeline.
    fn find_version_at_point_impl(
        &self,
        entity_id: EntityId,
        valid_time: Timestamp,
        transaction_time: Timestamp,
    ) -> Vec<VersionId> {
        let Some(timelines) = self.index.get(&entity_id) else {
            return Vec::new();
        };
        let valid_indices = timelines.valid.find_indices_at_point(valid_time);
        let tx_indices = timelines.tx.find_indices_at_point(transaction_time);
        let intersected_indices = Self::intersect_metadata_indices(&valid_indices, &tx_indices);
        timelines.resolve_version_ids(&intersected_indices)
    }

    /// Intersects two index sets, preserving the order of the probed side.
    /// Small inputs use a linear scan (cache-friendly, no allocation); larger
    /// ones build a hash set to avoid the O(n*m) scan.
    fn intersect_metadata_indices(
        a: &[TimelineVersionMetadataIndex],
        b: &[TimelineVersionMetadataIndex],
    ) -> IndexVec {
        const HASH_THRESHOLD: usize = 16;
        let max_len = a.len().max(b.len());
        if max_len < HASH_THRESHOLD {
            // Probe from the shorter side to minimize comparisons.
            if a.len() <= b.len() {
                a.iter().copied().filter(|v| b.contains(v)).collect()
            } else {
                b.iter().copied().filter(|v| a.contains(v)).collect()
            }
        } else {
            use std::collections::HashSet;
            let b_set: HashSet<_> = b.iter().copied().collect();
            a.iter().copied().filter(|v| b_set.contains(v)).collect()
        }
    }

    /// Total number of indexed versions across all entities.
    /// Iterates every shard; O(number of entities).
    pub fn version_count(&self) -> usize {
        self.index
            .iter()
            .map(|entry| entry.value().version_metadata_count())
            .sum()
    }

    /// Iterates the ids of all indexed entities (snapshot-consistent per shard).
    pub fn entity_ids(&self) -> impl Iterator<Item = EntityId> + '_ {
        self.index.iter().map(|entry| *entry.key())
    }

    /// Removes all entities and versions from the index.
    pub fn clear(&self) {
        self.index.clear();
    }
}
// Unit tests live in the adjacent `tests` module file; compiled only for test builds.
#[cfg(test)]
mod tests;