use crate::key::{Dep, DynKey, QueryKindId};
use crate::revision::Revision;
use crate::runtime::{Runtime, RuntimeEvent};
use std::collections::{HashMap, HashSet};
use std::io::{self, Write};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
#[derive(Debug, Clone)]
pub struct DependencyGraph {
pub forward_deps: HashMap<DynKey, Vec<Dep>>,
pub reverse_deps: HashMap<DynKey, Vec<DynKey>>,
}
impl DependencyGraph {
pub fn from_runtime(runtime: &Runtime) -> Self {
let forward_deps = runtime
.deps_by_query_snapshot()
.into_iter()
.map(|(key, deps)| (key, deps.to_vec()))
.collect();
let reverse_deps = runtime
.reverse_deps_snapshot()
.into_iter()
.map(|(key, dependents)| (key, dependents.into_iter().collect()))
.collect();
Self {
forward_deps,
reverse_deps,
}
}
pub fn write_dot<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
let mut file = std::fs::File::create(path)?;
self.write_dot_to(&mut file)
}
pub fn write_dot_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writeln!(writer, "digraph dependencies {{")?;
writeln!(writer, " rankdir=LR;")?;
writeln!(writer, " node [shape=box];")?;
writeln!(writer)?;
let mut all_nodes = HashSet::new();
for (query, deps) in &self.forward_deps {
all_nodes.insert(query.clone());
for dep in deps {
all_nodes.insert(DynKey {
kind: dep.kind,
key: dep.key.clone(),
});
}
}
for node in &all_nodes {
let node_id = format!("{}_{:x}", node.kind.0, node.key.hash());
let label = format!("kind_{}\\nkey_{:x}", node.kind.0, node.key.hash());
writeln!(writer, " {} [label=\"{}\"];", node_id, label)?;
}
writeln!(writer)?;
for (query, deps) in &self.forward_deps {
let query_id = format!("{}_{:x}", query.kind.0, query.key.hash());
for dep in deps {
let dep_id = format!("{}_{:x}", dep.kind.0, dep.key.hash());
writeln!(writer, " {} -> {};", query_id, dep_id)?;
}
}
writeln!(writer, "}}")?;
Ok(())
}
pub fn root_queries(&self) -> Vec<DynKey> {
self.forward_deps
.iter()
.filter(|(_, deps)| deps.is_empty())
.map(|(key, _)| key.clone())
.collect()
}
pub fn leaf_queries(&self) -> Vec<DynKey> {
let mut all_queries: HashSet<DynKey> = self.forward_deps.keys().cloned().collect();
for dependents in self.reverse_deps.values() {
for dependent in dependents {
all_queries.remove(dependent);
}
}
all_queries.into_iter().collect()
}
pub fn find_paths(&self, start: &DynKey, end: &DynKey) -> Vec<Vec<Dep>> {
let mut paths = Vec::new();
let mut current_path = Vec::new();
let mut visited = HashSet::new();
self.find_paths_recursive(start, end, &mut current_path, &mut visited, &mut paths, 0);
paths
}
fn find_paths_recursive(
&self,
current: &DynKey,
end: &DynKey,
path: &mut Vec<Dep>,
visited: &mut HashSet<DynKey>,
results: &mut Vec<Vec<Dep>>,
depth: usize,
) {
const MAX_DEPTH: usize = 1000;
if depth >= MAX_DEPTH {
return;
}
if current == end {
results.push(path.clone());
return;
}
if visited.contains(current) {
return;
}
visited.insert(current.clone());
if let Some(deps) = self.forward_deps.get(current) {
for dep in deps {
let dep_key = DynKey {
kind: dep.kind,
key: dep.key.clone(),
};
path.push(dep.clone());
self.find_paths_recursive(&dep_key, end, path, visited, results, depth + 1);
path.pop();
}
}
visited.remove(current);
}
}
#[derive(Debug, Clone)]
pub struct CacheStats {
pub forward_deps_count: usize,
pub reverse_deps_count: usize,
pub total_dependency_edges: usize,
pub root_query_count: usize,
pub dep_count_histogram: HashMap<usize, usize>,
}
impl CacheStats {
pub fn collect(runtime: &Runtime) -> Self {
let forward = runtime.deps_by_query_snapshot();
let reverse = runtime.reverse_deps_snapshot();
let forward_deps_count = forward.len();
let reverse_deps_count = reverse.len();
let total_dependency_edges: usize = forward.values().map(|deps| deps.len()).sum();
let root_query_count = forward.values().filter(|deps| deps.is_empty()).count();
let mut dep_count_histogram = HashMap::new();
for deps in forward.values() {
*dep_count_histogram.entry(deps.len()).or_insert(0) += 1;
}
Self {
forward_deps_count,
reverse_deps_count,
total_dependency_edges,
root_query_count,
dep_count_histogram,
}
}
pub fn format(&self) -> String {
let mut s = String::new();
s.push_str("Cache Statistics:\n");
s.push_str(&format!(" Forward deps: {}\n", self.forward_deps_count));
s.push_str(&format!(" Reverse deps: {}\n", self.reverse_deps_count));
s.push_str(&format!(" Total edges: {}\n", self.total_dependency_edges));
s.push_str(&format!(" Root queries: {}\n", self.root_query_count));
if !self.dep_count_histogram.is_empty() {
s.push_str("\n Dependency count distribution:\n");
let mut counts: Vec<_> = self.dep_count_histogram.iter().collect();
counts.sort_by_key(|(count, _)| *count);
for (count, queries) in counts {
s.push_str(&format!(" {} deps: {} queries\n", count, queries));
}
}
s
}
}
#[derive(Debug, Clone)]
pub enum TraceEvent {
RevisionBumped {
revision: Revision,
timestamp: Instant,
},
InputSet {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
timestamp: Instant,
},
InputRemoved {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
timestamp: Instant,
},
QueryInvalidated {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
by_kind: QueryKindId,
by_key_hash: u64,
timestamp: Instant,
},
QueryChanged {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
timestamp: Instant,
},
}
pub struct TraceCollector {
events: Arc<Mutex<Vec<TraceEvent>>>,
_handle: tokio::task::JoinHandle<()>,
}
impl TraceCollector {
pub fn start(runtime: &Runtime) -> Self {
let mut rx = runtime.subscribe_events();
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();
let handle = tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
let timestamp = Instant::now();
let trace_event = match event {
RuntimeEvent::RevisionBumped { revision } => TraceEvent::RevisionBumped {
revision,
timestamp,
},
RuntimeEvent::RevisionSet { .. } => {
continue;
}
RuntimeEvent::InputSet {
revision,
kind,
key_hash,
..
} => TraceEvent::InputSet {
revision,
kind,
key_hash,
timestamp,
},
RuntimeEvent::InputRemoved {
revision,
kind,
key_hash,
..
} => TraceEvent::InputRemoved {
revision,
kind,
key_hash,
timestamp,
},
RuntimeEvent::QueryInvalidated {
revision,
kind,
key_hash,
by_kind,
by_key_hash,
..
} => TraceEvent::QueryInvalidated {
revision,
kind,
key_hash,
by_kind,
by_key_hash,
timestamp,
},
RuntimeEvent::QueryChanged {
revision,
kind,
key_hash,
..
} => TraceEvent::QueryChanged {
revision,
kind,
key_hash,
timestamp,
},
};
events_clone.lock().await.push(trace_event);
}
});
Self {
events,
_handle: handle,
}
}
pub async fn stop(self) -> Vec<TraceEvent> {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let events = self.events.lock().await;
events.clone()
}
pub async fn snapshot(&self) -> Vec<TraceEvent> {
self.events.lock().await.clone()
}
}
#[derive(Debug, Clone)]
pub struct TraceAnalysis {
pub total_events: usize,
pub input_changes: usize,
pub invalidations: usize,
pub recomputations: usize,
pub duration: Duration,
pub events_by_revision: HashMap<Revision, usize>,
}
impl TraceAnalysis {
pub fn from_trace(trace: &[TraceEvent]) -> Self {
if trace.is_empty() {
return Self {
total_events: 0,
input_changes: 0,
invalidations: 0,
recomputations: 0,
duration: Duration::ZERO,
events_by_revision: HashMap::new(),
};
}
let mut input_changes = 0;
let mut invalidations = 0;
let mut recomputations = 0;
let mut events_by_revision: HashMap<Revision, usize> = HashMap::new();
let first_timestamp = match trace.first() {
Some(TraceEvent::RevisionBumped { timestamp, .. })
| Some(TraceEvent::InputSet { timestamp, .. })
| Some(TraceEvent::InputRemoved { timestamp, .. })
| Some(TraceEvent::QueryInvalidated { timestamp, .. })
| Some(TraceEvent::QueryChanged { timestamp, .. }) => *timestamp,
None => Instant::now(),
};
let last_timestamp = match trace.last() {
Some(TraceEvent::RevisionBumped { timestamp, .. })
| Some(TraceEvent::InputSet { timestamp, .. })
| Some(TraceEvent::InputRemoved { timestamp, .. })
| Some(TraceEvent::QueryInvalidated { timestamp, .. })
| Some(TraceEvent::QueryChanged { timestamp, .. }) => *timestamp,
None => first_timestamp,
};
for event in trace {
let revision = match event {
TraceEvent::RevisionBumped { revision, .. }
| TraceEvent::InputSet { revision, .. }
| TraceEvent::InputRemoved { revision, .. }
| TraceEvent::QueryInvalidated { revision, .. }
| TraceEvent::QueryChanged { revision, .. } => *revision,
};
*events_by_revision.entry(revision).or_insert(0) += 1;
match event {
TraceEvent::InputSet { .. } | TraceEvent::InputRemoved { .. } => {
input_changes += 1;
}
TraceEvent::QueryInvalidated { .. } => {
invalidations += 1;
}
TraceEvent::QueryChanged { .. } => {
recomputations += 1;
}
_ => {}
}
}
Self {
total_events: trace.len(),
input_changes,
invalidations,
recomputations,
duration: last_timestamp.duration_since(first_timestamp),
events_by_revision,
}
}
pub fn format(&self) -> String {
let mut s = String::new();
s.push_str("Trace Analysis:\n");
s.push_str(&format!(" Total events: {}\n", self.total_events));
s.push_str(&format!(" Input changes: {}\n", self.input_changes));
s.push_str(&format!(" Invalidations: {}\n", self.invalidations));
s.push_str(&format!(" Recomputations: {}\n", self.recomputations));
s.push_str(&format!(" Duration: {:?}\n", self.duration));
if !self.events_by_revision.is_empty() {
s.push_str("\n Events by revision:\n");
let mut revisions: Vec<_> = self.events_by_revision.iter().collect();
revisions.sort_by_key(|(rev, _)| rev.0);
for (revision, count) in revisions {
s.push_str(&format!(" r{}: {} events\n", revision.0, count));
}
}
s
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dependency_graph_empty() {
let runtime = Runtime::new();
let graph = DependencyGraph::from_runtime(&runtime);
assert_eq!(graph.forward_deps.len(), 0);
assert_eq!(graph.reverse_deps.len(), 0);
}
#[test]
fn test_cache_stats_empty() {
let runtime = Runtime::new();
let stats = CacheStats::collect(&runtime);
assert_eq!(stats.forward_deps_count, 0);
assert_eq!(stats.reverse_deps_count, 0);
assert_eq!(stats.total_dependency_edges, 0);
assert_eq!(stats.root_query_count, 0);
}
#[test]
fn test_stats_format() {
let runtime = Runtime::new();
let stats = CacheStats::collect(&runtime);
let formatted = stats.format();
assert!(formatted.contains("Cache Statistics"));
assert!(formatted.contains("Forward deps: 0"));
}
#[test]
fn test_trace_analysis_empty() {
let trace: Vec<TraceEvent> = vec![];
let analysis = TraceAnalysis::from_trace(&trace);
assert_eq!(analysis.total_events, 0);
assert_eq!(analysis.input_changes, 0);
assert_eq!(analysis.invalidations, 0);
assert_eq!(analysis.recomputations, 0);
}
}