use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use super::super::file::FileId;
use super::super::node::NodeId;
use super::super::storage::CsrGraph;
use super::delta::{DeltaBuffer, DeltaEdge, DeltaOp, EdgeKey};
use super::kind::EdgeKind;
/// Errors reported by [`EdgeStore`] operations.
///
/// Within this file only `EdgeSizeExceeded` is constructed (by
/// `EdgeStore::push_committed`); the remaining variants are presumably
/// produced by callers or sibling modules — verify before relying on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EdgeStoreError {
/// A node id was rejected; rendered as `invalid node: <id:?>`.
InvalidNode(NodeId),
/// A CSR-related failure carried as a free-form message.
CsrError(String),
/// Rendered as `current_bytes + requested_bytes > limit`, i.e. the request
/// would not fit in the delta buffer's byte limit.
DeltaBufferFull {
current_bytes: usize,
requested_bytes: usize,
limit: usize,
},
/// The summed byte size of a batch (`edge_bytes`) is larger than the
/// reservation the caller supplied (`reservation_bytes`).
EdgeSizeExceeded {
edge_bytes: usize,
reservation_bytes: usize,
},
}
impl std::fmt::Display for EdgeStoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidNode(id) => write!(f, "invalid node: {id:?}"),
Self::CsrError(msg) => write!(f, "CSR error: {msg}"),
Self::DeltaBufferFull {
current_bytes,
requested_bytes,
limit,
} => write!(
f,
"delta buffer full: {current_bytes} + {requested_bytes} > {limit} bytes"
),
Self::EdgeSizeExceeded {
edge_bytes,
reservation_bytes,
} => write!(
f,
"edge size {edge_bytes} exceeds reservation {reservation_bytes} bytes"
),
}
}
}
// Marker impl: relies entirely on the trait's default methods, so no
// underlying `source()` error is exposed.
impl std::error::Error for EdgeStoreError {}
/// An owned description of one edge as returned by the [`EdgeStore`] query
/// methods (`edges_from` / `edges_to`).
#[derive(Debug, Clone, PartialEq)]
pub struct StoreEdgeRef {
/// Node the edge starts at.
pub source: NodeId,
/// Node the edge points to.
pub target: NodeId,
/// Kind of relationship the edge represents.
pub kind: EdgeKind,
/// Sequence number of the operation that produced this edge.
pub seq: u64,
/// File the edge was recorded for; `FileId::INVALID` for edges that come
/// from the CSR graph (see `append_csr_edges_from`).
pub file: FileId,
/// Source spans attached to the edge (may be empty).
pub spans: Vec<crate::graph::node::Span>,
}
/// Value stored per edge key while resolving the outgoing edges of a node.
///
/// Tuple layout, as filled in by `update_delta_lww_from_edge`:
/// `(seq, is_add, target, kind, file, spans)`.
type DeltaFromEntry = (
u64,
bool,
NodeId,
EdgeKind,
FileId,
Vec<crate::graph::node::Span>,
);
/// Value stored per edge key while resolving the incoming edges of a node.
///
/// Tuple layout, as filled in by `update_delta_lww_to_edge`:
/// `(seq, is_add, source, file, spans)`.
type DeltaToEntry = (u64, bool, NodeId, FileId, Vec<crate::graph::node::Span>);
/// Edge storage that layers a mutable delta buffer over an optional CSR graph.
///
/// Queries combine both layers: CSR edges can be hidden by tombstones or by
/// newer delta operations, and delta `Add` operations contribute new edges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeStore {
/// Base graph, if one has been installed (`with_csr` / `swap_csr*`).
csr: Option<CsrGraph>,
/// One flag per CSR edge index; `true` marks the edge as removed.
/// Every constructor and swap in this file sizes it to `csr.edge_count()`.
csr_tombstones: Vec<bool>,
/// 0 for a store created without a CSR; bumped by one on every swap.
csr_version: u64,
/// Pending add/remove operations not yet folded into the CSR.
delta: DeltaBuffer,
}
impl EdgeStore {
/// Creates an empty store: no CSR graph, no tombstones, version 0 and a
/// fresh delta buffer.
#[must_use]
pub fn new() -> Self {
Self {
csr: None,
csr_tombstones: Vec::new(),
csr_version: 0,
delta: DeltaBuffer::new(),
}
}
/// Creates a store on top of an existing CSR graph.
///
/// The tombstone table gets one cleared flag per CSR edge, the version
/// starts at 1 and the delta buffer starts empty.
#[must_use]
pub fn with_csr(csr: CsrGraph) -> Self {
    // Size the table before `csr` is moved into the struct.
    let csr_tombstones = vec![false; csr.edge_count()];
    Self {
        csr: Some(csr),
        csr_tombstones,
        csr_version: 1,
        delta: DeltaBuffer::new(),
    }
}
/// Borrows the current CSR graph, or `None` if none is installed.
#[must_use]
pub fn csr(&self) -> Option<&CsrGraph> {
self.csr.as_ref()
}
/// Borrows the delta buffer holding pending edge operations.
#[must_use]
pub fn delta(&self) -> &DeltaBuffer {
&self.delta
}
/// Mutably borrows the delta buffer.
///
/// Edits made through this handle bypass the CSR tombstone bookkeeping that
/// `remove_edge` performs.
pub fn delta_mut(&mut self) -> &mut DeltaBuffer {
&mut self.delta
}
/// Returns the CSR version counter (0 until a CSR has been installed).
#[must_use]
pub fn csr_version(&self) -> u64 {
self.csr_version
}
/// Number of operations currently held in the delta buffer (adds and
/// removes alike, as reported by `DeltaBuffer::len`).
#[must_use]
pub fn delta_count(&self) -> usize {
self.delta.len()
}
/// Returns the delta buffer's current sequence counter without advancing it.
#[must_use]
pub fn seq_counter(&self) -> u64 {
self.delta.current_seq()
}
#[must_use]
pub fn edge_count_approx(&self) -> usize {
let csr_edges = self
.csr
.as_ref()
.map_or(0, super::super::storage::csr::CsrGraph::edge_count);
let tombstones = self.csr_tombstones.iter().filter(|&&t| t).count();
let delta_adds = self.delta.iter().filter(|e| e.is_add()).count();
csr_edges.saturating_sub(tombstones) + delta_adds
}
/// Records an `Add` operation for an edge that has no source spans.
///
/// Convenience wrapper around `add_edge_with_spans` with an empty span list;
/// returns the recorded delta edge.
pub fn add_edge(
&mut self,
source: NodeId,
target: NodeId,
kind: EdgeKind,
file: FileId,
) -> DeltaEdge {
self.add_edge_with_spans(source, target, kind, file, Vec::new())
}
/// Records an `Add` operation for an edge, including its source spans.
///
/// Takes the next sequence number from the delta buffer, pushes the new
/// operation into the buffer and returns a copy of it to the caller.
pub fn add_edge_with_spans(
    &mut self,
    source: NodeId,
    target: NodeId,
    kind: EdgeKind,
    file: FileId,
    spans: Vec<crate::graph::node::Span>,
) -> DeltaEdge {
    let recorded = DeltaEdge::with_spans(
        source,
        target,
        kind,
        self.delta.next_seq(),
        DeltaOp::Add,
        file,
        spans,
    );
    // The buffer keeps its own copy; the original goes back to the caller.
    self.delta.push(recorded.clone());
    recorded
}
/// Records a `Remove` operation for an edge.
///
/// The first matching CSR edge (if any) is tombstoned first; then a
/// `Remove` operation with the next sequence number is pushed into the
/// delta buffer. A copy of that operation is returned.
pub fn remove_edge(
    &mut self,
    source: NodeId,
    target: NodeId,
    kind: EdgeKind,
    file: FileId,
) -> DeltaEdge {
    self.tombstone_csr_edge(source, target, &kind);
    let removal = DeltaEdge::new(
        source,
        target,
        kind,
        self.delta.next_seq(),
        DeltaOp::Remove,
        file,
    );
    self.delta.push(removal.clone());
    removal
}
fn tombstone_csr_edge(&mut self, source: NodeId, target: NodeId, kind: &EdgeKind) {
let Some(ref csr) = self.csr else {
return;
};
for edge_ref in csr.edges_of(source.index()) {
if edge_ref.target == target && edge_ref.kind == *kind {
if edge_ref.index < self.csr_tombstones.len() {
self.csr_tombstones[edge_ref.index] = true;
}
break;
}
}
}
/// Appends a batch of already-sequenced edges to the delta buffer.
///
/// The batch is validated against `reservation_bytes` before anything is
/// pushed. Afterwards the sequence counter is advanced to one past the
/// largest sequence number in the batch; an empty batch leaves it untouched.
///
/// Returns the summed `byte_size` of the pushed edges.
///
/// # Errors
///
/// Returns [`EdgeStoreError::EdgeSizeExceeded`], without modifying the
/// store, when the batch is larger than `reservation_bytes`.
pub fn push_committed(
    &mut self,
    edges: Vec<DeltaEdge>,
    reservation_bytes: usize,
) -> Result<usize, EdgeStoreError> {
    let edge_bytes: usize = edges.iter().map(DeltaEdge::byte_size).sum();
    if edge_bytes > reservation_bytes {
        return Err(EdgeStoreError::EdgeSizeExceeded {
            edge_bytes,
            reservation_bytes,
        });
    }
    // Capture the maximum before the edges are moved into the buffer.
    let max_seq = edges.iter().map(|e| e.seq).max();
    for edge in edges {
        self.delta.push(edge);
    }
    if let Some(max) = max_seq {
        // Saturate rather than overflow: a committed edge carrying
        // `u64::MAX` would otherwise panic in debug builds and wrap the
        // target to 0 in release builds.
        self.delta.advance_seq_to(max.saturating_add(1));
    }
    Ok(edge_bytes)
}
fn update_delta_lww_from_edge(
delta_lww: &mut HashMap<EdgeKey, DeltaFromEntry>,
edge: &DeltaEdge,
) {
let key = edge.edge_key();
if delta_lww
.get(&key)
.is_some_and(|(existing_seq, _, _, _, _, _)| *existing_seq >= edge.seq)
{
return;
}
delta_lww.insert(
key,
(
edge.seq,
edge.is_add(),
edge.target,
edge.kind.clone(),
edge.file,
edge.spans.clone(),
),
);
}
fn update_delta_lww_to_edge(delta_lww: &mut HashMap<EdgeKey, DeltaToEntry>, edge: &DeltaEdge) {
let key = edge.edge_key();
if delta_lww
.get(&key)
.is_some_and(|(existing_seq, _, _, _, _)| *existing_seq >= edge.seq)
{
return;
}
delta_lww.insert(
key,
(
edge.seq,
edge.is_add(),
edge.source,
edge.file,
edge.spans.clone(),
),
);
}
/// Collects, for every edge key whose source index equals `source_idx`, the
/// delta operation with the highest sequence number.
fn build_delta_lww_from_source(&self, source_idx: u32) -> HashMap<EdgeKey, DeltaFromEntry> {
    let mut latest_by_key = HashMap::new();
    self.delta
        .iter()
        .filter(|op| op.source.index() == source_idx)
        .for_each(|op| Self::update_delta_lww_from_edge(&mut latest_by_key, op));
    latest_by_key
}
/// Collects, for every edge key pointing at `target`, the delta operation
/// with the highest sequence number.
fn build_delta_lww_to_target(&self, target: NodeId) -> HashMap<EdgeKey, DeltaToEntry> {
    let mut latest_by_key = HashMap::new();
    self.delta
        .iter()
        .filter(|op| op.target == target)
        .for_each(|op| Self::update_delta_lww_to_edge(&mut latest_by_key, op));
    latest_by_key
}
/// Reports whether a CSR edge is overridden by a newer delta operation.
///
/// True when the delta map holds an entry for the same
/// `(source, target, kind)` key whose sequence number is strictly greater
/// than the CSR edge's, whether that entry is an add or a remove.
fn csr_edge_shadowed_by_delta(
    source: NodeId,
    edge_ref: &super::super::storage::csr::EdgeRef,
    delta_lww: &HashMap<EdgeKey, DeltaFromEntry>,
) -> bool {
    let lookup = EdgeKey {
        source,
        target: edge_ref.target,
        kind: edge_ref.kind.clone(),
    };
    match delta_lww.get(&lookup) {
        Some(entry) => entry.0 > edge_ref.seq,
        None => false,
    }
}
/// Reports whether the CSR holds an edge from `source_idx` to `target` of
/// the given kind whose sequence number is at least `seq`.
///
/// Tombstones are not consulted here. Returns `false` when no CSR is
/// installed.
fn csr_has_edge_with_seq_at_least(
    &self,
    source_idx: u32,
    target: NodeId,
    kind: &EdgeKind,
    seq: u64,
) -> bool {
    let Some(csr) = self.csr.as_ref() else {
        return false;
    };
    for candidate in csr.edges_of(source_idx) {
        if candidate.target == target && candidate.kind == *kind && candidate.seq >= seq {
            return true;
        }
    }
    false
}
fn append_csr_edges_from(
&self,
source: NodeId,
source_idx: u32,
delta_lww: &HashMap<EdgeKey, DeltaFromEntry>,
result: &mut Vec<StoreEdgeRef>,
) {
let Some(ref csr) = self.csr else {
return;
};
for edge_ref in csr.edges_of(source_idx) {
if self.is_edge_tombstoned(edge_ref.index) {
continue;
}
if Self::csr_edge_shadowed_by_delta(source, &edge_ref, delta_lww) {
continue;
}
result.push(StoreEdgeRef {
source, target: edge_ref.target,
kind: edge_ref.kind,
seq: edge_ref.seq,
file: FileId::INVALID,
spans: edge_ref.spans.clone(),
});
}
}
/// Appends the edges contributed by the delta layer to `result`.
///
/// Consumes the per-key map and emits one edge for every key whose latest
/// operation is an add, unless the CSR already holds the same edge with an
/// equal or higher sequence number. Output order follows the map's
/// iteration order and is therefore unspecified.
fn append_delta_edges_from(
    &self,
    delta_lww: HashMap<EdgeKey, DeltaFromEntry>,
    result: &mut Vec<StoreEdgeRef>,
) {
    let additions =
        delta_lww
            .into_iter()
            .filter_map(|(key, (seq, is_add, target, kind, file, spans))| {
                // `&&` short-circuits, so the CSR is only scanned for adds.
                let emit = is_add
                    && !self.csr_has_edge_with_seq_at_least(
                        key.source.index(),
                        target,
                        &kind,
                        seq,
                    );
                emit.then(|| StoreEdgeRef {
                    source: key.source,
                    target,
                    kind,
                    seq,
                    file,
                    spans,
                })
            });
    result.extend(additions);
}
/// Returns the live outgoing edges of `source`, merging both layers.
///
/// Visible CSR edges come first, followed by edges whose latest delta
/// operation is an add. The relative order of the delta part is
/// unspecified.
pub fn edges_from(&self, source: NodeId) -> Vec<StoreEdgeRef> {
    let idx = source.index();
    let latest_ops = self.build_delta_lww_from_source(idx);
    let mut merged = Vec::new();
    // The CSR pass only borrows the map; the delta pass consumes it.
    self.append_csr_edges_from(source, idx, &latest_ops, &mut merged);
    self.append_delta_edges_from(latest_ops, &mut merged);
    merged
}
/// Returns the incoming edges of `target` as seen by the delta buffer.
///
/// One edge is produced for every edge key whose latest delta operation is
/// an add; the order is unspecified.
///
/// NOTE(review): unlike `edges_from`, this does not consult the CSR graph,
/// so edges that exist only in the CSR are not reported — confirm that this
/// is intended.
pub fn edges_to(&self, target: NodeId) -> Vec<StoreEdgeRef> {
    self.build_delta_lww_to_target(target)
        .into_iter()
        .filter(|(_, entry)| entry.1)
        .map(|(key, (seq, _, source, file, spans))| StoreEdgeRef {
            source,
            target,
            kind: key.kind,
            seq,
            file,
            spans,
        })
        .collect()
}
/// Reports whether the edge `source -> target` of the given kind exists.
///
/// If the delta buffer mentions the edge at all, its latest operation
/// decides. Otherwise the answer comes from the CSR graph.
pub fn has_edge(&self, source: NodeId, target: NodeId, kind: &EdgeKind) -> bool {
    match self.check_delta_for_edge(source, target, kind) {
        Some(verdict) => verdict,
        None => self.check_csr_for_edge(source, target, kind),
    }
}
/// Looks up the latest delta operation for one specific edge.
///
/// Returns `None` when the delta buffer has no operation for the edge.
/// Otherwise returns `Some(true)` if the operation with the highest
/// sequence number is an add and `Some(false)` if it is a remove. When two
/// operations share a sequence number, the one seen first wins.
fn check_delta_for_edge(
    &self,
    source: NodeId,
    target: NodeId,
    kind: &EdgeKind,
) -> Option<bool> {
    // (sequence number, is_add) of the newest matching operation so far.
    let mut newest: Option<(u64, bool)> = None;
    let matching = self
        .delta
        .iter()
        .filter(|op| Self::delta_edge_matches(op, source, target, kind));
    for op in matching {
        let seen_seq = newest.map(|(seq, _)| seq);
        if Self::should_update_latest_seq(seen_seq, op.seq) {
            newest = Some((op.seq, op.is_add()));
        }
    }
    newest.map(|(_, is_add)| is_add)
}
/// True when the delta operation refers to exactly the edge
/// `source -> target` of the given kind.
fn delta_edge_matches(
    edge: &DeltaEdge,
    source: NodeId,
    target: NodeId,
    kind: &EdgeKind,
) -> bool {
    if edge.source != source {
        return false;
    }
    if edge.target != target {
        return false;
    }
    edge.kind == *kind
}
/// Decides whether `candidate_seq` should replace the best sequence number
/// seen so far: always when nothing has been seen yet, otherwise only when
/// it is strictly greater.
fn should_update_latest_seq(latest_seq: Option<u64>, candidate_seq: u64) -> bool {
    match latest_seq {
        None => true,
        Some(latest) => candidate_seq > latest,
    }
}
fn check_csr_for_edge(&self, source: NodeId, target: NodeId, kind: &EdgeKind) -> bool {
let Some(ref csr) = self.csr else {
return false;
};
for edge_ref in csr.edges_of(source.index()) {
if Self::csr_edge_matches(&edge_ref, target, kind) && self.csr_edge_is_live(&edge_ref) {
return true;
}
}
false
}
/// True when the CSR edge points at `target` and has the given kind.
fn csr_edge_matches(
    edge_ref: &super::super::storage::csr::EdgeRef,
    target: NodeId,
    kind: &EdgeKind,
) -> bool {
    if edge_ref.target != target {
        return false;
    }
    edge_ref.kind == *kind
}
/// True when the CSR edge has a tombstone slot and that slot is not set.
///
/// NOTE(review): an index outside the tombstone table counts as *not live*
/// here, whereas `is_edge_tombstoned` reports such an index as *not
/// tombstoned*. The two only disagree if the table and the CSR get out of
/// sync — confirm which reading is intended.
fn csr_edge_is_live(&self, edge_ref: &super::super::storage::csr::EdgeRef) -> bool {
    self.csr_tombstones
        .get(edge_ref.index)
        .is_some_and(|dead| !*dead)
}
/// Drops the delta operations recorded for `file` and returns the count
/// reported by `DeltaBuffer::clear_file`.
///
/// Only the delta buffer is touched; CSR edges and tombstones are left as
/// they are.
pub fn clear_file(&mut self, file: FileId) -> usize {
self.delta.clear_file(file)
}
/// Takes a snapshot of the store's size counters.
///
/// `tombstone_count` is obtained by scanning the whole tombstone table.
#[must_use]
pub fn stats(&self) -> EdgeStoreStats {
    let csr_edge_count = match &self.csr {
        Some(graph) => graph.edge_count(),
        None => 0,
    };
    let tombstone_count = self.csr_tombstones.iter().filter(|&&dead| dead).count();
    EdgeStoreStats {
        csr_edge_count,
        csr_version: self.csr_version,
        tombstone_count,
        delta_edge_count: self.delta.len(),
        delta_byte_size: self.delta.byte_size(),
        delta_file_count: self.delta.file_count(),
    }
}
/// Reports whether the CSR edge at `edge_index` has been tombstoned.
///
/// An index outside the tombstone table is reported as not tombstoned.
#[must_use]
pub fn is_edge_tombstoned(&self, edge_index: usize) -> bool {
    matches!(self.csr_tombstones.get(edge_index), Some(true))
}
/// Installs a new CSR graph, discarding the previous one.
///
/// The tombstone table is reset to one cleared flag per edge of the new
/// graph and the CSR version is incremented. The delta buffer is untouched.
pub fn swap_csr(&mut self, new_csr: CsrGraph) {
    // Size the fresh table before `new_csr` is moved into the store.
    self.csr_tombstones = vec![false; new_csr.edge_count()];
    self.csr = Some(new_csr);
    self.csr_version += 1;
}
/// Installs a new CSR graph and hands back what it replaced.
///
/// Returns `(previous CSR, previous tombstone table, new CSR version)`. The
/// first two values can be passed to `restore_csr` to undo the swap.
pub fn swap_csr_returning_old(
    &mut self,
    new_csr: CsrGraph,
) -> (Option<CsrGraph>, Vec<bool>, u64) {
    let fresh_tombstones = vec![false; new_csr.edge_count()];
    let previous_csr = self.csr.replace(new_csr);
    let previous_tombstones = std::mem::replace(&mut self.csr_tombstones, fresh_tombstones);
    self.csr_version += 1;
    (previous_csr, previous_tombstones, self.csr_version)
}
/// Reinstalls a previously saved CSR graph and tombstone table.
///
/// Intended as the inverse of `swap_csr_returning_old`: the version counter
/// is stepped back by one (saturating at 0). The arguments are stored as
/// given; nothing checks that `old_tombstones` matches `old_csr`'s edge
/// count.
pub fn restore_csr(&mut self, old_csr: Option<CsrGraph>, old_tombstones: Vec<bool>) {
self.csr = old_csr;
self.csr_tombstones = old_tombstones;
self.csr_version = self.csr_version.saturating_sub(1);
}
/// Empties the delta buffer via `DeltaBuffer::clear`. CSR edges and
/// tombstones are not affected.
pub fn clear_delta(&mut self) {
self.delta.clear();
}
/// Moves all pending delta operations out of the store, grouped by file,
/// leaving the delta buffer empty (see `test_take_delta`).
pub fn take_delta(&mut self) -> HashMap<FileId, Vec<DeltaEdge>> {
self.delta.take_all()
}
}
/// The default store is the empty store produced by `EdgeStore::new`.
impl Default for EdgeStore {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time size counters for an `EdgeStore`, as produced by
/// `EdgeStore::stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EdgeStoreStats {
/// Number of edges in the CSR graph (0 when none is installed);
/// tombstoned edges are still included.
pub csr_edge_count: usize,
/// Value of the store's CSR version counter.
pub csr_version: u64,
/// Number of CSR edges currently flagged as removed.
pub tombstone_count: usize,
/// Number of operations held in the delta buffer.
pub delta_edge_count: usize,
/// Byte size reported by the delta buffer.
pub delta_byte_size: usize,
/// Number of files reported by the delta buffer.
pub delta_file_count: usize,
}
#[cfg(test)]
mod tests {
    use super::super::super::storage::CsrBuilder;
    use super::*;

