use std::collections::HashMap;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::{Deserialize, Serialize};
use super::super::file::FileId;
use super::super::node::NodeId;
use super::kind::EdgeKind;
use crate::graph::node::Span;
/// The kind of change a delta entry records for an edge.
///
/// Serialized in snake case (`add` / `remove`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DeltaOp {
    /// The edge is being added. This is the default operation.
    #[default]
    Add,
    /// The edge is being removed.
    Remove,
}
/// Renders the operation as its lowercase name: `add` or `remove`.
impl fmt::Display for DeltaOp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::Add => "add",
            Self::Remove => "remove",
        };
        f.write_str(label)
    }
}
/// A single recorded change to one graph edge.
///
/// Combines the edge's identity (`source`, `target`, `kind`) with the
/// operation applied to it, a sequence number, the file it is filed under in a
/// `DeltaBuffer`, and any spans attached to it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeltaEdge {
/// Source node of the edge.
pub source: NodeId,
/// Target node of the edge.
pub target: NodeId,
/// Kind of the edge; part of the edge's identity (see `DeltaEdge::edge_key`).
pub kind: EdgeKind,
/// Sequence number supplied by the caller, e.g. from `DeltaBuffer::next_seq`.
pub seq: u64,
/// Whether this entry adds or removes the edge.
pub op: DeltaOp,
/// File this entry is grouped under when pushed into a `DeltaBuffer`.
pub file: FileId,
/// Spans attached to this edge; empty when constructed via `DeltaEdge::new`.
pub spans: Vec<Span>,
}
impl DeltaEdge {
    /// Creates a delta edge that carries no spans.
    ///
    /// Same as [`DeltaEdge::with_spans`] called with an empty span list.
    #[must_use]
    pub fn new(
        source: NodeId,
        target: NodeId,
        kind: EdgeKind,
        seq: u64,
        op: DeltaOp,
        file: FileId,
    ) -> Self {
        Self::with_spans(source, target, kind, seq, op, file, Vec::new())
    }

    /// Creates a delta edge with an explicit list of spans.
    #[must_use]
    pub fn with_spans(
        source: NodeId,
        target: NodeId,
        kind: EdgeKind,
        seq: u64,
        op: DeltaOp,
        file: FileId,
        spans: Vec<Span>,
    ) -> Self {
        Self {
            spans,
            file,
            op,
            seq,
            kind,
            target,
            source,
        }
    }

    /// Returns the `(source, target, kind)` identity of this edge.
    ///
    /// The sequence number, operation, file and spans are not part of the
    /// key, so an add and a later remove of the same edge share one key.
    #[must_use]
    pub fn edge_key(&self) -> EdgeKey {
        let Self {
            source,
            target,
            kind,
            ..
        } = self;
        EdgeKey {
            source: *source,
            target: *target,
            kind: kind.clone(),
        }
    }

    /// Returns `true` when this entry records an edge addition.
    #[must_use]
    #[inline]
    pub fn is_add(&self) -> bool {
        matches!(self.op, DeltaOp::Add)
    }

    /// Returns `true` when this entry records an edge removal.
    #[must_use]
    #[inline]
    pub fn is_remove(&self) -> bool {
        matches!(self.op, DeltaOp::Remove)
    }

