use anyhow::Result;
use lazily::{Context as LazyContext, SlotHandle};
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
use crate::types::*;
/// Reports whether `node` satisfies every property filter.
///
/// A filter matches when the node has a property named `filter.key` whose value equals
/// `filter.value` exactly. An empty filter list matches every node.
pub(crate) fn graph_node_matches_filters(
    node: &GraphNode,
    filters: &[GraphPropertyFilter],
) -> bool {
    // "All filters match" == "no filter fails to match".
    !filters
        .iter()
        .any(|filter| node.properties.get(&filter.key) != Some(&filter.value))
}
/// Priority-queue entry used by the ranked neighborhood expansion.
///
/// Entries are ordered for a max-heap such as [`BinaryHeap`]:
/// - a higher `score` is greater;
/// - on equal scores, the shallower `depth` is greater;
/// - remaining ties are broken by ascending `id`, so the lexicographically larger id is
///   greater and pops first.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct ScoredQueueEntry {
    /// Id of the node to expand.
    pub id: String,
    /// Hop distance from the center node (the center itself is depth 0).
    pub depth: usize,
    /// Priority; larger values are expanded first.
    pub score: i64,
}

impl PartialOrd for ScoredQueueEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Total order: delegate to `Ord` so the two impls can never disagree.
        Some(self.cmp(other))
    }
}

