use crate::engine::ImmutableEpoch;
use crate::error::EngineError;
use crate::memtable::Memtable;
use crate::segment_reader::SegmentReader;
use crate::types::*;
/// Borrowed view over the memtable, immutable-epoch and segment sources that
/// a read consults.
///
/// The point-lookup methods on this type consult `active` first, then
/// `immutable` in slice order, then `segments` in slice order; a live record
/// or tombstone found in an earlier source takes precedence over later ones.
pub struct SourceList<'a> {
/// Active memtable; consulted before every other source.
pub(crate) active: &'a Memtable,
/// Immutable epochs, consulted in slice order after `active`.
pub(crate) immutable: &'a [ImmutableEpoch],
/// Segment readers, consulted in slice order after all memtables.
pub(crate) segments: &'a [SegmentReader],
}
impl<'a> SourceList<'a> {
/// Looks up the node with the given `id`.
///
/// Sources are consulted in order: the active memtable, each immutable epoch
/// in slice order, then each segment in slice order. The first source that
/// holds either a live record or a tombstone for `id` decides the result, so
/// a tombstone in an earlier source hides records in later ones.
///
/// Returns `Ok(None)` when the node is tombstoned or absent from every source.
///
/// # Errors
///
/// Propagates any error returned by a segment's `get_node`.
pub fn find_node(&self, id: u64) -> Result<Option<NodeRecord>, EngineError> {
    // Memtables first: the active one, then every immutable epoch in order.
    let memtables = std::iter::once(self.active)
        .chain(self.immutable.iter().map(|epoch| &*epoch.memtable));
    for memtable in memtables {
        match memtable.get_node(id) {
            Some(record) => return Ok(Some(record.clone())),
            None if memtable.deleted_nodes().contains_key(&id) => return Ok(None),
            None => {}
        }
    }

    // Segments test their tombstone before attempting the (fallible) read.
    for segment in self.segments {
        if segment.is_node_deleted(id) {
            return Ok(None);
        }
        if let Some(record) = segment.get_node(id)? {
            return Ok(Some(record));
        }
    }

    Ok(None)
}
/// Looks up the edge with the given `id`.
///
/// Sources are consulted in order: the active memtable, each immutable epoch
/// in slice order, then each segment in slice order. The first source that
/// holds either a live record or a tombstone for `id` decides the result, so
/// a tombstone in an earlier source hides records in later ones.
///
/// Returns `Ok(None)` when the edge is tombstoned or absent from every source.
///
/// # Errors
///
/// Propagates any error returned by a segment's `get_edge`.
pub fn find_edge(&self, id: u64) -> Result<Option<EdgeRecord>, EngineError> {
    // Memtables first: the active one, then every immutable epoch in order.
    let memtables = std::iter::once(self.active)
        .chain(self.immutable.iter().map(|epoch| &*epoch.memtable));
    for memtable in memtables {
        match memtable.get_edge(id) {
            Some(record) => return Ok(Some(record.clone())),
            None if memtable.deleted_edges().contains_key(&id) => return Ok(None),
            None => {}
        }
    }

    // Segments test their tombstone before attempting the (fallible) read.
    for segment in self.segments {
        if segment.is_edge_deleted(id) {
            return Ok(None);
        }
        if let Some(record) = segment.get_edge(id)? {
            return Ok(Some(record));
        }
    }

    Ok(None)
}
/// Looks up a node by its `(type_id, key)` pair.
///
/// The active memtable is consulted first and a hit there is returned without
/// any tombstone check. Otherwise the immutable epochs and then the segments
/// are scanned in slice order; the first source that knows the key decides
/// the result. A hit from such a source is discarded (yielding `Ok(None)`)
/// when any source consulted before it holds a tombstone for the node's id.
///
/// # Errors
///
/// Propagates any error returned by a segment's `node_by_key`.
pub fn find_node_by_key(
    &self,
    type_id: u32,
    key: &str,
) -> Result<Option<NodeRecord>, EngineError> {
    if let Some(hit) = self.active.node_by_key(type_id, key) {
        return Ok(Some(hit.clone()));
    }

    for (epoch_idx, epoch) in self.immutable.iter().enumerate() {
        let hit = match epoch.memtable.node_by_key(type_id, key) {
            Some(hit) => hit,
            None => continue,
        };
        // The first epoch that knows the key settles the lookup either way.
        let shadowed = self.is_node_tombstoned_above_immutable(hit.id, epoch_idx);
        return Ok(if shadowed { None } else { Some(hit.clone()) });
    }

    for (seg_idx, segment) in self.segments.iter().enumerate() {
        if let Some(hit) = segment.node_by_key(type_id, key)? {
            let shadowed = self.is_node_tombstoned_above_segment(hit.id, seg_idx);
            return Ok(if shadowed { None } else { Some(hit) });
        }
    }

    Ok(None)
}
/// Looks up an edge by its `(from, to, type_id)` triple.
///
/// The active memtable is consulted first and a hit there is returned without
/// any tombstone check. Otherwise the immutable epochs and then the segments
/// are scanned in slice order; the first source that knows the triple decides
/// the result. A hit from such a source is discarded (yielding `Ok(None)`)
/// when any source consulted before it holds a tombstone for the edge's id.
///
/// # Errors
///
/// Propagates any error returned by a segment's `edge_by_triple`.
pub fn find_edge_by_triple(
    &self,
    from: u64,
    to: u64,
    type_id: u32,
) -> Result<Option<EdgeRecord>, EngineError> {
    if let Some(hit) = self.active.edge_by_triple(from, to, type_id) {
        return Ok(Some(hit.clone()));
    }

    for (epoch_idx, epoch) in self.immutable.iter().enumerate() {
        let hit = match epoch.memtable.edge_by_triple(from, to, type_id) {
            Some(hit) => hit,
            None => continue,
        };
        // The first epoch that knows the triple settles the lookup either way.
        let shadowed = self.is_edge_tombstoned_above_immutable(hit.id, epoch_idx);
        return Ok(if shadowed { None } else { Some(hit.clone()) });
    }

    for (seg_idx, segment) in self.segments.iter().enumerate() {
        if let Some(hit) = segment.edge_by_triple(from, to, type_id)? {
            let shadowed = self.is_edge_tombstoned_above_segment(hit.id, seg_idx);
            return Ok(if shadowed { None } else { Some(hit) });
        }
    }

    Ok(None)
}
/// Reports whether the first source that mentions node `id` holds a
/// tombstone for it.
///
/// Sources are consulted in order: the active memtable, each immutable epoch,
/// then each segment. A live record found first yields `false`, a tombstone
/// found first yields `true`. Returns `false` when no source mentions `id`.
pub fn is_node_deleted(&self, id: u64) -> bool {
    let memtables = std::iter::once(self.active)
        .chain(self.immutable.iter().map(|epoch| &*epoch.memtable));
    for memtable in memtables {
        if memtable.nodes().contains_key(&id) {
            return false;
        }
        if memtable.deleted_nodes().contains_key(&id) {
            return true;
        }
    }

    // Within a segment the tombstone is tested before the live record.
    self.segments
        .iter()
        .find_map(|segment| {
            if segment.is_node_deleted(id) {
                Some(true)
            } else if segment.has_node(id) {
                Some(false)
            } else {
                None
            }
        })
        .unwrap_or(false)
}
/// Reports whether the first source that mentions edge `id` holds a
/// tombstone for it.
///
/// Sources are consulted in order: the active memtable, each immutable epoch,
/// then each segment. A live record found first yields `false`, a tombstone
/// found first yields `true`. Returns `false` when no source mentions `id`.
pub fn is_edge_deleted(&self, id: u64) -> bool {
    let memtables = std::iter::once(self.active)
        .chain(self.immutable.iter().map(|epoch| &*epoch.memtable));
    for memtable in memtables {
        if memtable.edges().contains_key(&id) {
            return false;
        }
        if memtable.deleted_edges().contains_key(&id) {
            return true;
        }
    }

    // Within a segment the tombstone is tested before the live record.
    self.segments
        .iter()
        .find_map(|segment| {
            if segment.is_edge_deleted(id) {
                Some(true)
            } else if segment.has_edge(id) {
                Some(false)
            } else {
                None
            }
        })
        .unwrap_or(false)
}
/// Gathers the ids of all node tombstones held by any source into one set.
///
/// This is a plain union over the active memtable, every immutable epoch and
/// every segment. It does not check whether another source also holds a live
/// record for the same id.
pub fn collect_deleted_nodes(&self) -> NodeIdSet {
    let mut tombstoned = NodeIdSet::default();

    let memtables = std::iter::once(self.active)
        .chain(self.immutable.iter().map(|epoch| &*epoch.memtable));
    for memtable in memtables {
        for id in memtable.deleted_nodes().keys() {
            tombstoned.insert(*id);
        }
    }

    for segment in self.segments {
        for id in segment.deleted_node_tombstones().keys() {
            tombstoned.insert(*id);
        }
    }

    tombstoned
}
/// Gathers the ids of all edge tombstones held by any source into one set.
///
/// This is a plain union over the active memtable, every immutable epoch and
/// every segment. It does not check whether another source also holds a live
/// record for the same id. The result uses `NodeIdSet` as its container type
/// even though the ids it holds are edge ids.
pub fn collect_deleted_edges(&self) -> NodeIdSet {
    let mut tombstoned = NodeIdSet::default();

    let memtables = std::iter::once(self.active)
        .chain(self.immutable.iter().map(|epoch| &*epoch.memtable));
    for memtable in memtables {
        for id in memtable.deleted_edges().keys() {
            tombstoned.insert(*id);
        }
    }

    for segment in self.segments {
        for id in segment.deleted_edge_tombstones().keys() {
            tombstoned.insert(*id);
        }
    }

    tombstoned
}
/// Returns `true` when the active memtable, or any immutable epoch located
/// before index `imm_idx`, holds a tombstone for `node_id`.
///
/// # Panics
///
/// Panics if the active memtable has no tombstone and `imm_idx` exceeds
/// `self.immutable.len()`.
fn is_node_tombstoned_above_immutable(&self, node_id: u64, imm_idx: usize) -> bool {
    self.active.deleted_nodes().contains_key(&node_id)
        || self.immutable[..imm_idx]
            .iter()
            .any(|epoch| epoch.memtable.deleted_nodes().contains_key(&node_id))
}
/// Returns `true` when the active memtable, any immutable epoch, or any
/// segment located before index `seg_idx` holds a tombstone for `node_id`.
///
/// # Panics
///
/// Panics if no memtable holds a tombstone and `seg_idx` exceeds
/// `self.segments.len()`.
fn is_node_tombstoned_above_segment(&self, node_id: u64, seg_idx: usize) -> bool {
    self.active.deleted_nodes().contains_key(&node_id)
        || self
            .immutable
            .iter()
            .any(|epoch| epoch.memtable.deleted_nodes().contains_key(&node_id))
        || self.segments[..seg_idx]
            .iter()
            .any(|segment| segment.is_node_deleted(node_id))
}
/// Returns `true` when the active memtable, or any immutable epoch located
/// before index `imm_idx`, holds a tombstone for `edge_id`.
///
/// # Panics
///
/// Panics if the active memtable has no tombstone and `imm_idx` exceeds
/// `self.immutable.len()`.
fn is_edge_tombstoned_above_immutable(&self, edge_id: u64, imm_idx: usize) -> bool {
    self.active.deleted_edges().contains_key(&edge_id)
        || self.immutable[..imm_idx]
            .iter()
            .any(|epoch| epoch.memtable.deleted_edges().contains_key(&edge_id))
}
/// Returns `true` when the active memtable, any immutable epoch, or any
/// segment located before index `seg_idx` holds a tombstone for `edge_id`.
///
/// # Panics
///
/// Panics if no memtable holds a tombstone and `seg_idx` exceeds
/// `self.segments.len()`.
fn is_edge_tombstoned_above_segment(&self, edge_id: u64, seg_idx: usize) -> bool {
    self.active.deleted_edges().contains_key(&edge_id)
        || self
            .immutable
            .iter()
            .any(|epoch| epoch.memtable.deleted_edges().contains_key(&edge_id))
        || self.segments[..seg_idx]
            .iter()
            .any(|segment| segment.is_edge_deleted(edge_id))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memtable::Memtable;
use crate::types::WalOp;
use std::sync::Arc;
/// Wraps `mt` in an `ImmutableEpoch` whose ids are zero and whose
/// `in_flight` flag is `false`.
fn wrap_imm(mt: Memtable) -> ImmutableEpoch {
    let memtable = Arc::new(mt);
    ImmutableEpoch {
        memtable,
        epoch_id: 0,
        wal_generation_id: 0,
        in_flight: false,
    }
}
/// Builds a node fixture with the given `id`, `key` and `type_id`; every
/// other field gets a fixed value.
fn make_node(id: u64, key: &str, type_id: u32) -> NodeRecord {
    let key = key.to_string();
    NodeRecord {
        id,
        type_id,
        key,
        weight: 1.0,
        created_at: 1000,
        updated_at: 1000,
        last_write_seq: 0,
        props: Default::default(),
        dense_vector: None,
        sparse_vector: None,
    }
}
/// Builds an edge fixture with the given `id`, endpoints and `type_id`;
/// every other field gets a fixed value.
fn make_edge(id: u64, from: u64, to: u64, type_id: u32) -> EdgeRecord {
    EdgeRecord {
        id,
        type_id,
        from,
        to,
        weight: 1.0,
        created_at: 1000,
        updated_at: 1000,
        valid_from: 0,
        valid_to: i64::MAX,
        last_write_seq: 0,
        props: Default::default(),
    }
}
/// A node upserted into the active memtable is found by id; an unknown id
/// is not.
#[test]
fn test_find_node_active_memtable() {
    let mut active = Memtable::new();
    let upsert = WalOp::UpsertNode(make_node(1, "a", 1));
    active.apply_op(&upsert, 1);

    let sources = SourceList {
        active: &active,
        immutable: &[],
        segments: &[],
    };

    let found = sources.find_node(1).unwrap();
    assert_eq!(found.map(|node| node.key).as_deref(), Some("a"));

    let missing = sources.find_node(999).unwrap();
    assert!(missing.is_none());
}
/// A node upserted and then deleted in the active memtable is not found.
#[test]
fn test_find_node_tombstoned_in_active() {
    let mut active = Memtable::new();
    active.apply_op(&WalOp::UpsertNode(make_node(1, "a", 1)), 1);
    let tombstone = WalOp::DeleteNode {
        id: 1,
        deleted_at: 2000,
    };
    active.apply_op(&tombstone, 2);

    let sources = SourceList {
        active: &active,
        immutable: &[],
        segments: &[],
    };

    let lookup = sources.find_node(1).unwrap();
    assert!(lookup.is_none());
}
/// A node that exists only in an immutable epoch is still found by id.
#[test]
fn test_find_node_immutable_memtable() {
    let active = Memtable::new();
    let mut frozen = Memtable::new();
    frozen.apply_op(&WalOp::UpsertNode(make_node(1, "a", 1)), 1);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let found = sources.find_node(1).unwrap();
    assert_eq!(found.map(|node| node.key).as_deref(), Some("a"));
}
/// When both the active memtable and an immutable epoch hold node 1, the
/// active memtable's record is the one returned.
#[test]
fn test_find_node_active_wins_over_immutable() {
    let mut active = Memtable::new();
    active.apply_op(&WalOp::UpsertNode(make_node(1, "updated", 1)), 2);

    let mut frozen = Memtable::new();
    frozen.apply_op(&WalOp::UpsertNode(make_node(1, "old", 1)), 1);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let found = sources.find_node(1).unwrap();
    assert_eq!(found.map(|node| node.key).as_deref(), Some("updated"));
}
/// A tombstone in the active memtable hides a live node held by an
/// immutable epoch.
#[test]
fn test_find_node_active_tombstone_shadows_immutable() {
    let mut active = Memtable::new();
    let tombstone = WalOp::DeleteNode {
        id: 1,
        deleted_at: 2000,
    };
    active.apply_op(&tombstone, 2);

    let mut frozen = Memtable::new();
    frozen.apply_op(&WalOp::UpsertNode(make_node(1, "a", 1)), 1);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let lookup = sources.find_node(1).unwrap();
    assert!(lookup.is_none());
}
/// With two immutable epochs holding node 1, the record from the epoch at
/// the lower slice index is the one returned.
#[test]
fn test_find_node_newer_immutable_wins() {
    let active = Memtable::new();

    let mut newer = Memtable::new();
    newer.apply_op(&WalOp::UpsertNode(make_node(1, "newer", 1)), 2);

    let mut older = Memtable::new();
    older.apply_op(&WalOp::UpsertNode(make_node(1, "older", 1)), 1);

    let epochs = [wrap_imm(newer), wrap_imm(older)];
    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let found = sources.find_node(1).unwrap();
    assert_eq!(found.map(|node| node.key).as_deref(), Some("newer"));
}
/// A tombstone in the immutable epoch at index 0 hides a live node held by
/// the epoch at index 1.
#[test]
fn test_find_node_immutable_tombstone_shadows_older_immutable() {
    let active = Memtable::new();

    let mut newer = Memtable::new();
    let tombstone = WalOp::DeleteNode {
        id: 1,
        deleted_at: 2000,
    };
    newer.apply_op(&tombstone, 2);

    let mut older = Memtable::new();
    older.apply_op(&WalOp::UpsertNode(make_node(1, "a", 1)), 1);

    let epochs = [wrap_imm(newer), wrap_imm(older)];
    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let lookup = sources.find_node(1).unwrap();
    assert!(lookup.is_none());
}
/// An edge upserted into the active memtable is found by id.
#[test]
fn test_find_edge_active_memtable() {
    let mut active = Memtable::new();
    let upsert = WalOp::UpsertEdge(make_edge(1, 10, 20, 1));
    active.apply_op(&upsert, 1);

    let sources = SourceList {
        active: &active,
        immutable: &[],
        segments: &[],
    };

    let found = sources.find_edge(1).unwrap();
    assert_eq!(found.map(|edge| edge.from), Some(10));
}
/// A tombstone in the active memtable hides a live edge held by an
/// immutable epoch.
#[test]
fn test_find_edge_tombstoned() {
    let mut active = Memtable::new();
    let tombstone = WalOp::DeleteEdge {
        id: 1,
        deleted_at: 2000,
    };
    active.apply_op(&tombstone, 2);

    let mut frozen = Memtable::new();
    frozen.apply_op(&WalOp::UpsertEdge(make_edge(1, 10, 20, 1)), 1);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let lookup = sources.find_edge(1).unwrap();
    assert!(lookup.is_none());
}
/// A node in the active memtable is found by its `(type_id, key)` pair;
/// a different type id or a different key finds nothing.
#[test]
fn test_find_node_by_key_active() {
    let mut active = Memtable::new();
    active.apply_op(&WalOp::UpsertNode(make_node(1, "alice", 1)), 1);

    let sources = SourceList {
        active: &active,
        immutable: &[],
        segments: &[],
    };

    let found = sources.find_node_by_key(1, "alice").unwrap();
    assert_eq!(found.map(|node| node.id), Some(1));

    // Same key under another type, then another key under the same type.
    for &(type_id, key) in [(2, "alice"), (1, "bob")].iter() {
        let lookup = sources.find_node_by_key(type_id, key).unwrap();
        assert!(lookup.is_none());
    }
}
/// A key hit in an immutable epoch is suppressed when the active memtable
/// holds a tombstone for that node's id.
#[test]
fn test_find_node_by_key_tombstoned_in_newer_source() {
    let mut active = Memtable::new();
    let tombstone = WalOp::DeleteNode {
        id: 1,
        deleted_at: 2000,
    };
    active.apply_op(&tombstone, 2);

    let mut frozen = Memtable::new();
    frozen.apply_op(&WalOp::UpsertNode(make_node(1, "alice", 1)), 1);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let lookup = sources.find_node_by_key(1, "alice").unwrap();
    assert!(lookup.is_none());
}
/// An edge in the active memtable is found by its `(from, to, type_id)`
/// triple; the same endpoints with another type id find nothing.
#[test]
fn test_find_edge_by_triple_active() {
    let mut active = Memtable::new();
    active.apply_op(&WalOp::UpsertEdge(make_edge(1, 10, 20, 1)), 1);

    let sources = SourceList {
        active: &active,
        immutable: &[],
        segments: &[],
    };

    let found = sources.find_edge_by_triple(10, 20, 1).unwrap();
    assert_eq!(found.map(|edge| edge.id), Some(1));

    let other_type = sources.find_edge_by_triple(10, 20, 2).unwrap();
    assert!(other_type.is_none());
}
/// A triple hit in an immutable epoch is suppressed when the active
/// memtable holds a tombstone for that edge's id.
#[test]
fn test_find_edge_by_triple_tombstoned_in_newer_source() {
    let mut active = Memtable::new();
    let tombstone = WalOp::DeleteEdge {
        id: 1,
        deleted_at: 2000,
    };
    active.apply_op(&tombstone, 2);

    let mut frozen = Memtable::new();
    frozen.apply_op(&WalOp::UpsertEdge(make_edge(1, 10, 20, 1)), 1);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let lookup = sources.find_edge_by_triple(10, 20, 1).unwrap();
    assert!(lookup.is_none());
}
/// `is_node_deleted` is false for a live node, true for a tombstoned one,
/// and false for an id no source mentions.
#[test]
fn test_is_node_deleted() {
    let mut active = Memtable::new();
    active.apply_op(&WalOp::UpsertNode(make_node(1, "live", 1)), 1);
    let tombstone = WalOp::DeleteNode {
        id: 2,
        deleted_at: 2000,
    };
    active.apply_op(&tombstone, 2);

    let sources = SourceList {
        active: &active,
        immutable: &[],
        segments: &[],
    };

    // (id, expected): live, tombstoned, never seen.
    for &(id, expected) in [(1, false), (2, true), (3, false)].iter() {
        assert_eq!(sources.is_node_deleted(id), expected);
    }
}
/// `is_edge_deleted` is false for a live edge, true for a tombstoned one,
/// and false for an id no source mentions.
#[test]
fn test_is_edge_deleted() {
    let mut active = Memtable::new();
    active.apply_op(&WalOp::UpsertEdge(make_edge(1, 10, 20, 1)), 1);
    let tombstone = WalOp::DeleteEdge {
        id: 2,
        deleted_at: 2000,
    };
    active.apply_op(&tombstone, 2);

    let sources = SourceList {
        active: &active,
        immutable: &[],
        segments: &[],
    };

    // (id, expected): live, tombstoned, never seen.
    for &(id, expected) in [(1, false), (2, true), (3, false)].iter() {
        assert_eq!(sources.is_edge_deleted(id), expected);
    }
}
/// A live node in the active memtable wins over a tombstone for the same id
/// in an immutable epoch.
#[test]
fn test_is_node_deleted_active_live_overrides_immutable_tombstone() {
    let mut active = Memtable::new();
    active.apply_op(&WalOp::UpsertNode(make_node(1, "revived", 1)), 3);

    let mut frozen = Memtable::new();
    let tombstone = WalOp::DeleteNode {
        id: 1,
        deleted_at: 2000,
    };
    frozen.apply_op(&tombstone, 2);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let deleted = sources.is_node_deleted(1);
    assert!(!deleted);
}
/// Node tombstones from the active memtable and from an immutable epoch all
/// end up in the collected set.
#[test]
fn test_collect_deleted_nodes_across_sources() {
    let mut active = Memtable::new();
    let first = WalOp::DeleteNode {
        id: 1,
        deleted_at: 1000,
    };
    active.apply_op(&first, 1);

    let mut frozen = Memtable::new();
    let second = WalOp::DeleteNode {
        id: 2,
        deleted_at: 1000,
    };
    frozen.apply_op(&second, 2);
    let third = WalOp::DeleteNode {
        id: 3,
        deleted_at: 1000,
    };
    frozen.apply_op(&third, 3);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let tombstoned = sources.collect_deleted_nodes();
    for id in [1, 2, 3].iter() {
        assert!(tombstoned.contains(id));
    }
    assert_eq!(tombstoned.len(), 3);
}
/// Edge tombstones from the active memtable and from an immutable epoch all
/// end up in the collected set.
#[test]
fn test_collect_deleted_edges_across_sources() {
    let mut active = Memtable::new();
    let first = WalOp::DeleteEdge {
        id: 10,
        deleted_at: 1000,
    };
    active.apply_op(&first, 1);

    let mut frozen = Memtable::new();
    let second = WalOp::DeleteEdge {
        id: 20,
        deleted_at: 1000,
    };
    frozen.apply_op(&second, 2);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let tombstoned = sources.collect_deleted_edges();
    for id in [10, 20].iter() {
        assert!(tombstoned.contains(id));
    }
    assert_eq!(tombstoned.len(), 2);
}
/// The same node id tombstoned in two sources appears once in the
/// collected set.
#[test]
fn test_collect_deleted_nodes_deduplicates() {
    let mut active = Memtable::new();
    let later = WalOp::DeleteNode {
        id: 1,
        deleted_at: 2000,
    };
    active.apply_op(&later, 2);

    let mut frozen = Memtable::new();
    let earlier = WalOp::DeleteNode {
        id: 1,
        deleted_at: 1000,
    };
    frozen.apply_op(&earlier, 1);
    let epochs = [wrap_imm(frozen)];

    let sources = SourceList {
        active: &active,
        immutable: &epochs,
        segments: &[],
    };

    let tombstoned = sources.collect_deleted_nodes();
    assert!(tombstoned.contains(&1));
    assert_eq!(tombstoned.len(), 1);
}
}