    /// Estimates the memory footprint of this entry in bytes.
    ///
    /// The estimate is the sum of a fixed overhead, the `Vec` header of
    /// `spans`, the span elements currently stored (length, not capacity),
    /// and whatever `EdgeKind::estimated_size` reports for `kind`.
    #[must_use]
    pub fn byte_size(&self) -> usize {
        // NOTE(review): 37 is a hand-picked constant, presumably approximating
        // the fixed-size fields of the entry — confirm against the field layout.
        const FIXED_OVERHEAD: usize = 37;
        const VEC_OVERHEAD: usize = std::mem::size_of::<Vec<Span>>();

        let span_bytes = self.spans.len() * std::mem::size_of::<Span>();
        let kind_bytes = self.kind.estimated_size();
        FIXED_OVERHEAD + VEC_OVERHEAD + span_bytes + kind_bytes
    }
}
/// Identity of an edge: its endpoints and kind, without sequence number,
/// operation, file, or spans.
///
/// Produced by `DeltaEdge::edge_key`; derives `Eq` and `Hash`, so it can be
/// used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EdgeKey {
/// Source node of the edge.
pub source: NodeId,
/// Target node of the edge.
pub target: NodeId,
/// Kind of the edge.
pub kind: EdgeKind,
}
/// Buffer of pending edge deltas, grouped by the file each delta belongs to.
///
/// Alongside the edges it keeps running totals (entry count and estimated
/// byte size) and a sequence counter that can be advanced through `&self`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeltaBuffer {
// Delta entries keyed by `DeltaEdge::file`, in push order within each file.
edges: HashMap<FileId, Vec<DeltaEdge>>,
// Total number of entries across all files.
edge_count: usize,
// Running sum of `DeltaEdge::byte_size` over all stored entries.
byte_size: usize,
// Next sequence number to hand out; (de)serialized as a plain `u64`.
#[serde(with = "atomic_u64_serde")]
seq_counter: AtomicU64,
}
impl DeltaBuffer {
    /// Creates an empty buffer whose sequence counter starts at 0.
    #[must_use]
    pub fn new() -> Self {
        Self {
            edges: HashMap::new(),
            edge_count: 0,
            byte_size: 0,
            seq_counter: AtomicU64::new(0),
        }
    }

    /// Creates an empty buffer with room reserved for `file_count` files.
    ///
    /// The sequence counter starts at 0.
    #[must_use]
    pub fn with_capacity(file_count: usize) -> Self {
        Self {
            edges: HashMap::with_capacity(file_count),
            edge_count: 0,
            byte_size: 0,
            seq_counter: AtomicU64::new(0),
        }
    }

