use std::collections::HashMap;
use std::sync::Arc;
use grafeo_common::memory::buffer::{MemoryConsumer, MemoryRegion, priorities};
use grafeo_common::types::{EdgeId, EpochId, HlcClock, HlcTimestamp, NodeId, Value};
use hashbrown::HashMap as HbHashMap;
use parking_lot::RwLock;
/// The kind of mutation recorded by a [`ChangeEvent`].
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum ChangeKind {
    /// The entity (node, edge or triple) was created or inserted.
    Create,
    /// A property of an existing entity changed (see `CdcLog::record_update`).
    Update,
    /// The entity was deleted.
    Delete,
}
/// Identifies the entity a change applies to: a node, an edge, or a triple.
///
/// Triples carry no storage id of their own, so `Triple` holds a 64-bit hash of the
/// triple's subject, predicate, object and optional graph (see `triple_hash`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum EntityId {
    /// A graph node.
    Node(NodeId),
    /// A graph edge.
    Edge(EdgeId),
    /// A triple, keyed by the hash of its components.
    Triple(u64),
}
impl From<NodeId> for EntityId {
fn from(id: NodeId) -> Self {
Self::Node(id)
}
}
impl From<EdgeId> for EntityId {
fn from(id: EdgeId) -> Self {
Self::Edge(id)
}
}
impl EntityId {
    /// Returns the raw numeric value behind this id.
    ///
    /// The variant is not encoded in the result, so a node and an edge with the same
    /// underlying number yield the same value. For triples this is the component hash.
    #[must_use]
    pub fn as_u64(&self) -> u64 {
        match self {
            Self::Node(id) => id.as_u64(),
            Self::Edge(id) => id.as_u64(),
            Self::Triple(hash) => *hash,
        }
    }

    /// Returns `true` if this id refers to a node.
    #[must_use]
    pub fn is_node(&self) -> bool {
        matches!(self, Self::Node(_))
    }

    /// Returns `true` if this id refers to an edge.
    #[must_use]
    pub fn is_edge(&self) -> bool {
        matches!(self, Self::Edge(_))
    }

    /// Returns `true` if this id refers to a triple.
    #[must_use]
    pub fn is_triple(&self) -> bool {
        matches!(self, Self::Triple(_))
    }
}
/// A single recorded change to a node, edge or triple.
///
/// Which optional fields are populated depends on the entity and change kind (see the
/// `CdcLog::record_*` helpers). Fields marked `skip_serializing_if` are omitted from
/// serialized output when they are `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChangeEvent {
    /// The entity this change applies to.
    pub entity_id: EntityId,
    /// Whether the entity was created, updated or deleted.
    pub kind: ChangeKind,
    /// Epoch supplied by the caller when the change was recorded.
    pub epoch: EpochId,
    /// Timestamp taken from the owning log's HLC clock by the `record_*` helpers.
    pub timestamp: HlcTimestamp,
    /// Property values before the change, when known. `record_update` stores only the
    /// changed key; `record_delete` stores whatever properties the caller passed.
    pub before: Option<HashMap<String, Value>>,
    /// Property values after the change; the `record_*` helpers leave this `None` for deletes.
    pub after: Option<HashMap<String, Value>>,
    /// Node labels, set by `record_create_node`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub labels: Option<Vec<String>>,
    /// Edge type, set by `record_create_edge`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub edge_type: Option<String>,
    /// Raw id of the edge's source node, set by `record_create_edge`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub src_id: Option<u64>,
    /// Raw id of the edge's destination node, set by `record_create_edge`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dst_id: Option<u64>,
    /// Triple subject, set by the triple insert/delete helpers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub triple_subject: Option<String>,
    /// Triple predicate, set by the triple insert/delete helpers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub triple_predicate: Option<String>,
    /// Triple object, set by the triple insert/delete helpers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub triple_object: Option<String>,
    /// Graph the triple belongs to; `None` when no graph was given.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub triple_graph: Option<String>,
}
/// Limits applied when a `CdcLog` prunes its history.
#[derive(Clone, Debug)]
pub struct CdcRetentionConfig {
    /// How far back from the current epoch `CdcLog::apply_retention` keeps events;
    /// `None` disables epoch-based pruning.
    pub max_epochs: Option<u64>,
    /// Upper bound on retained events enforced by `CdcLog::prune_to_limit`;
    /// `None` disables count-based pruning.
    pub max_events: Option<usize>,
}