impl Ord for ScoredQueueEntry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Lexicographic key: score ascending, depth descending (via `Reverse`), id ascending.
        let lhs = (self.score, std::cmp::Reverse(self.depth), &self.id);
        let rhs = (other.score, std::cmp::Reverse(other.depth), &other.id);
        lhs.cmp(&rhs)
    }
}
/// Scores a newly discovered neighbor for the ranked-neighborhood priority queue.
///
/// Every strategy starts from a depth component `120 - 18 * depth`, clamped at zero, so
/// nodes closer to the center rank higher. The strategies then add:
/// - `BreadthFirst`: nothing (depth component only).
/// - `EdgeKindWeighted`: [`edge_kind_weighted_score`] of `edge_kind`, the kind of the edge
///   that reached the node.
/// - `DegreeWeighted`: a bonus for low-degree nodes, using the degree recorded for `node.id`
///   in `degree_map`: 20 for degree <= 3, 10 for degree <= 10, otherwise 0. An id missing
///   from the map is treated as degree 1.
///
/// `edge_kind` is only read by `EdgeKindWeighted`, and `degree_map` only by `DegreeWeighted`.
pub(crate) fn compute_neighborhood_score(
    strategy: &NeighborhoodScoring,
    depth: usize,
    edge_kind: &str,
    node: &GraphNode,
    degree_map: &BTreeMap<String, usize>,
) -> i64 {
    // Shared depth component. `try_from` saturates instead of wrapping negative like a bare
    // `as i64` would for absurdly large depths.
    let depth_score = {
        let hops = i64::try_from(depth).unwrap_or(i64::MAX);
        120i64.saturating_sub(hops.saturating_mul(18)).max(0)
    };
    match strategy {
        NeighborhoodScoring::BreadthFirst => depth_score,
        NeighborhoodScoring::EdgeKindWeighted => {
            depth_score.saturating_add(edge_kind_weighted_score(edge_kind))
        }
        NeighborhoodScoring::DegreeWeighted => {
            // Use the candidate's own degree. A map-wide maximum would hand every candidate
            // the same bonus and therefore would not rank candidates by degree at all.
            let degree = degree_map.get(&node.id).copied().unwrap_or(1);
            let degree_bonus = match degree {
                0..=3 => 20,
                4..=10 => 10,
                _ => 0,
            };
            depth_score.saturating_add(degree_bonus)
        }
    }
}
/// Accumulated state of the layer-by-layer `neighborhood` expansion.
#[derive(Clone)]
pub(crate) struct NeighborhoodLayerState {
    /// Nodes collected so far, keyed by node id (the center is always present).
    pub nodes: BTreeMap<String, GraphNode>,
    /// Edges collected so far, keyed and deduplicated by `(from_id, kind, to_id)`.
    pub edges: BTreeMap<(String, String, String), GraphEdge>,
    /// Ids of the nodes first discovered in the most recent layer; expanded next.
    pub frontier: Vec<String>,
}
/// Store reads for one `neighborhood` layer, fetched before they are merged into the state.
#[derive(Clone)]
pub(crate) struct NeighborhoodFetchedLayer {
    /// Every outgoing edge of the frontier nodes, in fetch order.
    pub edges: Vec<GraphEdge>,
    /// Previously unknown edge targets that exist in the store, ordered by id.
    pub nodes: Vec<GraphNode>,
}
/// Accumulated state of the best-first `ranked_neighborhood` expansion.
#[derive(Clone)]
pub(crate) struct RankedNeighborhoodLayerState {
    /// Admitted nodes keyed by id (the center is always present).
    pub nodes: BTreeMap<String, GraphNode>,
    /// Edges seen while expanding, keyed and deduplicated by `(from_id, kind, to_id)`.
    pub edges: BTreeMap<(String, String, String), GraphEdge>,
    /// Entries still to be popped; highest priority per [`ScoredQueueEntry`]'s ordering first.
    pub queue: BinaryHeap<ScoredQueueEntry>,
    /// Every node id discovered so far, admitted or not; discovered ids are never revisited.
    pub seen: BTreeSet<String>,
    /// Discovered neighbors that were dropped because the `max_nodes` budget was exhausted.
    pub pruned_count: usize,
    /// Number of distinct node ids discovered, counting the center. Edge targets that the
    /// store could not resolve are counted too.
    pub total_discovered: usize,
    /// Per-node endpoint counts over the edges fetched during expansion. Each edge
    /// increments both its source and its target.
    pub degree_map: BTreeMap<String, usize>,
}
/// Store reads for expanding one `ranked_neighborhood` queue entry.
#[derive(Clone)]
pub(crate) struct RankedNeighborhoodFetchedExpansion {
    /// All outgoing edges of the expanded node.
    pub edges: Vec<GraphEdge>,
    /// Targets that were not yet seen and exist in the store, keyed by node id.
    pub neighbor_nodes: BTreeMap<String, GraphNode>,
}
pub(crate) fn graph_cache_error(message: String) -> anyhow::Error {
anyhow::anyhow!("{message}")
}
pub(crate) fn fetch_neighborhood_layer<S: GraphStore + ?Sized>(
store: &S,
frontier: &[String],
known_nodes: &BTreeMap<String, GraphNode>,
kind: Option<&str>,
) -> Result<NeighborhoodFetchedLayer> {
let mut edges = Vec::new();
let mut missing_ids = Vec::new();
for current in frontier {
let outgoing = store.outgoing_edges(current, kind)?;
for edge in outgoing {
if !known_nodes.contains_key(&edge.to_id) {
missing_ids.push(edge.to_id.clone());
}
edges.push(edge);
}
}
missing_ids.sort();
missing_ids.dedup();
let mut nodes = Vec::new();
for id in missing_ids {
if !known_nodes.contains_key(&id)
&& let Some(node) = store.node(&id)?
{
nodes.push(node);
}
}
Ok(NeighborhoodFetchedLayer { edges, nodes })
}
pub(crate) fn fetch_ranked_neighborhood_expansion<S: GraphStore + ?Sized>(
store: &S,
entry: &ScoredQueueEntry,
state: &RankedNeighborhoodLayerState,
kind: Option<&str>,
) -> Result<RankedNeighborhoodFetchedExpansion> {
let edges = store.outgoing_edges(&entry.id, kind)?;
let mut neighbor_nodes = BTreeMap::new();
for edge in &edges {
if state.seen.contains(&edge.to_id) {
continue;
}
if let Some(node) = store.node(&edge.to_id)? {
neighbor_nodes.insert(edge.to_id.clone(), node);
}
}
Ok(RankedNeighborhoodFetchedExpansion {
edges,
neighbor_nodes,
})
}
/// Bonus an edge kind contributes under `NeighborhoodScoring::EdgeKindWeighted`.
///
/// Known kinds have exact weights:
/// - `semantic_relation` = 34;
/// - mention/tag/concept links = 28;
/// - `mentions` = 22;
/// - `calls` = 20;
/// - context/source scoping = 18;
/// - `defines`/`contains`/`belongs_to` = 12.
///
/// Other kinds fall back to substring checks, tried in this order: containing `community`
/// gives 20, then containing `semantic`, `concept` or `entity` gives 24. Anything else
/// scores 8.
pub(crate) fn edge_kind_weighted_score(edge_kind: &str) -> i64 {
    let exact = match edge_kind {
        "semantic_relation" => Some(34),
        "mentions_entity" | "mentions_concept" | "tagged_entity" | "tagged_concept"
        | "related_concept" => Some(28),
        "mentions" => Some(22),
        "calls" => Some(20),
        "requests_context" | "scopes_context" | "scopes_source" | "explains_result" => Some(18),
        "defines" | "contains" | "belongs_to" => Some(12),
        _ => None,
    };
    exact.unwrap_or_else(|| {
        // The `community` check deliberately wins over the semantic/concept/entity one.
        if edge_kind.contains("community") {
            20
        } else if ["semantic", "concept", "entity"]
            .iter()
            .any(|needle| edge_kind.contains(*needle))
        {
            24
        } else {
            8
        }
    })
}
/// Reports whether `edge` satisfies every property filter.
///
/// A filter matches when the edge has a property named `filter.key` whose value equals
/// `filter.value` exactly. An empty filter list matches every edge.
pub(crate) fn graph_edge_matches_filters(
    edge: &GraphEdge,
    filters: &[GraphPropertyFilter],
) -> bool {
    for filter in filters {
        if edge.properties.get(&filter.key) != Some(&filter.value) {
            return false;
        }
    }
    true
}
/// Paginates a node/edge subgraph by node id.
///
/// Processing steps, in order:
/// 1. Nodes are sorted by id and edges by `(from_id, kind, to_id)`.
/// 2. Nodes are filtered by `options.property_filters`.
/// 3. Nodes are restricted to ids strictly greater than `options.cursor`.
/// 4. Nodes are cut to `options.limit`.
/// 5. Only edges with both endpoints on the returned page are kept.
///
/// When the limit cuts the list, `next_cursor` is the id of the last returned node.
/// Explanatory notes are appended to `diagnostics` and returned inside the page.
///
/// NOTE(review): with `limit == Some(0)` and a non-empty input, `next_cursor` is the id of
/// the first remaining node even though no node is returned. Resuming from it would skip
/// that node. Confirm whether a zero limit is meant to be accepted here.
pub fn apply_graph_query_page(
    mut nodes: Vec<GraphNode>,
    mut edges: Vec<GraphEdge>,
    options: GraphQueryOptions,
    mut diagnostics: Vec<String>,
) -> GraphPagedSubgraph {
    // Stable sorts, so the page boundaries are deterministic for a given input.
    nodes.sort_by(|a, b| a.id.cmp(&b.id));
    edges.sort_by(|a, b| {
        (&a.from_id, &a.kind, &a.to_id).cmp(&(&b.from_id, &b.kind, &b.to_id))
    });

    let unfiltered_count = nodes.len();
    if !options.property_filters.is_empty() {
        nodes.retain(|node| graph_node_matches_filters(node, &options.property_filters));
    }
    let filtered_out = unfiltered_count.saturating_sub(nodes.len());

    // The cursor is exclusive: resume strictly after the last id of the previous page.
    if let Some(cursor) = options.cursor.as_ref() {
        nodes.retain(|node| &node.id > cursor);
    }

    let available = nodes.len();
    let next_cursor = match options.limit {
        Some(limit) if available > limit => {
            let last_kept = nodes.get(limit.saturating_sub(1)).map(|node| node.id.clone());
            nodes.truncate(limit);
            last_kept
        }
        _ => None,
    };
    let truncated = options.limit.is_some_and(|limit| available > limit);

    let page_ids: BTreeSet<&str> = nodes.iter().map(|node| node.id.as_str()).collect();
    edges.retain(|edge| {
        page_ids.contains(edge.from_id.as_str()) && page_ids.contains(edge.to_id.as_str())
    });

    if filtered_out > 0 {
        diagnostics.push(format!("property filters removed {} node(s)", filtered_out));
    }
    if options.cursor.is_some() {
        diagnostics.push("cursor is exclusive and ordered by node id".to_string());
    }
    if next_cursor.is_some() {
        diagnostics.push(
            "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
        );
    }

    let returned_nodes = nodes.len();
    let returned_edges = edges.len();
    GraphPagedSubgraph {
        page: GraphQueryPage {
            cursor: options.cursor,
            limit: options.limit,
            next_cursor,
            returned_nodes,
            returned_edges,
            truncated,
            diagnostics,
        },
        nodes,
        edges,
    }
}
/// Paginates a flat list of edges (the result carries no nodes) by edge id.
///
/// Processing steps, in order:
/// 1. Edges are sorted by [`graph_edge_id`].
/// 2. Edges are filtered by `options.property_filters`; every filter must match exactly.
/// 3. Edges are restricted to ids strictly greater than `options.cursor`.
/// 4. Edges are cut to `options.limit`.
///
/// When the limit cuts the list, `next_cursor` is the id of the last returned edge.
/// Explanatory notes are appended to `diagnostics` and returned inside the page.
///
/// NOTE(review): with `limit == Some(0)` and a non-empty input, `next_cursor` is the id of
/// the first remaining edge even though no edge is returned. Resuming from it would skip
/// that edge. Confirm whether a zero limit is meant to be accepted here.
pub fn apply_graph_edge_query_page(
    mut edges: Vec<GraphEdge>,
    options: GraphQueryOptions,
    mut diagnostics: Vec<String>,
) -> GraphPagedSubgraph {
    // `graph_edge_id` returns an owned `String`. Compute it once per edge instead of once per
    // comparison. The cached-key sort is stable, so the order matches `sort_by_key` exactly.
    edges.sort_by_cached_key(graph_edge_id);
    let before_filter = edges.len();
    if !options.property_filters.is_empty() {
        edges.retain(|edge| graph_edge_matches_filters(edge, &options.property_filters));
    }
    let after_filter = edges.len();
    // The cursor is exclusive: resume strictly after the last id of the previous page.
    if let Some(cursor) = &options.cursor {
        edges.retain(|edge| graph_edge_id(edge) > *cursor);
    }
    let before_limit = edges.len();
    let mut next_cursor = None;
    if let Some(limit) = options.limit
        && edges.len() > limit
    {
        next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
        edges.truncate(limit);
    }
    if after_filter != before_filter {
        diagnostics.push(format!(
            "edge property filters removed {} edge(s)",
            before_filter.saturating_sub(after_filter)
        ));
    }
    if options.cursor.is_some() {
        diagnostics.push("cursor is exclusive and ordered by edge id".to_string());
    }
    if next_cursor.is_some() {
        diagnostics.push(
            "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
        );
    }
    GraphPagedSubgraph {
        page: GraphQueryPage {
            cursor: options.cursor,
            limit: options.limit,
            next_cursor,
            returned_nodes: 0,
            returned_edges: edges.len(),
            truncated: options.limit.is_some_and(|limit| before_limit > limit),
            diagnostics,
        },
        nodes: Vec::new(),
        edges,
    }
}
/// Storage abstraction for the property graph.
///
/// Backends implement the primitive operations: `upsert_*`, `delete_*`, `node`, `all_nodes`,
/// `all_edges`, `nodes_by_kind`, `outgoing_edges` and `shortest_path`. Every other method has
/// a default built from those primitives. Several defaults scan `all_edges`, so indexed
/// backends may want to override them.
pub trait GraphStore {
    /// Writes `node` to the store (exact upsert semantics are backend-defined).
    fn upsert_node(&self, node: &GraphNode) -> Result<()>;
    /// Writes `edge` to the store (exact upsert semantics are backend-defined).
    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()>;
    /// Deletes the node `id`. The returned count is backend-reported (presumably the number
    /// of removed records; confirm per implementation).
    fn delete_node(&self, id: &str) -> Result<usize>;
    /// Deletes the edge `(from_id, to_id, kind)`. The returned count is backend-reported.
    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize>;
    /// Looks up a node by id; `None` means the node does not exist.
    fn node(&self, id: &str) -> Result<Option<GraphNode>>;
    /// Returns every node in the store.
    fn all_nodes(&self) -> Result<Vec<GraphNode>>;
    /// Returns every edge in the store.
    fn all_edges(&self) -> Result<Vec<GraphEdge>>;