    /// The edge kind used by almost every test: a synchronous call with no
    /// arguments.
    fn calls() -> EdgeKind {
        EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        }
    }

    /// Builds an `Add` delta edge of kind `calls()` with an explicit
    /// sequence number, as needed by the `push_committed` tests.
    fn committed_add(source: NodeId, target: NodeId, seq: u64, file: FileId) -> DeltaEdge {
        DeltaEdge::new(source, target, calls(), seq, DeltaOp::Add, file)
    }

    /// Three-node CSR with edges 0->1 (seq 1), 0->2 (seq 2) and 1->2 (seq 3).
    fn make_csr() -> CsrGraph {
        let mut builder = CsrBuilder::new(3);
        builder
            .add_edge(0, NodeId::new(1, 0), calls(), 1, vec![])
            .unwrap();
        builder
            .add_edge(0, NodeId::new(2, 0), calls(), 2, vec![])
            .unwrap();
        builder
            .add_edge(1, NodeId::new(2, 0), calls(), 3, vec![])
            .unwrap();
        builder.build().unwrap()
    }

    #[test]
    fn test_edge_store_new() {
        let store = EdgeStore::new();
        assert!(store.csr().is_none());
        assert_eq!(store.delta().len(), 0);
        assert_eq!(store.csr_version(), 0);
    }

    #[test]
    fn test_edge_store_with_csr() {
        let csr = make_csr();
        let edge_count = csr.edge_count();
        let store = EdgeStore::with_csr(csr);
        assert!(store.csr().is_some());
        assert_eq!(store.csr_version(), 1);
        assert_eq!(store.stats().csr_edge_count, edge_count);
        assert_eq!(store.stats().tombstone_count, 0);
    }

    #[test]
    fn test_add_edge() {
        let mut store = EdgeStore::new();
        let source = NodeId::new(1, 0);
        let target = NodeId::new(2, 0);
        let file = FileId::new(10);
        let edge = store.add_edge(source, target, calls(), file);
        assert_eq!(edge.source, source);
        assert_eq!(edge.target, target);
        assert!(edge.is_add());
        assert_eq!(store.delta().len(), 1);
    }

    #[test]
    fn test_add_multiple_edges() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        store.add_edge(NodeId::new(1, 0), NodeId::new(2, 0), calls(), file);
        store.add_edge(NodeId::new(1, 0), NodeId::new(3, 0), calls(), file);
        store.add_edge(
            NodeId::new(2, 0),
            NodeId::new(3, 0),
            EdgeKind::References,
            file,
        );
        assert_eq!(store.delta().len(), 3);
    }

    #[test]
    fn test_edges_from_delta_only() {
        let mut store = EdgeStore::new();
        let source = NodeId::new(1, 0);
        let file = FileId::new(10);
        store.add_edge(source, NodeId::new(2, 0), calls(), file);
        store.add_edge(source, NodeId::new(3, 0), calls(), file);
        // Different source: must not show up in the result.
        store.add_edge(NodeId::new(2, 0), NodeId::new(4, 0), calls(), file);
        let edges = store.edges_from(source);
        assert_eq!(edges.len(), 2);
    }

    #[test]
    fn test_remove_edge_delta_only() {
        let mut store = EdgeStore::new();
        let source = NodeId::new(1, 0);
        let target = NodeId::new(2, 0);
        let file = FileId::new(10);
        store.add_edge(source, target, calls(), file);
        let remove = store.remove_edge(source, target, calls(), file);
        assert!(remove.is_remove());
        assert_eq!(store.delta().len(), 2);
        assert!(!store.has_edge(source, target, &calls()));
    }

    #[test]
    fn test_has_edge_in_delta() {
        let mut store = EdgeStore::new();
        let source = NodeId::new(1, 0);
        let target = NodeId::new(2, 0);
        let file = FileId::new(10);
        assert!(!store.has_edge(source, target, &calls()));
        store.add_edge(source, target, calls(), file);
        assert!(store.has_edge(source, target, &calls()));
        assert!(!store.has_edge(source, target, &EdgeKind::References));
    }

    #[test]
    fn test_clear_file() {
        let mut store = EdgeStore::new();
        let file1 = FileId::new(10);
        let file2 = FileId::new(20);
        store.add_edge(NodeId::new(1, 0), NodeId::new(2, 0), calls(), file1);
        store.add_edge(NodeId::new(2, 0), NodeId::new(3, 0), calls(), file2);
        store.add_edge(NodeId::new(1, 0), NodeId::new(3, 0), calls(), file1);
        assert_eq!(store.delta().len(), 3);
        let removed = store.clear_file(file1);
        assert_eq!(removed, 2);
        assert_eq!(store.delta().len(), 1);
    }

    #[test]
    fn test_stats() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        store.add_edge(NodeId::new(1, 0), NodeId::new(2, 0), calls(), file);
        store.add_edge(NodeId::new(2, 0), NodeId::new(3, 0), calls(), file);
        let stats = store.stats();
        assert_eq!(stats.csr_edge_count, 0);
        assert_eq!(stats.csr_version, 0);
        assert_eq!(stats.tombstone_count, 0);
        assert_eq!(stats.delta_edge_count, 2);
        assert!(stats.delta_byte_size > 0);
        assert_eq!(stats.delta_file_count, 1);
    }

    #[test]
    fn test_swap_csr() {
        let mut store = EdgeStore::new();
        assert_eq!(store.csr_version(), 0);
        let csr = make_csr();
        store.swap_csr(csr);
        assert_eq!(store.csr_version(), 1);
        assert!(store.csr().is_some());
        assert_eq!(store.stats().tombstone_count, 0);
    }

    #[test]
    fn test_clear_delta() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        store.add_edge(NodeId::new(1, 0), NodeId::new(2, 0), calls(), file);
        store.add_edge(NodeId::new(2, 0), NodeId::new(3, 0), calls(), file);
        assert_eq!(store.delta().len(), 2);
        store.clear_delta();
        assert_eq!(store.delta().len(), 0);
    }

    #[test]
    fn test_take_delta() {
        let mut store = EdgeStore::new();
        let file1 = FileId::new(10);
        let file2 = FileId::new(20);
        store.add_edge(NodeId::new(1, 0), NodeId::new(2, 0), calls(), file1);
        store.add_edge(NodeId::new(2, 0), NodeId::new(3, 0), calls(), file2);
        let taken = store.take_delta();
        // One map entry per file.
        assert_eq!(taken.len(), 2);
        assert_eq!(store.delta().len(), 0);
    }

    #[test]
    fn test_default() {
        let store: EdgeStore = EdgeStore::default();
        assert!(store.csr().is_none());
    }

    #[test]
    fn test_edge_store_error_display() {
        let err1 = EdgeStoreError::InvalidNode(NodeId::new(42, 1));
        assert!(format!("{err1}").contains("invalid node"));
        let err2 = EdgeStoreError::CsrError("test error".to_string());
        assert!(format!("{err2}").contains("CSR error"));
        let err3 = EdgeStoreError::DeltaBufferFull {
            current_bytes: 100,
            requested_bytes: 50,
            limit: 120,
        };
        assert!(format!("{err3}").contains("delta buffer full"));
        let err4 = EdgeStoreError::EdgeSizeExceeded {
            edge_bytes: 200,
            reservation_bytes: 100,
        };
        assert!(format!("{err4}").contains("edge size"));
        assert!(format!("{err4}").contains("exceeds reservation"));
    }

    #[test]
    fn test_edges_to() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        let target = NodeId::new(5, 0);
        store.add_edge(NodeId::new(1, 0), target, calls(), file);
        store.add_edge(NodeId::new(2, 0), target, calls(), file);
        // Different target: must not show up in the result.
        store.add_edge(NodeId::new(1, 0), NodeId::new(3, 0), calls(), file);
        let edges_to_target = store.edges_to(target);
        assert_eq!(edges_to_target.len(), 2);
    }

    #[test]
    fn test_push_committed_validates_size() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        let edge1 = committed_add(NodeId::new(1, 0), NodeId::new(2, 0), 1, file);
        let edge2 = committed_add(NodeId::new(2, 0), NodeId::new(3, 0), 2, file);
        let edge_batch = vec![edge1.clone(), edge2.clone()];
        let actual_size = edge1.byte_size() + edge2.byte_size();
        // One byte short of what the batch needs.
        let result = store.push_committed(edge_batch, actual_size - 1);
        assert!(result.is_err());
        assert_eq!(store.delta().len(), 0);
    }

    #[test]
    fn test_push_committed_accepts_valid() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        let edge1 = committed_add(NodeId::new(1, 0), NodeId::new(2, 0), 1, file);
        let edge2 = committed_add(NodeId::new(2, 0), NodeId::new(3, 0), 2, file);
        let actual_size = edge1.byte_size() + edge2.byte_size();
        let edge_batch = vec![edge1, edge2];
        // Exact-fit reservation.
        let result = store.push_committed(edge_batch.clone(), actual_size);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), actual_size);
        assert_eq!(store.delta().len(), 2);
        // Over-sized reservation still reports the actual size.
        let mut store2 = EdgeStore::new();
        let result2 = store2.push_committed(edge_batch, actual_size + 100);
        assert!(result2.is_ok());
        assert_eq!(result2.unwrap(), actual_size);
    }

    #[test]
    fn test_edge_exceeds_reservation_error() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        let edge = committed_add(NodeId::new(1, 0), NodeId::new(2, 0), 1, file);
        let edge_bytes = edge.byte_size();
        let reservation_bytes = edge_bytes - 1;
        let result = store.push_committed(vec![edge], reservation_bytes);
        assert!(matches!(
            result,
            Err(EdgeStoreError::EdgeSizeExceeded {
                edge_bytes: eb,
                reservation_bytes: rb,
            }) if eb == edge_bytes && rb == reservation_bytes
        ));
    }

    #[test]
    fn test_edges_from_applies_lww_removes() {
        let mut store = EdgeStore::new();
        let source = NodeId::new(1, 0);
        let target = NodeId::new(2, 0);
        let file = FileId::new(10);
        store.add_edge(source, target, calls(), file);
        store.remove_edge(source, target, calls(), file);
        let edges = store.edges_from(source);
        assert!(
            edges.is_empty(),
            "edges_from should not return removed edges, got {edges:?}"
        );
    }

    #[test]
    fn test_edges_from_lww_add_after_remove() {
        let mut store = EdgeStore::new();
        let source = NodeId::new(1, 0);
        let target = NodeId::new(2, 0);
        let file = FileId::new(10);
        store.add_edge(source, target, calls(), file);
        store.remove_edge(source, target, calls(), file);
        store.add_edge(source, target, calls(), file);
        let edges = store.edges_from(source);
        assert_eq!(edges.len(), 1, "should have 1 edge after add->remove->add");
    }

    #[test]
    fn test_edges_to_applies_lww_removes() {
        let mut store = EdgeStore::new();
        let source = NodeId::new(1, 0);
        let target = NodeId::new(2, 0);
        let file = FileId::new(10);
        store.add_edge(source, target, calls(), file);
        store.remove_edge(source, target, calls(), file);
        let edges = store.edges_to(target);
        assert!(
            edges.is_empty(),
            "edges_to should not return removed edges, got {edges:?}"
        );
    }

    #[test]
    fn test_push_committed_advances_seq_counter() {
        let mut store = EdgeStore::new();
        let file = FileId::new(10);
        // Deliberately out of order: the maximum (100) comes first.
        let edge1 = committed_add(NodeId::new(1, 0), NodeId::new(2, 0), 100, file);
        let edge2 = committed_add(NodeId::new(2, 0), NodeId::new(3, 0), 50, file);
        let edge_batch = vec![edge1.clone(), edge2.clone()];
        let reservation = edge1.byte_size() + edge2.byte_size() + 100;
        assert_eq!(store.delta().current_seq(), 0);
        store.push_committed(edge_batch, reservation).unwrap();
        assert_eq!(
            store.delta().current_seq(),
            101,
            "seq counter should be advanced to max(pushed_seq) + 1"
        );
        let next = store.delta().next_seq();
        assert_eq!(next, 101, "next_seq should return the advanced value");
    }

    #[test]
    fn test_push_committed_empty_preserves_counter() {
        let mut store = EdgeStore::new();
        store.delta().next_seq();
        store.delta().next_seq();
        assert_eq!(store.delta().current_seq(), 2);
        store.push_committed(vec![], 1000).unwrap();
        assert_eq!(
            store.delta().current_seq(),
            2,
            "empty push should not change counter"
        );
    }
}