impl Default for CdcRetentionConfig {
    /// Both limits enabled: 1 000 epochs and 100 000 events.
    fn default() -> Self {
        const DEFAULT_MAX_EPOCHS: u64 = 1_000;
        const DEFAULT_MAX_EVENTS: usize = 100_000;
        CdcRetentionConfig {
            max_epochs: Some(DEFAULT_MAX_EPOCHS),
            max_events: Some(DEFAULT_MAX_EVENTS),
        }
    }
}
/// In-memory change-data-capture log holding a history of [`ChangeEvent`]s per entity.
///
/// Recording never prunes by itself. Retention is enforced only when `apply_retention` or
/// `prune_to_limit` is called, or when memory pressure triggers `MemoryConsumer::evict`.
#[derive(Debug)]
pub struct CdcLog {
    /// Per-entity event history, appended in recording order.
    events: RwLock<HbHashMap<EntityId, Vec<ChangeEvent>>>,
    /// Hybrid logical clock used to timestamp recorded events.
    clock: Arc<HlcClock>,
    /// Limits enforced by the pruning methods.
    retention: CdcRetentionConfig,
}
impl CdcLog {
#[must_use]
pub fn new() -> Self {
Self {
events: RwLock::new(HbHashMap::new()),
clock: Arc::new(HlcClock::new()),
retention: CdcRetentionConfig::default(),
}
}
#[must_use]
pub fn with_retention(retention: CdcRetentionConfig) -> Self {
Self {
events: RwLock::new(HbHashMap::new()),
clock: Arc::new(HlcClock::new()),
retention,
}
}
pub fn next_timestamp(&self) -> HlcTimestamp {
self.clock.now()
}
pub fn clock(&self) -> &Arc<HlcClock> {
&self.clock
}
pub fn record(&self, event: ChangeEvent) {
self.events
.write()
.entry(event.entity_id)
.or_default()
.push(event);
}
pub fn record_batch(&self, events: impl IntoIterator<Item = ChangeEvent>) {
let mut guard = self.events.write();
for event in events {
guard.entry(event.entity_id).or_default().push(event);
}
}
pub fn record_create_node(
&self,
id: NodeId,
epoch: EpochId,
props: Option<HashMap<String, Value>>,
labels: Option<Vec<String>>,
) {
self.record(ChangeEvent {
entity_id: EntityId::Node(id),
kind: ChangeKind::Create,
epoch,
timestamp: self.clock.now(),
before: None,
after: props,
labels,
edge_type: None,
src_id: None,
dst_id: None,
triple_subject: None,
triple_predicate: None,
triple_object: None,
triple_graph: None,
});
}
pub fn record_create_edge(
&self,
id: EdgeId,
epoch: EpochId,
props: Option<HashMap<String, Value>>,
src_id: u64,
dst_id: u64,
edge_type: String,
) {
self.record(ChangeEvent {
entity_id: EntityId::Edge(id),
kind: ChangeKind::Create,
epoch,
timestamp: self.clock.now(),
before: None,
after: props,
labels: None,
edge_type: Some(edge_type),
src_id: Some(src_id),
dst_id: Some(dst_id),
triple_subject: None,
triple_predicate: None,
triple_object: None,
triple_graph: None,
});
}
pub fn record_triple_insert(
&self,
subject: &str,
predicate: &str,
object: &str,
graph: Option<&str>,
epoch: EpochId,
) {
let id = triple_hash(subject, predicate, object, graph);
self.record(ChangeEvent {
entity_id: EntityId::Triple(id),
kind: ChangeKind::Create,
epoch,
timestamp: self.clock.now(),
before: None,
after: None,
labels: None,
edge_type: None,
src_id: None,
dst_id: None,
triple_subject: Some(subject.to_string()),
triple_predicate: Some(predicate.to_string()),
triple_object: Some(object.to_string()),
triple_graph: graph.map(ToString::to_string),
});
}
pub fn record_triple_delete(
&self,
subject: &str,
predicate: &str,
object: &str,
graph: Option<&str>,
epoch: EpochId,
) {
let id = triple_hash(subject, predicate, object, graph);
self.record(ChangeEvent {
entity_id: EntityId::Triple(id),
kind: ChangeKind::Delete,
epoch,
timestamp: self.clock.now(),
before: None,
after: None,
labels: None,
edge_type: None,
src_id: None,
dst_id: None,
triple_subject: Some(subject.to_string()),
triple_predicate: Some(predicate.to_string()),
triple_object: Some(object.to_string()),
triple_graph: graph.map(ToString::to_string),
});
}
pub fn record_update(
&self,
entity_id: EntityId,
epoch: EpochId,
key: &str,
old_value: Option<Value>,
new_value: Value,
) {
let before = old_value.map(|v| {
let mut m = HashMap::new();
m.insert(key.to_string(), v);
m
});
let mut after_map = HashMap::new();
after_map.insert(key.to_string(), new_value);
self.record(ChangeEvent {
entity_id,
kind: ChangeKind::Update,
epoch,
timestamp: self.clock.now(),
before,
after: Some(after_map),
labels: None,
edge_type: None,
src_id: None,
dst_id: None,
triple_subject: None,
triple_predicate: None,
triple_object: None,
triple_graph: None,
});
}
pub fn record_delete(
&self,
entity_id: EntityId,
epoch: EpochId,
props: Option<HashMap<String, Value>>,
) {
self.record(ChangeEvent {
entity_id,
kind: ChangeKind::Delete,
epoch,
timestamp: self.clock.now(),
before: props,
after: None,
labels: None,
edge_type: None,
src_id: None,
dst_id: None,
triple_subject: None,
triple_predicate: None,
triple_object: None,
triple_graph: None,
});
}
#[must_use]
pub fn history(&self, entity_id: EntityId) -> Vec<ChangeEvent> {
self.events
.read()
.get(&entity_id)
.cloned()
.unwrap_or_default()
}
#[must_use]
pub fn history_since(&self, entity_id: EntityId, since_epoch: EpochId) -> Vec<ChangeEvent> {
self.events
.read()
.get(&entity_id)
.map(|events| {
events
.iter()
.filter(|e| e.epoch >= since_epoch)
.cloned()
.collect()
})
.unwrap_or_default()
}
#[must_use]
pub fn changes_between(&self, start_epoch: EpochId, end_epoch: EpochId) -> Vec<ChangeEvent> {
let guard = self.events.read();
let mut results = Vec::new();
for events in guard.values() {
for event in events {
if event.epoch >= start_epoch && event.epoch <= end_epoch {
results.push(event.clone());
}
}
}
results.sort_by_key(|e| e.epoch);
results
}
#[must_use]
pub fn event_count(&self) -> usize {
self.events.read().values().map(Vec::len).sum()
}
#[must_use]
pub fn heap_memory_bytes(&self) -> (usize, usize, usize) {
let guard = self.events.read();
let entity_count = guard.len();
let mut event_count = 0usize;
let mut bytes = 0usize;
let entry_overhead = std::mem::size_of::<(EntityId, Vec<ChangeEvent>)>() + 8;
let event_size = std::mem::size_of::<ChangeEvent>();
for events in guard.values() {
event_count += events.len();
bytes += entry_overhead + events.capacity() * event_size;
}
(bytes, entity_count, event_count)
}
pub fn prune_before(&self, min_epoch: EpochId) {
let mut guard = self.events.write();
guard.retain(|_, events| {
events.retain(|e| e.epoch >= min_epoch);
!events.is_empty()
});
}
pub fn prune_to_limit(&self) {
let Some(max) = self.retention.max_events else {
return;
};
let count = self.event_count();
if count <= max {
return;
}
let guard = self.events.read();
let mut epochs: Vec<EpochId> = guard
.values()
.flat_map(|events| events.iter().map(|e| e.epoch))
.collect();
drop(guard);
epochs.sort();
let excess = count - max;
if let Some(&cutoff) = epochs.get(excess.saturating_sub(1)) {
self.prune_before(EpochId::new(cutoff.as_u64() + 1));
}
}
pub fn apply_retention(&self, current_epoch: EpochId) {
if let Some(max_epochs) = self.retention.max_epochs {
let cutoff = current_epoch.as_u64().saturating_sub(max_epochs);
self.prune_before(EpochId::new(cutoff));
}
self.prune_to_limit();
}
#[must_use]
pub fn approximate_memory_bytes(&self) -> usize {
self.event_count() * 256
}
}
impl Default for CdcLog {
fn default() -> Self {
Self::new()
}
}
impl MemoryConsumer for CdcLog {
    fn name(&self) -> &str {
        "cdc_log"
    }