    /// Finds the edge whose [`graph_edge_id`] equals `edge_id` by scanning `all_edges`.
    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
        Ok(self
            .all_edges()?
            .into_iter()
            .find(|edge| graph_edge_id(edge) == edge_id))
    }

    /// Returns `(node_count, edge_count)`; the default loads every node and edge to count them.
    fn graph_counts(&self) -> Result<(usize, usize)> {
        Ok((self.all_nodes()?.len(), self.all_edges()?.len()))
    }

    /// Returns the first non-self-loop edge (optionally only of `kind`) in
    /// `(from_id, kind, to_id)` order, or `None` when no such edge exists.
    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
        let mut edges = self
            .all_edges()?
            .into_iter()
            .filter(|edge| edge.from_id != edge.to_id)
            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
            .collect::<Vec<_>>();
        edges.sort_by(|left, right| {
            left.from_id
                .cmp(&right.from_id)
                .then(left.kind.cmp(&right.kind))
                .then(left.to_id.cmp(&right.to_id))
        });
        Ok(edges.into_iter().next())
    }

    /// Returns a non-self-loop edge paired with a filter built from one of its properties.
    ///
    /// Each candidate edge contributes the first property in its map's iteration order.
    /// Edges without properties are skipped. The pair with the smallest
    /// `(key, value, edge id)` is returned.
    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
        let mut probes = Vec::new();
        for edge in self.all_edges()? {
            if edge.from_id == edge.to_id {
                continue;
            }
            if let Some((key, value)) = edge
                .properties
                .iter()
                .next()
                .map(|(key, value)| (key.clone(), value.clone()))
            {
                probes.push((edge, GraphPropertyFilter { key, value }));
            }
        }
        probes.sort_by(|(left_edge, left_filter), (right_edge, right_filter)| {
            left_filter
                .key
                .cmp(&right_filter.key)
                .then(left_filter.value.cmp(&right_filter.value))
                .then_with(|| graph_edge_id(left_edge).cmp(&graph_edge_id(right_edge)))
        });
        Ok(probes.into_iter().next())
    }

    /// Returns every node whose kind is `kind`.
    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>>;
    /// Returns the edges leaving `from_id`, optionally filtered by edge `kind`.
    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>>;

    /// Returns the edges that start or end at `node_id` (optionally only of `kind`), sorted
    /// by [`graph_edge_id`].
    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
        let mut edges = self
            .all_edges()?
            .into_iter()
            .filter(|edge| edge.from_id == node_id || edge.to_id == node_id)
            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
            .collect::<Vec<_>>();
        // `graph_edge_id` builds an owned key; compute it once per edge rather than once per
        // comparison. The cached-key sort is stable, so the resulting order is unchanged.
        edges.sort_by_cached_key(graph_edge_id);
        Ok(edges)
    }

    /// Returns one page of all edges (optionally only of `kind`); see
    /// [`apply_graph_edge_query_page`] for the paging rules.
    fn paged_edges(
        &self,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        let edges = self
            .all_edges()?
            .into_iter()
            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
            .collect::<Vec<_>>();
        Ok(apply_graph_edge_query_page(edges, options, Vec::new()))
    }

    /// Returns one page of the edges incident to `node_id`; see
    /// [`apply_graph_edge_query_page`] for the paging rules.
    fn paged_incident_edges(
        &self,
        node_id: &str,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        Ok(apply_graph_edge_query_page(
            self.incident_edges(node_id, kind)?,
            options,
            Vec::new(),
        ))
    }

    /// Returns the edges (of any kind) whose endpoints are both in `node_ids`. The result is
    /// deduplicated and ordered by `(from_id, kind, to_id)`.
    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
        let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
        for from_id in node_ids {
            for edge in self.outgoing_edges(from_id, None)? {
                if node_ids.contains(&edge.to_id) {
                    edges
                        .entry((edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
                        .or_insert(edge);
                }
            }
        }
        Ok(edges.into_values().collect())
    }

    /// Backend-provided path search from `from_id` to `to_id`, optionally following only
    /// `kind` edges.
    fn shortest_path(
        &self,
        from_id: &str,
        to_id: &str,
        kind: Option<&str>,
    ) -> Result<Option<GraphPath>>;

    /// Breadth-first shortest path that only finds paths of at most `max_hops` hops (when
    /// set), using [`shortest_path_using_outgoing`] over `outgoing_edges`.
    fn shortest_path_with_max_hops(
        &self,
        from_id: &str,
        to_id: &str,
        kind: Option<&str>,
        max_hops: Option<usize>,
    ) -> Result<Option<GraphPath>> {
        shortest_path_using_outgoing(from_id, to_id, kind, max_hops, |current, kind| {
            self.outgoing_edges(current, kind)
        })
    }

    /// Collects the subgraph reachable from `center_id` within `depth` hops along outgoing
    /// edges (optionally only `kind` edges).
    ///
    /// Returns `Ok(None)` when the center node does not exist. Edges whose target the store
    /// cannot resolve are still included. Store reads happen eagerly in the loop below. Each
    /// layer's merged state lives in a `lazily` slot derived from the previous layer's slot.
    fn neighborhood(
        &self,
        center_id: &str,
        depth: usize,
        kind: Option<&str>,
    ) -> Result<Option<GraphSubgraph>> {
        let Some(center) = self.node(center_id)? else {
            return Ok(None);
        };
        let ctx = LazyContext::new();
        let initial = NeighborhoodLayerState {
            nodes: BTreeMap::from([(center_id.to_string(), center)]),
            edges: BTreeMap::new(),
            frontier: vec![center_id.to_string()],
        };
        let mut layer: SlotHandle<std::result::Result<NeighborhoodLayerState, String>> =
            ctx.slot(move |_| Ok(initial.clone()));
        let mut state = ctx.get(&layer).map_err(graph_cache_error)?;
        for _ in 0..depth {
            // Nothing new was discovered in the previous layer; deeper layers would be empty.
            if state.frontier.is_empty() {
                break;
            }
            let fetched = fetch_neighborhood_layer(self, &state.frontier, &state.nodes, kind)?;
            let previous = layer;
            layer = ctx.slot(move |ctx| {
                let mut state = ctx.get(&previous)?;
                for edge in &fetched.edges {
                    let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
                    state.edges.entry(edge_key).or_insert_with(|| edge.clone());
                }
                // The next frontier is exactly the nodes first seen in this layer.
                state.frontier.clear();
                for node in &fetched.nodes {
                    if !state.nodes.contains_key(&node.id) {
                        state.frontier.push(node.id.clone());
                        state.nodes.insert(node.id.clone(), node.clone());
                    }
                }
                Ok(state)
            });
            state = ctx.get(&layer).map_err(graph_cache_error)?;
        }
        Ok(Some(
            GraphSubgraph {
                nodes: state.nodes.into_values().collect(),
                edges: state.edges.into_values().collect(),
            }
            .sorted(),
        ))
    }

    /// Returns one page of the nodes of `kind`; see [`apply_graph_query_page`].
    fn paged_nodes_by_kind(
        &self,
        kind: &str,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        Ok(apply_graph_query_page(
            self.nodes_by_kind(kind)?,
            Vec::new(),
            options,
            Vec::new(),
        ))
    }

    /// Pages the result of [`GraphStore::neighborhood`] with [`apply_graph_query_page`];
    /// `Ok(None)` when the center node does not exist.
    fn paged_neighborhood(
        &self,
        center_id: &str,
        depth: usize,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<Option<GraphPagedSubgraph>> {
        Ok(self.neighborhood(center_id, depth, kind)?.map(|subgraph| {
            apply_graph_query_page(subgraph.nodes, subgraph.edges, options, Vec::new())
        }))
    }

    /// Best-first neighborhood expansion from `center_id`.
    ///
    /// Queue entries are popped in [`ScoredQueueEntry`] order. Entries shallower than
    /// `options.depth` are expanded along outgoing edges of `options.edge_kind`. Each newly
    /// discovered neighbor is scored with [`compute_neighborhood_score`].
    ///
    /// A neighbor is admitted only while the node map (which includes the center) holds at
    /// most `options.max_nodes` entries. Otherwise the neighbor is counted in `pruned_count`,
    /// so the result can contain up to `max_nodes + 1` nodes.
    /// NOTE(review): confirm `max_nodes` is meant to exclude the center.
    ///
    /// Only edges with both endpoints admitted are returned. Returns `Ok(None)` when the
    /// center node does not exist.
    fn ranked_neighborhood(
        &self,
        center_id: &str,
        options: &RankedNeighborhoodOptions,
    ) -> Result<Option<RankedNeighborhoodResult>> {
        let Some(center) = self.node(center_id)? else {
            return Ok(None);
        };
        let ctx = LazyContext::new();
        let initial = RankedNeighborhoodLayerState {
            nodes: BTreeMap::from([(center_id.to_string(), center)]),
            edges: BTreeMap::new(),
            queue: BinaryHeap::from([ScoredQueueEntry {
                id: center_id.to_string(),
                depth: 0usize,
                score: i64::MAX,
            }]),
            seen: BTreeSet::from([center_id.to_string()]),
            pruned_count: 0,
            total_discovered: 1,
            degree_map: BTreeMap::new(),
        };
        let mut layer: SlotHandle<std::result::Result<RankedNeighborhoodLayerState, String>> =
            ctx.slot(move |_| Ok(initial.clone()));
        let mut state = ctx.get(&layer).map_err(graph_cache_error)?;
        let options = options.clone();
        while let Some(entry) = state.queue.peek().cloned() {
            // Store reads happen here, outside the slot. Entries at the depth limit are popped
            // below without being expanded.
            let fetched = if entry.depth < options.depth {
                Some(fetch_ranked_neighborhood_expansion(
                    self,
                    &entry,
                    &state,
                    options.edge_kind.as_deref(),
                )?)
            } else {
                None
            };
            let previous = layer;
            let options = options.clone();
            layer = ctx.slot(move |ctx| {
                let mut state = ctx.get(&previous)?;
                // Pops the same entry that was peeked above from the same queue.
                let Some(entry) = state.queue.pop() else {
                    return Ok(state);
                };
                let Some(fetched) = &fetched else {
                    return Ok(state);
                };
                for edge in &fetched.edges {
                    let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
                    state.edges.entry(edge_key).or_insert_with(|| edge.clone());
                    *state.degree_map.entry(edge.from_id.clone()).or_default() += 1;
                    *state.degree_map.entry(edge.to_id.clone()).or_default() += 1;
                    if state.seen.contains(&edge.to_id) {
                        continue;
                    }
                    state.seen.insert(edge.to_id.clone());
                    state.total_discovered += 1;
                    let Some(neighbor) = fetched.neighbor_nodes.get(&edge.to_id) else {
                        continue;
                    };
                    if state.nodes.len() > options.max_nodes {
                        state.pruned_count += 1;
                        continue;
                    }
                    let score = compute_neighborhood_score(
                        &options.scoring,
                        entry.depth + 1,
                        &edge.kind,
                        neighbor,
                        &state.degree_map,
                    );
                    state.nodes.insert(edge.to_id.clone(), neighbor.clone());
                    state.queue.push(ScoredQueueEntry {
                        id: edge.to_id.clone(),
                        depth: entry.depth + 1,
                        score,
                    });
                }
                Ok(state)
            });
            state = ctx.get(&layer).map_err(graph_cache_error)?;
        }
        // Drop edges that lead to pruned or unresolved nodes.
        let node_ids: BTreeSet<_> = state.nodes.keys().cloned().collect();
        state
            .edges
            .retain(|_, edge| node_ids.contains(&edge.from_id) && node_ids.contains(&edge.to_id));
        Ok(Some(RankedNeighborhoodResult {
            nodes: state.nodes.into_values().collect(),
            edges: state.edges.into_values().collect(),
            pruned_count: state.pruned_count,
            total_discovered: state.total_discovered,
        }))
    }

    /// Breadth-first search over outgoing edges of any kind, up to `depth` hops from
    /// `from_id`, collecting the nodes whose `kind` equals `kind`.
    ///
    /// Each match is returned with the BFS path that first reached it; the start node itself
    /// is never included. Results are sorted by hop count, then label, then id. `limit == 0`
    /// means no limit.
    fn reachable_nodes_by_kind(
        &self,
        from_id: &str,
        kind: &str,
        depth: usize,
        limit: usize,
    ) -> Result<Vec<(GraphNode, GraphPath)>> {
        let mut rows = BTreeMap::<String, (GraphNode, GraphPath)>::new();
        let mut seen = BTreeSet::from([from_id.to_string()]);
        let mut queue = VecDeque::from([(from_id.to_string(), vec![from_id.to_string()])]);
        while let Some((current, path)) = queue.pop_front() {
            let current_depth = path.len().saturating_sub(1);
            if current_depth >= depth {
                continue;
            }
            for edge in self.outgoing_edges(&current, None)? {
                if !seen.insert(edge.to_id.clone()) {
                    continue;
                }
                let Some(node) = self.node(&edge.to_id)? else {
                    continue;
                };
                let mut next_path = path.clone();
                next_path.push(edge.to_id.clone());
                let graph_path = GraphPath {
                    hops: next_path.len().saturating_sub(1),
                    nodes: next_path.clone(),
                };
                if node.kind == kind {
                    rows.entry(node.id.clone()).or_insert((node, graph_path));
                }
                queue.push_back((edge.to_id, next_path));
            }
        }
        let mut rows = rows.into_values().collect::<Vec<_>>();
        rows.sort_by(|(left_node, left_path), (right_node, right_path)| {
            left_path
                .hops
                .cmp(&right_path.hops)
                .then(left_node.label.cmp(&right_node.label))
                .then(left_node.id.cmp(&right_node.id))
        });
        if limit > 0 && rows.len() > limit {
            rows.truncate(limit);
        }
        Ok(rows)
    }

    /// Runs [`GraphStore::reachable_nodes_by_kind`] once per entry of `kinds`, keyed by kind.
    fn reachable_nodes_by_kinds(
        &self,
        from_id: &str,
        kinds: &[&str],
        depth: usize,
        limit: usize,
    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
        let mut rows = BTreeMap::new();
        for kind in kinds {
            rows.insert(
                (*kind).to_string(),
                self.reachable_nodes_by_kind(from_id, kind, depth, limit)?,
            );
        }
        Ok(rows)
    }

    /// Resolves a user-supplied `target` to a node.
    ///
    /// `target` is first tried as a node id. Then, for each of `kinds` in order, the lookup
    /// picks the node with the smallest id that matches any of:
    /// - its `handle` property equals `target`;
    /// - its `ref_id` property equals `target` with surrounding whitespace and leading `#`
    ///   characters removed;
    /// - its label equals `target`;
    /// - its label equals `#` followed by that normalized form.
    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
        if let Some(node) = self.node(target)? {
            return Ok(Some(node));
        }
        let normalized = target.trim().trim_start_matches('#');
        // Loop-invariant: build the `#`-prefixed label form once instead of once per node.
        let hashed_label = format!("#{normalized}");
        for kind in kinds {
            let mut candidates = self
                .nodes_by_kind(kind)?
                .into_iter()
                .filter(|node| {
                    node.properties.get("handle").map(String::as_str) == Some(target)
                        || node.properties.get("ref_id").map(String::as_str) == Some(normalized)
                        || node.label == target
                        || node.label == hashed_label
                })
                .collect::<Vec<_>>();
            candidates.sort_by(|left, right| left.id.cmp(&right.id));
            if let Some(candidate) = candidates.into_iter().next() {
                return Ok(Some(candidate));
            }
        }
        Ok(None)
    }
}
/// Breadth-first shortest path from `from_id` to `to_id` over the edges returned by
/// `outgoing_edges(node_id, kind)`.
///
/// - Returns a zero-hop path when `from_id == to_id`.
/// - When `max_hops` is set, nodes already `max_hops` hops away are not expanded, so only
///   paths of at most `max_hops` hops can be found.
/// - Returns `Ok(None)` when `to_id` is unreachable within that bound.
/// - Errors from `outgoing_edges` are propagated.
pub fn shortest_path_using_outgoing(
    from_id: &str,
    to_id: &str,
    kind: Option<&str>,
    max_hops: Option<usize>,
    mut outgoing_edges: impl FnMut(&str, Option<&str>) -> Result<Vec<GraphEdge>>,
) -> Result<Option<GraphPath>> {
    if from_id == to_id {
        return Ok(Some(GraphPath {
            nodes: vec![from_id.to_string()],
            hops: 0,
        }));
    }
    // node id -> (hops from `from_id`, predecessor on a shortest path; `None` for the start).
    // An explicit `Option` (rather than an empty-string sentinel) keeps nodes whose id is ""
    // from cutting the reconstructed path short.
    let mut parent = BTreeMap::<String, (usize, Option<String>)>::new();
    parent.insert(from_id.to_string(), (0, None));
    let mut queue = VecDeque::from([from_id.to_string()]);
    while let Some(current) = queue.pop_front() {
        let current_depth = parent.get(&current).map_or(0, |(depth, _)| *depth);
        if max_hops.is_some_and(|max_hops| current_depth >= max_hops) {
            continue;
        }
        for edge in outgoing_edges(&current, kind)? {
            if parent.contains_key(&edge.to_id) {
                continue;
            }
            parent.insert(edge.to_id.clone(), (current_depth + 1, Some(current.clone())));
            if edge.to_id == to_id {
                // Walk predecessors back to the start, then flip into forward order.
                let mut nodes = vec![to_id.to_string()];
                let mut cursor = to_id;
                while let Some((_, Some(previous))) = parent.get(cursor) {
                    nodes.push(previous.clone());
                    cursor = previous.as_str();
                }
                nodes.reverse();
                let hops = nodes.len().saturating_sub(1);
                return Ok(Some(GraphPath { nodes, hops }));
            }
            queue.push_back(edge.to_id);
        }
    }
    Ok(None)
}