    /// Returns the total number of delta entries across all files.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.edge_count
    }

    /// Returns `true` when the buffer holds no delta entries.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.edge_count == 0
    }

    /// Returns the running sum of [`DeltaEdge::byte_size`] over all entries.
    #[must_use]
    #[inline]
    pub fn byte_size(&self) -> usize {
        self.byte_size
    }

    /// Returns the number of files that currently have an entry list.
    #[must_use]
    #[inline]
    pub fn file_count(&self) -> usize {
        self.edges.len()
    }

    /// Returns the value the next call to [`DeltaBuffer::next_seq`] would
    /// hand out (absent concurrent updates).
    #[must_use]
    pub fn current_seq(&self) -> u64 {
        self.seq_counter.load(Ordering::Acquire)
    }

    /// Hands out the next sequence number and increments the counter.
    ///
    /// The returned value is the counter's value *before* the increment, so
    /// the first call on a fresh buffer returns 0.
    pub fn next_seq(&self) -> u64 {
        self.seq_counter.fetch_add(1, Ordering::AcqRel)
    }

    /// Appends `edge` to the list of its file and updates the running totals.
    pub fn push(&mut self, edge: DeltaEdge) {
        // Measure before the edge is moved into the map.
        let size = edge.byte_size();
        let file = edge.file;
        self.edges.entry(file).or_default().push(edge);
        self.edge_count += 1;
        self.byte_size += size;
    }

    /// Pushes every edge produced by `edges`, in iteration order.
    pub fn push_many(&mut self, edges: impl IntoIterator<Item = DeltaEdge>) {
        for edge in edges {
            self.push(edge);
        }
    }

    /// Iterates over all entries of all files.
    ///
    /// Files are visited in the map's iteration order, which is unspecified.
    pub fn iter(&self) -> impl Iterator<Item = &DeltaEdge> {
        self.edges.values().flatten()
    }

    /// Iterates over the entries recorded for `file`; empty if there are none.
    pub fn iter_file(&self, file: FileId) -> impl Iterator<Item = &DeltaEdge> {
        self.edges_for_file(file).iter()
    }

    /// Returns the entries recorded for `file` as a slice; empty if there are
    /// none.
    #[must_use]
    pub fn edges_for_file(&self, file: FileId) -> &[DeltaEdge] {
        self.edges.get(&file).map_or(&[], |v| v.as_slice())
    }

    /// Iterates over the ids of all files that have an entry list.
    pub fn files(&self) -> impl Iterator<Item = FileId> + '_ {
        self.edges.keys().copied()
    }

    /// Removes every entry recorded for `file` and returns how many were
    /// removed (0 if the file had none).
    ///
    /// The sequence counter is not affected.
    pub fn clear_file(&mut self, file: FileId) -> usize {
        match self.edges.remove(&file) {
            Some(removed) => {
                let count = removed.len();
                let size: usize = removed.iter().map(DeltaEdge::byte_size).sum();
                // The totals are plain deserialized fields, so a stale or
                // hand-edited snapshot can leave them smaller than what the
                // map actually holds. Clamp at zero instead of underflowing
                // (debug panic / release wrap-around).
                self.edge_count = self.edge_count.saturating_sub(count);
                self.byte_size = self.byte_size.saturating_sub(size);
                count
            }
            None => 0,
        }
    }

    /// Removes all entries and zeroes the totals.
    ///
    /// The sequence counter is left untouched.
    pub fn clear(&mut self) {
        self.edges.clear();
        self.edge_count = 0;
        self.byte_size = 0;
    }

    /// Overwrites the sequence counter with `value`, whether higher or lower
    /// than the current value.
    pub fn reset_seq(&mut self, value: u64) {
        self.seq_counter.store(value, Ordering::Release);
    }

    /// Raises the sequence counter to at least `min_value`.
    ///
    /// The counter is never lowered. Returns the larger of the counter's
    /// previous value and `min_value`, i.e. the counter's value right after
    /// this call took effect.
    pub fn advance_seq_to(&self, min_value: u64) -> u64 {
        // `fetch_max` performs the "raise if lower" update as one atomic
        // read-modify-write and returns the value it replaced.
        let previous = self.seq_counter.fetch_max(min_value, Ordering::AcqRel);
        previous.max(min_value)
    }

    /// Moves all entries out of the buffer, leaving it empty.
    ///
    /// The totals are zeroed; the sequence counter is left untouched.
    pub fn take_all(&mut self) -> HashMap<FileId, Vec<DeltaEdge>> {
        self.edge_count = 0;
        self.byte_size = 0;
        std::mem::take(&mut self.edges)
    }

    /// Returns a snapshot of the buffer's totals and current sequence number.
    #[must_use]
    pub fn stats(&self) -> DeltaBufferStats {
        DeltaBufferStats {
            edge_count: self.len(),
            byte_size: self.byte_size(),
            file_count: self.file_count(),
            current_seq: self.current_seq(),
        }
    }
}
/// The default buffer is the empty buffer produced by `DeltaBuffer::new`.
impl Default for DeltaBuffer {
    fn default() -> Self {
        Self::new()
    }
}
/// One-line summary: entry count, estimated byte size, and number of files.
impl fmt::Display for DeltaBuffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pull out just the fields that appear in the summary.
        let Self {
            edges,
            edge_count,
            byte_size,
            ..
        } = self;
        let files = edges.len();
        write!(
            f,
            "DeltaBuffer(edges={}, bytes={}, files={})",
            edge_count, byte_size, files
        )
    }
}
/// Manual `Clone`: `AtomicU64` is not `Clone`, so the counter is copied by
/// loading its current value into a fresh atomic.
impl Clone for DeltaBuffer {
    fn clone(&self) -> Self {
        let edges = self.edges.clone();
        let seq_snapshot = self.seq_counter.load(Ordering::SeqCst);
        Self {
            edges,
            edge_count: self.edge_count,
            byte_size: self.byte_size,
            seq_counter: AtomicU64::new(seq_snapshot),
        }
    }
}
/// Point-in-time summary of a `DeltaBuffer`, as returned by
/// `DeltaBuffer::stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DeltaBufferStats {
/// Total number of delta entries across all files.
pub edge_count: usize,
/// Estimated byte size of all stored entries.
pub byte_size: usize,
/// Number of files that have an entry list.
pub file_count: usize,
/// Value of the sequence counter when the snapshot was taken.
pub current_seq: u64,
}
// Unit tests for `DeltaOp`, `DeltaEdge`, and `DeltaBuffer`.
#[cfg(test)]
mod tests {
use super::super::kind::EdgeKind;
use super::*;
// Builds a `Calls` edge (no arguments, not async) with generation-0 node ids
// and no spans.
fn make_edge(source: u32, target: u32, seq: u64, op: DeltaOp, file: u32) -> DeltaEdge {
DeltaEdge::new(
NodeId::new(source, 0),
NodeId::new(target, 0),
EdgeKind::Calls {
argument_count: 0,
is_async: false,
},
seq,
op,
FileId::new(file),
)
}
// The default operation is `Add`.
#[test]
fn test_delta_op_default() {
assert_eq!(DeltaOp::default(), DeltaOp::Add);
}
// `Display` renders the lowercase operation names.
#[test]
fn test_delta_op_display() {
assert_eq!(format!("{}", DeltaOp::Add), "add");
assert_eq!(format!("{}", DeltaOp::Remove), "remove");
}
// The constructor stores every argument in the matching field.
#[test]
fn test_delta_edge_new() {
let edge = make_edge(1, 2, 42, DeltaOp::Add, 10);
assert_eq!(edge.source.index(), 1);
assert_eq!(edge.target.index(), 2);
assert_eq!(edge.seq, 42);
assert_eq!(edge.op, DeltaOp::Add);
assert_eq!(edge.file.index(), 10);
}
// `is_add` and `is_remove` are mutually exclusive and follow `op`.
#[test]
fn test_delta_edge_is_add_remove() {
let add_edge = make_edge(1, 2, 0, DeltaOp::Add, 1);
let remove_edge = make_edge(1, 2, 0, DeltaOp::Remove, 1);
assert!(add_edge.is_add());
assert!(!add_edge.is_remove());
assert!(!remove_edge.is_add());
assert!(remove_edge.is_remove());
}
// The edge key ignores `seq` and `op` but distinguishes different targets.
#[test]
fn test_delta_edge_edge_key() {
let edge1 = make_edge(1, 2, 100, DeltaOp::Add, 1);
let edge2 = make_edge(1, 2, 200, DeltaOp::Remove, 1);
let edge3 = make_edge(1, 3, 100, DeltaOp::Add, 1);
assert_eq!(edge1.edge_key(), edge2.edge_key());
assert_ne!(edge1.edge_key(), edge3.edge_key());
}
// Even a span-less edge reports more than 30 bytes.
#[test]
fn test_delta_edge_byte_size() {
let edge = make_edge(1, 2, 0, DeltaOp::Add, 1);
assert!(edge.byte_size() > 30);
}
// A new buffer is empty in every reported dimension.
#[test]
fn test_delta_buffer_new() {
let buffer = DeltaBuffer::new();
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
assert_eq!(buffer.byte_size(), 0);
assert_eq!(buffer.file_count(), 0);
}
// Reserving capacity does not add entries.
#[test]
fn test_delta_buffer_with_capacity() {
let buffer = DeltaBuffer::with_capacity(100);
assert_eq!(buffer.len(), 0);
}
// Pushing one edge updates the count, byte size, and file count.
#[test]
fn test_delta_buffer_push() {
let mut buffer = DeltaBuffer::new();
let seq = buffer.next_seq();
let edge = make_edge(1, 2, seq, DeltaOp::Add, 10);
let size = edge.byte_size();
buffer.push(edge);
assert_eq!(buffer.len(), 1);
assert!(!buffer.is_empty());
assert_eq!(buffer.byte_size(), size);
assert_eq!(buffer.file_count(), 1);
}
// Edges for the same file share one bucket; distinct files get their own.
#[test]
fn test_delta_buffer_push_multiple_files() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, buffer.next_seq(), DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, buffer.next_seq(), DeltaOp::Add, 20));
buffer.push(make_edge(5, 6, buffer.next_seq(), DeltaOp::Add, 10));
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.file_count(), 2);
}
// `next_seq` returns 0, 1, 2, ... and `current_seq` reports the next value.
#[test]
fn test_delta_buffer_sequence_monotonic() {
let buffer = DeltaBuffer::new();
let seq1 = buffer.next_seq();
let seq2 = buffer.next_seq();
let seq3 = buffer.next_seq();
assert_eq!(seq1, 0);
assert_eq!(seq2, 1);
assert_eq!(seq3, 2);
assert_eq!(buffer.current_seq(), 3);
}
// `iter` visits every stored entry.
#[test]
fn test_delta_buffer_iter() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, 0, DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, 1, DeltaOp::Add, 10));
let edges: Vec<_> = buffer.iter().collect();
assert_eq!(edges.len(), 2);
}
// `iter_file` yields only the requested file's entries; unknown files yield
// nothing.
#[test]
fn test_delta_buffer_iter_file() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, 0, DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, 1, DeltaOp::Add, 20));
buffer.push(make_edge(5, 6, 2, DeltaOp::Add, 10));
let file10_edges: Vec<_> = buffer.iter_file(FileId::new(10)).collect();
assert_eq!(file10_edges.len(), 2);
let file20_edges: Vec<_> = buffer.iter_file(FileId::new(20)).collect();
assert_eq!(file20_edges.len(), 1);
let file30_edges: Vec<_> = buffer.iter_file(FileId::new(30)).collect();
assert_eq!(file30_edges.len(), 0);
}
// `edges_for_file` returns the file's slice, or an empty slice when unknown.
#[test]
fn test_delta_buffer_edges_for_file() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, 0, DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, 1, DeltaOp::Add, 10));
let edges = buffer.edges_for_file(FileId::new(10));
assert_eq!(edges.len(), 2);
let empty = buffer.edges_for_file(FileId::new(99));
assert!(empty.is_empty());
}
// `files` lists each file id that has entries.
#[test]
fn test_delta_buffer_files() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, 0, DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, 1, DeltaOp::Add, 20));
let files: Vec<_> = buffer.files().collect();
assert_eq!(files.len(), 2);
assert!(files.contains(&FileId::new(10)));
assert!(files.contains(&FileId::new(20)));
}
// Clearing one file removes only that file's entries and shrinks the totals.
#[test]
fn test_delta_buffer_clear_file() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, 0, DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, 1, DeltaOp::Add, 20));
buffer.push(make_edge(5, 6, 2, DeltaOp::Add, 10));
let initial_size = buffer.byte_size();
let removed = buffer.clear_file(FileId::new(10));
assert_eq!(removed, 2);
assert_eq!(buffer.len(), 1);
assert_eq!(buffer.file_count(), 1);
assert!(buffer.byte_size() < initial_size);
}
// `clear` empties the buffer but keeps the sequence counter.
#[test]
fn test_delta_buffer_clear() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, buffer.next_seq(), DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, buffer.next_seq(), DeltaOp::Add, 20));
let seq_before = buffer.current_seq();
buffer.clear();
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
assert_eq!(buffer.byte_size(), 0);
assert_eq!(buffer.file_count(), 0);
assert_eq!(buffer.current_seq(), seq_before);
}
// `reset_seq` overwrites the counter; numbering continues from the new value.
#[test]
fn test_delta_buffer_reset_seq() {
let mut buffer = DeltaBuffer::new();
buffer.next_seq();
buffer.next_seq();
assert_eq!(buffer.current_seq(), 2);
buffer.reset_seq(100);
assert_eq!(buffer.current_seq(), 100);
assert_eq!(buffer.next_seq(), 100);
assert_eq!(buffer.current_seq(), 101);
}
// `take_all` hands over the per-file map, empties the buffer, and keeps the
// sequence counter.
#[test]
fn test_delta_buffer_take_all() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, buffer.next_seq(), DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, buffer.next_seq(), DeltaOp::Add, 20));
let seq_before = buffer.current_seq();
let taken = buffer.take_all();
assert_eq!(taken.len(), 2);
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
assert_eq!(buffer.current_seq(), seq_before);
}
// `push_many` behaves like repeated `push`.
#[test]
fn test_delta_buffer_push_many() {
let mut buffer = DeltaBuffer::new();
let edges = vec![
make_edge(1, 2, 0, DeltaOp::Add, 10),
make_edge(3, 4, 1, DeltaOp::Add, 10),
make_edge(5, 6, 2, DeltaOp::Add, 20),
];
buffer.push_many(edges);
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.file_count(), 2);
}
// `stats` mirrors the buffer's totals and current sequence number.
#[test]
fn test_delta_buffer_stats() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, buffer.next_seq(), DeltaOp::Add, 10));
buffer.push(make_edge(3, 4, buffer.next_seq(), DeltaOp::Add, 20));
let stats = buffer.stats();
assert_eq!(stats.edge_count, 2);
assert!(stats.byte_size > 0);
assert_eq!(stats.file_count, 2);
assert_eq!(stats.current_seq, 2);
}
// `Display` output names the type and includes the edge count.
#[test]
fn test_delta_buffer_display() {
let mut buffer = DeltaBuffer::new();
buffer.push(make_edge(1, 2, 0, DeltaOp::Add, 10));
let display = format!("{buffer}");
assert!(display.contains("DeltaBuffer"));
assert!(display.contains("edges=1"));
}
// `Default` produces an empty buffer.
#[test]
fn test_delta_buffer_default() {
let buffer: DeltaBuffer = DeltaBuffer::default();
assert_eq!(buffer.len(), 0);
}
// `advance_seq_to` raises the counter but never lowers it, and returns the
// resulting value.
#[test]
fn test_delta_buffer_advance_seq_to() {
let buffer = DeltaBuffer::new();
assert_eq!(buffer.current_seq(), 0);
let result = buffer.advance_seq_to(10);
assert_eq!(result, 10);
assert_eq!(buffer.current_seq(), 10);
let result = buffer.advance_seq_to(5);
assert_eq!(result, 10);
assert_eq!(buffer.current_seq(), 10);
let result = buffer.advance_seq_to(15);
assert_eq!(result, 15);
assert_eq!(buffer.current_seq(), 15);
assert_eq!(buffer.next_seq(), 15);
assert_eq!(buffer.current_seq(), 16);
}
}
/// Serde adapter for `AtomicU64` fields, selected with
/// `#[serde(with = "atomic_u64_serde")]`.
///
/// The atomic is written and read as a plain `u64`.
mod atomic_u64_serde {
    use serde::{Deserialize, Deserializer, Serializer};
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Serializes the value currently held by `value` (loaded with `SeqCst`)
    /// as a `u64`.
    pub fn serialize<S>(value: &AtomicU64, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let snapshot = value.load(Ordering::SeqCst);
        serializer.serialize_u64(snapshot)
    }

    /// Deserializes a `u64` and wraps it in a fresh `AtomicU64`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<AtomicU64, D::Error>
    where
        D: Deserializer<'de>,
    {
        u64::deserialize(deserializer).map(AtomicU64::new)
    }
}