    /// Reports the flat per-event estimate from `approximate_memory_bytes`.
    fn memory_usage(&self) -> usize {
        self.approximate_memory_bytes()
    }

    fn eviction_priority(&self) -> u8 {
        priorities::QUERY_CACHE
    }

    fn region(&self) -> MemoryRegion {
        MemoryRegion::ExecutionBuffers
    }

    /// Frees approximately `target_bytes` by discarding the oldest epochs.
    ///
    /// `target_bytes` is rounded down to whole events (256 bytes each), so targets below
    /// 256 bytes free nothing. Eviction is epoch-granular: every event in the cutoff epoch is
    /// removed, so more than the target may be freed. Returns the estimated bytes freed.
    fn evict(&self, target_bytes: usize) -> usize {
        // Must match the per-event estimate used by `approximate_memory_bytes`.
        const BYTES_PER_EVENT: usize = 256;

        let events_to_remove = target_bytes / BYTES_PER_EVENT;
        if events_to_remove == 0 {
            return 0;
        }
        // Hold the write lock for the whole operation so the cutoff and the freed-byte count
        // are computed against the same snapshot.
        let mut guard = self.events.write();
        let mut epochs: Vec<EpochId> = guard
            .values()
            .flat_map(|events| events.iter().map(|e| e.epoch))
            .collect();
        if epochs.is_empty() {
            return 0;
        }
        let before = epochs.len();
        // Both operands are >= 1, so the subtraction cannot underflow.
        let nth = events_to_remove.min(before) - 1;
        let (_, &mut cutoff, _) = epochs.select_nth_unstable(nth);
        // `> cutoff` avoids the `cutoff + 1` overflow at the maximum epoch.
        guard.retain(|_, events| {
            events.retain(|e| e.epoch > cutoff);
            !events.is_empty()
        });
        let after: usize = guard.values().map(Vec::len).sum();
        (before - after) * BYTES_PER_EVENT
    }

    /// Events cannot be spilled; eviction discards them.
    fn can_spill(&self) -> bool {
        false
    }
}
/// Derives the 64-bit id used for `EntityId::Triple` from a triple's components.
///
/// The four components are hashed in order (the graph as an `Option`, so `None` and
/// `Some(_)` differ) with a zero-keyed `DefaultHasher`, which makes the result deterministic
/// within a build. NOTE(review): `std` does not promise `DefaultHasher`'s algorithm stays the
/// same across Rust releases, so these ids should not be relied on after a toolchain upgrade.
fn triple_hash(subject: &str, predicate: &str, object: &str, graph: Option<&str>) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    // Hashing a tuple hashes each element in order, identical to hashing them one by one.
    (subject, predicate, object, graph).hash(&mut hasher);
    hasher.finish()
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_record_and_history() {
        let log = CdcLog::new();
        let node_id = NodeId::new(1);
        log.record_create_node(node_id, EpochId(1), None, None);
        log.record_update(
            EntityId::Node(node_id),
            EpochId(2),
            "name",
            None,
            Value::from("Alix"),
        );
        log.record_update(
            EntityId::Node(node_id),
            EpochId(3),
            "name",
            Some(Value::from("Alix")),
            Value::from("Gus"),
        );
        let history = log.history(EntityId::Node(node_id));
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].kind, ChangeKind::Create);
        assert_eq!(history[1].kind, ChangeKind::Update);
        assert_eq!(history[2].kind, ChangeKind::Update);
        // No previous value on the first update, one on the second.
        assert!(history[1].before.is_none());
        assert!(history[2].before.is_some());
    }

    #[test]
    fn test_history_since() {
        let log = CdcLog::new();
        let node_id = NodeId::new(1);
        log.record_create_node(node_id, EpochId(1), None, None);
        log.record_update(
            EntityId::Node(node_id),
            EpochId(5),
            "name",
            None,
            Value::from("Alix"),
        );
        log.record_update(
            EntityId::Node(node_id),
            EpochId(10),
            "name",
            Some(Value::from("Alix")),
            Value::from("Gus"),
        );
        let since_5 = log.history_since(EntityId::Node(node_id), EpochId(5));
        assert_eq!(since_5.len(), 2);
        assert_eq!(since_5[0].epoch, EpochId(5));
    }

    #[test]
    fn test_changes_between() {
        let log = CdcLog::new();
        log.record_create_node(NodeId::new(1), EpochId(1), None, None);
        log.record_create_node(NodeId::new(2), EpochId(3), None, None);
        log.record_update(
            EntityId::Node(NodeId::new(1)),
            EpochId(5),
            "x",
            None,
            Value::from(42),
        );
        let changes = log.changes_between(EpochId(2), EpochId(5));
        // Epochs 3 and 5 fall inside the inclusive window; epoch 1 does not.
        assert_eq!(changes.len(), 2);
    }

    #[test]
    fn test_changes_between_sorted_by_epoch() {
        let log = CdcLog::new();
        log.record_create_node(NodeId::new(1), EpochId(5), None, None);
        log.record_create_node(NodeId::new(2), EpochId(1), None, None);
        log.record_create_node(NodeId::new(3), EpochId(3), None, None);
        let epochs: Vec<EpochId> = log
            .changes_between(EpochId(0), EpochId(10))
            .iter()
            .map(|e| e.epoch)
            .collect();
        assert_eq!(epochs, vec![EpochId(1), EpochId(3), EpochId(5)]);
    }

    #[test]
    fn test_delete_event() {
        let log = CdcLog::new();
        let node_id = NodeId::new(1);
        let mut props = HashMap::new();
        props.insert("name".to_string(), Value::from("Alix"));
        log.record_create_node(node_id, EpochId(1), Some(props.clone()), None);
        log.record_delete(EntityId::Node(node_id), EpochId(2), Some(props));
        let history = log.history(EntityId::Node(node_id));
        assert_eq!(history.len(), 2);
        assert_eq!(history[1].kind, ChangeKind::Delete);
        assert!(history[1].after.is_none());
        assert!(history[1].before.is_some());
    }

    #[test]
    fn test_edge_create_event_fields() {
        let log = CdcLog::new();
        let edge_id = EdgeId::new(3);
        log.record_create_edge(edge_id, EpochId(1), None, 1, 2, "KNOWS".to_string());
        let history = log.history(EntityId::Edge(edge_id));
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].kind, ChangeKind::Create);
        assert_eq!(history[0].edge_type.as_deref(), Some("KNOWS"));
        assert_eq!(history[0].src_id, Some(1));
        assert_eq!(history[0].dst_id, Some(2));
        assert!(history[0].labels.is_none());
    }

    #[test]
    fn test_triple_insert_and_delete_history() {
        let log = CdcLog::new();
        log.record_triple_insert("s", "p", "o", None, EpochId(1));
        log.record_triple_delete("s", "p", "o", None, EpochId(2));
        let id = EntityId::Triple(triple_hash("s", "p", "o", None));
        assert!(id.is_triple());
        let history = log.history(id);
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].kind, ChangeKind::Create);
        assert_eq!(history[1].kind, ChangeKind::Delete);
        assert_eq!(history[1].triple_subject.as_deref(), Some("s"));
        assert_eq!(history[1].triple_predicate.as_deref(), Some("p"));
        assert_eq!(history[1].triple_object.as_deref(), Some("o"));
        assert!(history[1].triple_graph.is_none());
    }

    #[test]
    fn test_triple_graph_distinguishes_entities() {
        let log = CdcLog::new();
        log.record_triple_insert("s", "p", "o", Some("g"), EpochId(1));
        log.record_triple_insert("s", "p", "o", None, EpochId(1));
        assert_eq!(log.event_count(), 2);
        let scoped = log.history(EntityId::Triple(triple_hash("s", "p", "o", Some("g"))));
        let default = log.history(EntityId::Triple(triple_hash("s", "p", "o", None)));
        assert_eq!(scoped.len(), 1);
        assert_eq!(default.len(), 1);
        assert_eq!(scoped[0].triple_graph.as_deref(), Some("g"));
    }

    #[test]
    fn test_record_batch() {
        let source = CdcLog::new();
        source.record_create_node(NodeId::new(1), EpochId(1), None, None);
        source.record_update(
            EntityId::Node(NodeId::new(1)),
            EpochId(2),
            "x",
            None,
            Value::from(1),
        );
        source.record_create_node(NodeId::new(2), EpochId(3), None, None);

        let replica = CdcLog::new();
        replica.record_batch(source.changes_between(EpochId(0), EpochId(10)));
        assert_eq!(replica.event_count(), 3);
        let history = replica.history(EntityId::Node(NodeId::new(1)));
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].kind, ChangeKind::Create);
        assert_eq!(history[1].kind, ChangeKind::Update);
    }

    #[test]
    fn test_empty_history() {
        let log = CdcLog::new();
        let history = log.history(EntityId::Node(NodeId::new(999)));
        assert!(history.is_empty());
    }

    #[test]
    fn test_event_count() {
        let log = CdcLog::new();
        assert_eq!(log.event_count(), 0);
        log.record_create_node(NodeId::new(1), EpochId(1), None, None);
        log.record_create_node(NodeId::new(2), EpochId(2), None, None);
        assert_eq!(log.event_count(), 2);
    }

    #[test]
    fn test_entity_id_conversions() {
        let node_id = NodeId::new(42);
        let entity: EntityId = node_id.into();
        assert!(entity.is_node());
        assert_eq!(entity.as_u64(), 42);
        let edge_id = EdgeId::new(7);
        let entity: EntityId = edge_id.into();
        assert!(!entity.is_node());
        assert_eq!(entity.as_u64(), 7);
    }

    #[test]
    fn test_prune_before() {
        let log = CdcLog::new();
        for epoch in 1..=10 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        assert_eq!(log.event_count(), 10);
        log.prune_before(EpochId(6));
        assert_eq!(log.event_count(), 5);
        let remaining = log.changes_between(EpochId(0), EpochId(100));
        assert!(remaining.iter().all(|e| e.epoch >= EpochId(6)));
    }

    #[test]
    fn test_prune_to_limit() {
        let retention = CdcRetentionConfig {
            max_epochs: None,
            max_events: Some(5),
        };
        let log = CdcLog::with_retention(retention);
        for epoch in 1..=10 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        assert_eq!(log.event_count(), 10);
        log.prune_to_limit();
        assert!(log.event_count() <= 5);
    }

    #[test]
    fn test_prune_to_limit_under_limit_is_noop() {
        let retention = CdcRetentionConfig {
            max_epochs: None,
            max_events: Some(10),
        };
        let log = CdcLog::with_retention(retention);
        for epoch in 1..=4 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        log.prune_to_limit();
        assert_eq!(log.event_count(), 4);
    }

    #[test]
    fn test_apply_retention_epoch_based() {
        let retention = CdcRetentionConfig {
            max_epochs: Some(3),
            max_events: None,
        };
        let log = CdcLog::with_retention(retention);
        for epoch in 1..=10 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        assert_eq!(log.event_count(), 10);
        log.apply_retention(EpochId(10));
        let remaining = log.changes_between(EpochId(0), EpochId(100));
        assert!(remaining.iter().all(|e| e.epoch >= EpochId(7)));
        // Cutoff is 10 - 3 = 7 and is inclusive, so epochs 7..=10 survive.
        assert_eq!(remaining.len(), 4);
    }

    #[test]
    fn test_memory_consumer_evict() {
        let log = CdcLog::new();
        for epoch in 1..=100 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        let before = log.approximate_memory_bytes();
        assert!(before > 0);
        let freed = log.evict(before / 2);
        assert!(freed > 0);
        assert!(log.event_count() < 100);
    }

    #[test]
    fn test_evict_empty_log() {
        let log = CdcLog::new();
        assert_eq!(log.evict(4096), 0);
        assert_eq!(log.event_count(), 0);
    }

    #[test]
    fn test_retention_config_default() {
        let config = CdcRetentionConfig::default();
        assert_eq!(config.max_epochs, Some(1000));
        assert_eq!(config.max_events, Some(100_000));
    }

    #[test]
    fn test_apply_retention_count_only() {
        let retention = CdcRetentionConfig {
            max_epochs: None,
            max_events: Some(4),
        };
        let log = CdcLog::with_retention(retention);
        for epoch in 1..=10 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        assert_eq!(log.event_count(), 10);
        log.apply_retention(EpochId(10));
        assert!(
            log.event_count() <= 4,
            "count-based retention should prune to at most 4 events, got {}",
            log.event_count()
        );
    }

    #[test]
    fn test_apply_retention_combined_epoch_and_count() {
        let retention = CdcRetentionConfig {
            max_epochs: Some(5),
            max_events: Some(3),
        };
        let log = CdcLog::with_retention(retention);
        for epoch in 1..=10 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        assert_eq!(log.event_count(), 10);
        log.apply_retention(EpochId(10));
        assert!(
            log.event_count() <= 3,
            "combined retention should honour the stricter limit, got {}",
            log.event_count()
        );
        let remaining = log.changes_between(EpochId(0), EpochId(100));
        assert!(remaining.iter().all(|e| e.epoch >= EpochId(6)));
    }

    #[test]
    fn test_prune_before_epoch_zero() {
        let log = CdcLog::new();
        for epoch in 1..=5 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        assert_eq!(log.event_count(), 5);
        log.prune_before(EpochId(0));
        assert_eq!(
            log.event_count(),
            5,
            "prune_before(0) should not remove anything"
        );
    }

    #[test]
    fn test_prune_to_limit_same_epoch() {
        let retention = CdcRetentionConfig {
            max_epochs: None,
            max_events: Some(3),
        };
        let log = CdcLog::with_retention(retention);
        for i in 1..=10 {
            log.record_create_node(NodeId::new(i), EpochId(5), None, None);
        }
        assert_eq!(log.event_count(), 10);
        log.prune_to_limit();
        assert!(
            log.event_count() < 10,
            "prune_to_limit should have removed events, got {}",
            log.event_count()
        );
    }

    #[test]
    fn test_evict_tiny_target_is_noop() {
        let log = CdcLog::new();
        for epoch in 1..=10 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        assert_eq!(log.event_count(), 10);
        let freed = log.evict(100);
        assert_eq!(freed, 0, "evict with target < 256 bytes should be a no-op");
        assert_eq!(log.event_count(), 10);
    }

    #[test]
    fn test_heap_memory_bytes_scales_with_events() {
        let log = CdcLog::new();
        let (empty_bytes, empty_entities, empty_events) = log.heap_memory_bytes();
        assert_eq!(empty_entities, 0);
        assert_eq!(empty_events, 0);
        for epoch in 1..=50 {
            log.record_create_node(NodeId::new(epoch), EpochId(epoch), None, None);
        }
        let (populated_bytes, populated_entities, populated_events) = log.heap_memory_bytes();
        assert_eq!(populated_entities, 50);
        assert_eq!(populated_events, 50);
        assert!(
            populated_bytes > empty_bytes,
            "heap estimate must grow when events are recorded"
        );
    }
}