1use anyhow::Result;
2use lazily::{Context as LazyContext, SlotHandle};
3use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::types::*;
7
8pub(crate) fn graph_node_matches_filters(
9 node: &GraphNode,
10 filters: &[GraphPropertyFilter],
11) -> bool {
12 filters
13 .iter()
14 .all(|filter| node.properties.get(&filter.key) == Some(&filter.value))
15}
16
/// Priority-queue entry used while ranking a neighborhood traversal.
///
/// The ordering is designed for a max-heap (`BinaryHeap`): a higher `score` compares greater;
/// for equal scores the *smaller* `depth` compares greater; remaining ties are ordered by `id`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct ScoredQueueEntry {
    /// Id of the queued node.
    pub id: String,
    /// Number of hops from the traversal center to this node.
    pub depth: usize,
    /// Ranking score; larger values are dequeued first.
    pub score: i64,
}

impl Ord for ScoredQueueEntry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `Reverse` flips the depth comparison so shallower entries rank higher.
        let lhs = (self.score, std::cmp::Reverse(self.depth), self.id.as_str());
        let rhs = (other.score, std::cmp::Reverse(other.depth), other.id.as_str());
        lhs.cmp(&rhs)
    }
}

impl PartialOrd for ScoredQueueEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
38
39pub(crate) fn compute_neighborhood_score(
40 strategy: &NeighborhoodScoring,
41 depth: usize,
42 edge_kind: &str,
43 _node: &GraphNode,
44 degree_map: &BTreeMap<String, usize>,
45) -> i64 {
46 match strategy {
47 NeighborhoodScoring::BreadthFirst => {
48 (120i64.saturating_sub((depth as i64).saturating_mul(18))).max(0)
49 }
50 NeighborhoodScoring::EdgeKindWeighted => {
51 let depth_score = (120i64.saturating_sub((depth as i64).saturating_mul(18))).max(0);
52 let kind_score = edge_kind_weighted_score(edge_kind);
53 depth_score.saturating_add(kind_score)
54 }
55 NeighborhoodScoring::DegreeWeighted => {
56 let depth_score = (120i64.saturating_sub((depth as i64).saturating_mul(18))).max(0);
57 let degree = degree_map.values().copied().max().unwrap_or(1) as i64;
58 let degree_bonus = if degree <= 3 {
59 20
60 } else if degree <= 10 {
61 10
62 } else {
63 0
64 };
65 depth_score.saturating_add(degree_bonus)
66 }
67 }
68}
69
70#[derive(Debug, Clone, Copy)]
71pub(crate) struct NeighborhoodScoreContext {
72 pub now_unix: i64,
73}
74
75impl NeighborhoodScoreContext {
76 pub(crate) fn from_options(options: &RankedNeighborhoodOptions) -> Self {
77 Self {
78 now_unix: options.observed_at_now_unix.unwrap_or_else(unix_now),
79 }
80 }
81}
82
83pub(crate) fn compute_ranked_neighborhood_score(
84 options: &RankedNeighborhoodOptions,
85 context: NeighborhoodScoreContext,
86 depth: usize,
87 edge_kind: &str,
88 node: &GraphNode,
89 degree_map: &BTreeMap<String, usize>,
90) -> i64 {
91 compute_neighborhood_score(&options.scoring, depth, edge_kind, node, degree_map)
92 .saturating_add(observed_at_decay_bonus(
93 node,
94 context.now_unix,
95 options.observed_at_half_life_secs,
96 options.observed_at_weight,
97 ))
98 .saturating_add(memory_node_signal_bonus(node, options.memory_node_boost))
99}
100
101pub(crate) fn graph_node_observed_at_unix(node: &GraphNode) -> Option<i64> {
102 node.freshness
103 .as_ref()
104 .and_then(|freshness| freshness.observed_at_unix)
105 .or_else(|| graph_node_i64_property(node, "observed_at_unix"))
106 .or_else(|| graph_node_i64_property(node, "max_observed_at_unix"))
107}
108
109pub(crate) fn observed_at_decay_bonus(
110 node: &GraphNode,
111 now_unix: i64,
112 half_life_secs: i64,
113 weight: i64,
114) -> i64 {
115 if weight <= 0 {
116 return 0;
117 }
118 let Some(observed_at_unix) = graph_node_observed_at_unix(node) else {
119 return 0;
120 };
121 let half_life_secs = half_life_secs.max(1);
122 let age_secs = now_unix.saturating_sub(observed_at_unix).max(0);
123 match age_secs / half_life_secs {
124 0 => weight,
125 1 => weight / 2,
126 2 | 3 => weight / 4,
127 _ => 0,
128 }
129}
130
131pub(crate) fn memory_node_signal_bonus(node: &GraphNode, configured_boost: i64) -> i64 {
132 if configured_boost <= 0 {
133 return 0;
134 }
135 let provider_memory = node
136 .properties
137 .get("provider")
138 .is_some_and(|provider| provider == "tsift-memory");
139 let authored_kind = matches!(node.kind.as_str(), "finding" | "decision" | "note");
140 let memory_kind = node.kind.starts_with("memory_") || provider_memory || authored_kind;
141 if !memory_kind || node.kind == "memory_projection" {
142 return 0;
143 }
144
145 let base = match node.kind.as_str() {
146 "finding" | "decision" | "memory_event" => configured_boost,
147 "note" | "memory_session" => configured_boost / 2,
148 "source_handle" | "semantic_concept" | "semantic_vector_handle" if provider_memory => {
149 configured_boost / 2
150 }
151 _ if provider_memory => configured_boost / 2,
152 _ => configured_boost,
153 };
154
155 base.saturating_add(confidence_bonus(node, configured_boost))
156}
157
158fn confidence_bonus(node: &GraphNode, configured_boost: i64) -> i64 {
159 node.properties
160 .get("confidence")
161 .and_then(|value| value.parse::<f64>().ok())
162 .map(|confidence| (configured_boost as f64 * confidence.clamp(0.0, 1.0)).round() as i64)
163 .unwrap_or(0)
164}
165
166fn graph_node_i64_property(node: &GraphNode, key: &str) -> Option<i64> {
167 node.properties.get(key)?.parse().ok()
168}
169
/// Returns the current system time as whole seconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
176
/// Accumulated state of a breadth-first neighborhood expansion after some number of layers.
#[derive(Clone)]
pub(crate) struct NeighborhoodLayerState {
    /// Nodes collected so far, keyed by node id.
    pub nodes: BTreeMap<String, GraphNode>,
    /// Edges collected so far, keyed by `(from_id, kind, to_id)`.
    pub edges: BTreeMap<(String, String, String), GraphEdge>,
    /// Ids of the nodes added by the most recent layer; these are expanded next.
    pub frontier: Vec<String>,
}
183
/// Raw store results for one breadth-first layer, as produced by `fetch_neighborhood_layer`.
#[derive(Clone)]
pub(crate) struct NeighborhoodFetchedLayer {
    /// Outgoing edges of every frontier node, in fetch order.
    pub edges: Vec<GraphEdge>,
    /// Nodes fetched for edge targets that were not already known and exist in the store.
    pub nodes: Vec<GraphNode>,
}
189
/// Accumulated state of a score-ordered (best-first) neighborhood traversal.
#[derive(Clone)]
pub(crate) struct RankedNeighborhoodLayerState {
    /// Nodes accepted into the result so far, keyed by node id.
    pub nodes: BTreeMap<String, GraphNode>,
    /// Edges collected so far, keyed by `(from_id, kind, to_id)`.
    pub edges: BTreeMap<(String, String, String), GraphEdge>,
    /// Entries still waiting to be expanded; the highest-ranked entry is popped first.
    pub queue: BinaryHeap<ScoredQueueEntry>,
    /// Ids of every node already discovered, whether accepted or pruned.
    pub seen: BTreeSet<String>,
    /// Number of discovered nodes rejected because the node cap had been reached.
    pub pruned_count: usize,
    /// Number of distinct nodes discovered, including the center and pruned nodes.
    pub total_discovered: usize,
    /// Per-node count of edge endpoints observed during expansion, keyed by node id.
    pub degree_map: BTreeMap<String, usize>,
}
200
/// Raw store results for expanding a single queue entry of a ranked traversal.
#[derive(Clone)]
pub(crate) struct RankedNeighborhoodFetchedExpansion {
    /// Outgoing edges of the expanded node.
    pub edges: Vec<GraphEdge>,
    /// Not-yet-seen edge targets that exist in the store, keyed by node id.
    pub neighbor_nodes: BTreeMap<String, GraphNode>,
}
206
207pub(crate) fn graph_cache_error(message: String) -> anyhow::Error {
208 anyhow::anyhow!("{message}")
209}
210
211pub(crate) fn fetch_neighborhood_layer<S: GraphStore + ?Sized>(
212 store: &S,
213 frontier: &[String],
214 known_nodes: &BTreeMap<String, GraphNode>,
215 kind: Option<&str>,
216) -> Result<NeighborhoodFetchedLayer> {
217 let mut edges = Vec::new();
218 let mut missing_ids = Vec::new();
219 for current in frontier {
220 let outgoing = store.outgoing_edges(current, kind)?;
221 for edge in outgoing {
222 if !known_nodes.contains_key(&edge.to_id) {
223 missing_ids.push(edge.to_id.clone());
224 }
225 edges.push(edge);
226 }
227 }
228 missing_ids.sort();
229 missing_ids.dedup();
230 let mut nodes = Vec::new();
231 for id in missing_ids {
232 if !known_nodes.contains_key(&id)
233 && let Some(node) = store.node(&id)?
234 {
235 nodes.push(node);
236 }
237 }
238 Ok(NeighborhoodFetchedLayer { edges, nodes })
239}
240
241pub(crate) fn fetch_ranked_neighborhood_expansion<S: GraphStore + ?Sized>(
242 store: &S,
243 entry: &ScoredQueueEntry,
244 state: &RankedNeighborhoodLayerState,
245 kind: Option<&str>,
246) -> Result<RankedNeighborhoodFetchedExpansion> {
247 let edges = store.outgoing_edges(&entry.id, kind)?;
248 let mut neighbor_nodes = BTreeMap::new();
249 for edge in &edges {
250 if state.seen.contains(&edge.to_id) {
251 continue;
252 }
253 if let Some(node) = store.node(&edge.to_id)? {
254 neighbor_nodes.insert(edge.to_id.clone(), node);
255 }
256 }
257 Ok(RankedNeighborhoodFetchedExpansion {
258 edges,
259 neighbor_nodes,
260 })
261}
262
/// Returns the ranking weight associated with an edge kind.
///
/// Exact kind names are looked up first. Otherwise any kind containing `community` scores 20,
/// any kind containing `semantic`, `concept`, or `entity` scores 24, and everything else
/// scores 8.
pub(crate) fn edge_kind_weighted_score(edge_kind: &str) -> i64 {
    const EXACT_WEIGHTS: &[(&str, i64)] = &[
        ("semantic_relation", 34),
        ("mentions_entity", 28),
        ("mentions_concept", 28),
        ("tagged_entity", 28),
        ("tagged_concept", 28),
        ("related_concept", 28),
        ("mentions", 22),
        ("calls", 20),
        ("requests_context", 18),
        ("scopes_context", 18),
        ("scopes_source", 18),
        ("explains_result", 18),
        ("defines", 12),
        ("contains", 12),
        ("belongs_to", 12),
    ];

    if let Some(&(_, weight)) = EXACT_WEIGHTS.iter().find(|(name, _)| *name == edge_kind) {
        return weight;
    }
    // The `community` check deliberately precedes the semantic/concept/entity check.
    if edge_kind.contains("community") {
        return 20;
    }
    let semantic_like = ["semantic", "concept", "entity"]
        .iter()
        .any(|needle| edge_kind.contains(*needle));
    if semantic_like {
        24
    } else {
        8
    }
}
282
283pub(crate) fn graph_edge_matches_filters(
284 edge: &GraphEdge,
285 filters: &[GraphPropertyFilter],
286) -> bool {
287 filters
288 .iter()
289 .all(|filter| edge.properties.get(&filter.key) == Some(&filter.value))
290}
291
292pub fn apply_graph_query_page(
293 mut nodes: Vec<GraphNode>,
294 mut edges: Vec<GraphEdge>,
295 options: GraphQueryOptions,
296 mut diagnostics: Vec<String>,
297) -> GraphPagedSubgraph {
298 nodes.sort_by(|left, right| left.id.cmp(&right.id));
299 edges.sort_by(|left, right| {
300 left.from_id
301 .cmp(&right.from_id)
302 .then(left.kind.cmp(&right.kind))
303 .then(left.to_id.cmp(&right.to_id))
304 });
305
306 let before_filter = nodes.len();
307 if !options.property_filters.is_empty() {
308 nodes.retain(|node| graph_node_matches_filters(node, &options.property_filters));
309 }
310 let after_filter = nodes.len();
311
312 if let Some(cursor) = &options.cursor {
313 nodes.retain(|node| node.id > *cursor);
314 }
315
316 let before_limit = nodes.len();
317 let mut next_cursor = None;
318 if let Some(limit) = options.limit
319 && nodes.len() > limit
320 {
321 next_cursor = nodes
322 .get(limit.saturating_sub(1))
323 .map(|node| node.id.clone());
324 nodes.truncate(limit);
325 }
326
327 let node_ids = nodes
328 .iter()
329 .map(|node| node.id.as_str())
330 .collect::<BTreeSet<_>>();
331 edges.retain(|edge| {
332 node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
333 });
334
335 if after_filter != before_filter {
336 diagnostics.push(format!(
337 "property filters removed {} node(s)",
338 before_filter.saturating_sub(after_filter)
339 ));
340 }
341 if options.cursor.is_some() {
342 diagnostics.push("cursor is exclusive and ordered by node id".to_string());
343 }
344 if next_cursor.is_some() {
345 diagnostics.push(
346 "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
347 );
348 }
349
350 GraphPagedSubgraph {
351 page: GraphQueryPage {
352 cursor: options.cursor,
353 limit: options.limit,
354 next_cursor,
355 returned_nodes: nodes.len(),
356 returned_edges: edges.len(),
357 truncated: options.limit.is_some_and(|limit| before_limit > limit),
358 diagnostics,
359 },
360 nodes,
361 edges,
362 }
363}
364
365pub fn apply_graph_edge_query_page(
366 mut edges: Vec<GraphEdge>,
367 options: GraphQueryOptions,
368 mut diagnostics: Vec<String>,
369) -> GraphPagedSubgraph {
370 edges.sort_by_key(graph_edge_id);
371
372 let before_filter = edges.len();
373 if !options.property_filters.is_empty() {
374 edges.retain(|edge| graph_edge_matches_filters(edge, &options.property_filters));
375 }
376 let after_filter = edges.len();
377
378 if let Some(cursor) = &options.cursor {
379 edges.retain(|edge| graph_edge_id(edge) > *cursor);
380 }
381
382 let before_limit = edges.len();
383 let mut next_cursor = None;
384 if let Some(limit) = options.limit
385 && edges.len() > limit
386 {
387 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
388 edges.truncate(limit);
389 }
390
391 if after_filter != before_filter {
392 diagnostics.push(format!(
393 "edge property filters removed {} edge(s)",
394 before_filter.saturating_sub(after_filter)
395 ));
396 }
397 if options.cursor.is_some() {
398 diagnostics.push("cursor is exclusive and ordered by edge id".to_string());
399 }
400 if next_cursor.is_some() {
401 diagnostics.push(
402 "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
403 );
404 }
405
406 GraphPagedSubgraph {
407 page: GraphQueryPage {
408 cursor: options.cursor,
409 limit: options.limit,
410 next_cursor,
411 returned_nodes: 0,
412 returned_edges: edges.len(),
413 truncated: options.limit.is_some_and(|limit| before_limit > limit),
414 diagnostics,
415 },
416 nodes: Vec::new(),
417 edges,
418 }
419}
420
/// Parses a comma-separated list of floats into a vector.
///
/// Each component is trimmed before parsing. Returns `None` when any component fails to parse,
/// when any parsed value is not finite (NaN or infinite), or when no components are produced.
pub fn parse_graph_semantic_vector_property(value: &str) -> Option<Vec<f64>> {
    let mut components = Vec::new();
    for part in value.split(',') {
        let component = part.trim().parse::<f64>().ok()?;
        components.push(component);
    }

    let usable = !components.is_empty() && components.iter().all(|c| c.is_finite());
    if usable {
        Some(components)
    } else {
        None
    }
}
429
/// Returns the cosine similarity of `left` and `right`.
///
/// The dot product is divided by the product of the two Euclidean norms, so the result lies in
/// `[-1.0, 1.0]` regardless of vector magnitude. For unit-length inputs this equals the plain
/// dot product.
///
/// Returns `0.0` when the slices differ in length, are empty, or when either vector has zero
/// (or non-finite) norm, since the similarity is undefined in those cases.
pub fn graph_semantic_cosine(left: &[f64], right: &[f64]) -> f64 {
    if left.len() != right.len() || left.is_empty() {
        return 0.0;
    }

    let (dot, left_sq, right_sq) = left.iter().zip(right).fold(
        (0.0f64, 0.0f64, 0.0f64),
        |(dot, left_sq, right_sq), (a, b)| (dot + a * b, left_sq + a * a, right_sq + b * b),
    );

    let denominator = left_sq.sqrt() * right_sq.sqrt();
    if !denominator.is_finite() || denominator <= 0.0 {
        return 0.0;
    }

    // Clamp away floating-point drift such as 1.0000000000000002.
    (dot / denominator).clamp(-1.0, 1.0)
}
439
440pub fn graph_semantic_seeded_edge_other_id<'a>(
441 edge: &'a GraphEdge,
442 current_id: &str,
443) -> Option<&'a str> {
444 if edge.from_id == current_id {
445 Some(edge.to_id.as_str())
446 } else if edge.to_id == current_id {
447 Some(edge.from_id.as_str())
448 } else {
449 None
450 }
451}
452
453pub fn graph_semantic_seeded_edge_score(edge: &GraphEdge, current_id: &str) -> i64 {
454 let mut score = edge_kind_weighted_score(&edge.kind).saturating_mul(10);
455 score += if edge.from_id == current_id { 8 } else { 4 };
456 score += match edge.kind.as_str() {
457 "mentions_concept" | "mentions_entity" | "tagged_concept" | "tagged_entity"
458 | "related_concept" => 30,
459 "semantic_relation" => 28,
460 "calls" => 24,
461 "mentions" => 22,
462 "requests_context" | "scopes_context" | "scopes_source" | "explains_result" => 18,
463 "defines" | "contains" | "belongs_to" => 12,
464 _ => 0,
465 };
466 score
467}
468
469pub trait GraphStore {
470 fn upsert_node(&self, node: &GraphNode) -> Result<()>;
471 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()>;
472 fn delete_node(&self, id: &str) -> Result<usize>;
473 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize>;
474 fn node(&self, id: &str) -> Result<Option<GraphNode>>;
475 fn nodes_by_ids(&self, ids: &[String]) -> Result<Vec<GraphNode>> {
476 let mut nodes = Vec::new();
477 let mut seen = BTreeSet::new();
478 for id in ids {
479 if !seen.insert(id.clone()) {
480 continue;
481 }
482 if let Some(node) = self.node(id)? {
483 nodes.push(node);
484 }
485 }
486 nodes.sort_by(|left, right| left.id.cmp(&right.id));
487 Ok(nodes)
488 }
489 fn all_nodes(&self) -> Result<Vec<GraphNode>>;
490 fn all_edges(&self) -> Result<Vec<GraphEdge>>;
491 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
492 Ok(self
493 .all_edges()?
494 .into_iter()
495 .find(|edge| graph_edge_id(edge) == edge_id))
496 }
497 fn graph_counts(&self) -> Result<(usize, usize)> {
498 Ok((self.all_nodes()?.len(), self.all_edges()?.len()))
499 }
500 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
501 let mut edges = self
502 .all_edges()?
503 .into_iter()
504 .filter(|edge| edge.from_id != edge.to_id)
505 .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
506 .collect::<Vec<_>>();
507 edges.sort_by(|left, right| {
508 left.from_id
509 .cmp(&right.from_id)
510 .then(left.kind.cmp(&right.kind))
511 .then(left.to_id.cmp(&right.to_id))
512 });
513 Ok(edges.into_iter().next())
514 }
515 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
516 let mut probes = Vec::new();
517 for edge in self.all_edges()? {
518 if edge.from_id == edge.to_id {
519 continue;
520 }
521 if let Some((key, value)) = edge
522 .properties
523 .iter()
524 .next()
525 .map(|(key, value)| (key.clone(), value.clone()))
526 {
527 probes.push((edge, GraphPropertyFilter { key, value }));
528 }
529 }
530 probes.sort_by(|(left_edge, left_filter), (right_edge, right_filter)| {
531 left_filter
532 .key
533 .cmp(&right_filter.key)
534 .then(left_filter.value.cmp(&right_filter.value))
535 .then_with(|| graph_edge_id(left_edge).cmp(&graph_edge_id(right_edge)))
536 });
537 Ok(probes.into_iter().next())
538 }
539 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>>;
540 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>>;
541 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
542 let mut edges = self
543 .all_edges()?
544 .into_iter()
545 .filter(|edge| edge.from_id == node_id || edge.to_id == node_id)
546 .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
547 .collect::<Vec<_>>();
548 edges.sort_by_key(graph_edge_id);
549 Ok(edges)
550 }
551 fn semantic_seeded_expansion_edges(
552 &self,
553 current_id: &str,
554 options: &SemanticSeededNeighborhoodOptions,
555 ) -> Result<SemanticSeededNeighborhoodExpansion> {
556 let mut expansion_edges_by_key = BTreeMap::<String, GraphEdge>::new();
557 for edge in self.outgoing_edges(current_id, None)? {
558 expansion_edges_by_key
559 .entry(graph_edge_id(&edge))
560 .or_insert(edge);
561 }
562 for edge in self.incident_edges(current_id, None)? {
563 expansion_edges_by_key
564 .entry(graph_edge_id(&edge))
565 .or_insert(edge);
566 }
567 let mut edges = expansion_edges_by_key.into_values().collect::<Vec<_>>();
568 edges.sort_by(|left, right| {
569 graph_semantic_seeded_edge_score(right, current_id)
570 .cmp(&graph_semantic_seeded_edge_score(left, current_id))
571 .then_with(|| graph_edge_id(left).cmp(&graph_edge_id(right)))
572 });
573 let mut skipped_by_edge_cap = 0usize;
574 if options.edge_scan_cap > 0 && edges.len() > options.edge_scan_cap {
575 skipped_by_edge_cap = edges.len() - options.edge_scan_cap;
576 edges.truncate(options.edge_scan_cap);
577 }
578 Ok(SemanticSeededNeighborhoodExpansion {
579 edges,
580 skipped_by_edge_cap,
581 })
582 }
583 fn paged_edges(
584 &self,
585 kind: Option<&str>,
586 options: GraphQueryOptions,
587 ) -> Result<GraphPagedSubgraph> {
588 let edges = self
589 .all_edges()?
590 .into_iter()
591 .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
592 .collect::<Vec<_>>();
593 Ok(apply_graph_edge_query_page(edges, options, Vec::new()))
594 }
595 fn paged_incident_edges(
596 &self,
597 node_id: &str,
598 kind: Option<&str>,
599 options: GraphQueryOptions,
600 ) -> Result<GraphPagedSubgraph> {
601 Ok(apply_graph_edge_query_page(
602 self.incident_edges(node_id, kind)?,
603 options,
604 Vec::new(),
605 ))
606 }
607 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
608 let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
609 for from_id in node_ids {
610 for edge in self.outgoing_edges(from_id, None)? {
611 if node_ids.contains(&edge.to_id) {
612 edges
613 .entry((edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
614 .or_insert(edge);
615 }
616 }
617 }
618 Ok(edges.into_values().collect())
619 }
620 fn shortest_path(
621 &self,
622 from_id: &str,
623 to_id: &str,
624 kind: Option<&str>,
625 ) -> Result<Option<GraphPath>>;
626 fn shortest_path_with_max_hops(
627 &self,
628 from_id: &str,
629 to_id: &str,
630 kind: Option<&str>,
631 max_hops: Option<usize>,
632 ) -> Result<Option<GraphPath>> {
633 shortest_path_using_outgoing(from_id, to_id, kind, max_hops, |current, kind| {
634 self.outgoing_edges(current, kind)
635 })
636 }
637 fn neighborhood(
638 &self,
639 center_id: &str,
640 depth: usize,
641 kind: Option<&str>,
642 ) -> Result<Option<GraphSubgraph>> {
643 let Some(center) = self.node(center_id)? else {
644 return Ok(None);
645 };
646 let ctx = LazyContext::new();
647 let initial = NeighborhoodLayerState {
648 nodes: BTreeMap::from([(center_id.to_string(), center)]),
649 edges: BTreeMap::new(),
650 frontier: vec![center_id.to_string()],
651 };
652 let mut layer: SlotHandle<std::result::Result<NeighborhoodLayerState, String>> =
653 ctx.slot(move |_| Ok(initial.clone()));
654 let mut state = ctx.get(&layer).map_err(graph_cache_error)?;
655
656 for _ in 0..depth {
657 if state.frontier.is_empty() {
658 break;
659 }
660 let fetched = fetch_neighborhood_layer(self, &state.frontier, &state.nodes, kind)?;
661 let previous = layer;
662 layer = ctx.slot(move |ctx| {
663 let mut state = ctx.get(&previous)?;
664 for edge in &fetched.edges {
665 let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
666 state.edges.entry(edge_key).or_insert_with(|| edge.clone());
667 }
668 state.frontier.clear();
669 for node in &fetched.nodes {
670 if !state.nodes.contains_key(&node.id) {
671 state.frontier.push(node.id.clone());
672 state.nodes.insert(node.id.clone(), node.clone());
673 }
674 }
675 Ok(state)
676 });
677 state = ctx.get(&layer).map_err(graph_cache_error)?;
678 }
679
680 Ok(Some(
681 GraphSubgraph {
682 nodes: state.nodes.into_values().collect(),
683 edges: state.edges.into_values().collect(),
684 }
685 .sorted(),
686 ))
687 }
688 fn paged_nodes_by_kind(
689 &self,
690 kind: &str,
691 options: GraphQueryOptions,
692 ) -> Result<GraphPagedSubgraph> {
693 Ok(apply_graph_query_page(
694 self.nodes_by_kind(kind)?,
695 Vec::new(),
696 options,
697 Vec::new(),
698 ))
699 }
700 fn paged_neighborhood(
701 &self,
702 center_id: &str,
703 depth: usize,
704 kind: Option<&str>,
705 options: GraphQueryOptions,
706 ) -> Result<Option<GraphPagedSubgraph>> {
707 Ok(self.neighborhood(center_id, depth, kind)?.map(|subgraph| {
708 apply_graph_query_page(subgraph.nodes, subgraph.edges, options, Vec::new())
709 }))
710 }
711 fn semantic_seeded_neighborhood(
712 &self,
713 seed_ids: &[String],
714 options: &SemanticSeededNeighborhoodOptions,
715 ) -> Result<SemanticSeededNeighborhoodResult> {
716 let seed_rank = seed_ids
717 .iter()
718 .enumerate()
719 .map(|(idx, seed)| (seed.clone(), idx))
720 .collect::<BTreeMap<_, _>>();
721 let seed_nodes_by_id = self
722 .nodes_by_ids(seed_ids)?
723 .into_iter()
724 .map(|node| (node.id.clone(), node))
725 .collect::<BTreeMap<_, _>>();
726 let mut nodes = BTreeMap::<String, GraphNode>::new();
727 let mut edges = BTreeMap::<String, GraphEdge>::new();
728 let mut node_score_by_id = BTreeMap::<String, i64>::new();
729 let mut queue = VecDeque::<(String, usize)>::new();
730 let mut seen_at_depth = BTreeMap::<String, usize>::new();
731 let mut missing_seed_ids = Vec::new();
732 let mut skipped_by_edge_cap = 0usize;
733 let mut skipped_by_node_cap = 0usize;
734
735 for (idx, seed_id) in seed_ids.iter().enumerate() {
736 if let Some(node) = seed_nodes_by_id.get(seed_id) {
737 nodes.entry(seed_id.clone()).or_insert_with(|| node.clone());
738 node_score_by_id
739 .entry(seed_id.clone())
740 .or_insert(1_000_000i64.saturating_sub(idx as i64));
741 if !seen_at_depth.contains_key(seed_id) {
742 queue.push_back((seed_id.clone(), 0));
743 seen_at_depth.insert(seed_id.clone(), 0);
744 }
745 } else {
746 missing_seed_ids.push(seed_id.clone());
747 }
748 }
749
750 while let Some((current_id, current_depth)) = queue.pop_front() {
751 if current_depth >= options.depth {
752 continue;
753 }
754
755 let expansion = self.semantic_seeded_expansion_edges(¤t_id, options)?;
756 skipped_by_edge_cap = skipped_by_edge_cap.saturating_add(expansion.skipped_by_edge_cap);
757 let mut candidates = Vec::new();
758 let mut missing_candidate_ids = Vec::new();
759 for edge in expansion.edges {
760 let Some(other_id) = graph_semantic_seeded_edge_other_id(&edge, ¤t_id) else {
761 continue;
762 };
763 let other_id = other_id.to_string();
764 let edge_score = graph_semantic_seeded_edge_score(&edge, ¤t_id)
765 .saturating_add(
766 (options.depth.saturating_sub(current_depth) as i64).saturating_mul(5),
767 );
768 if !nodes.contains_key(&other_id) {
769 missing_candidate_ids.push(other_id.clone());
770 }
771 candidates.push((edge, other_id, edge_score));
772 }
773
774 missing_candidate_ids.sort();
775 missing_candidate_ids.dedup();
776 let fetched_nodes_by_id = self
777 .nodes_by_ids(&missing_candidate_ids)?
778 .into_iter()
779 .map(|node| (node.id.clone(), node))
780 .collect::<BTreeMap<_, _>>();
781
782 for (edge, other_id, edge_score) in candidates {
783 let other_known = nodes.contains_key(&other_id);
784 if !other_known && nodes.len() >= options.node_discovery_cap {
785 skipped_by_node_cap = skipped_by_node_cap.saturating_add(1);
786 continue;
787 }
788 node_score_by_id
789 .entry(other_id.clone())
790 .and_modify(|score| *score = (*score).max(edge_score))
791 .or_insert(edge_score);
792 let edge_key = graph_edge_id(&edge);
793 edges.entry(edge_key).or_insert(edge);
794 if !other_known && let Some(node) = fetched_nodes_by_id.get(&other_id) {
795 nodes.insert(other_id.clone(), node.clone());
796 }
797 if !nodes.contains_key(&other_id) {
798 continue;
799 }
800 let next_depth = current_depth + 1;
801 let should_queue = seen_at_depth
802 .get(&other_id)
803 .is_none_or(|seen_depth| next_depth < *seen_depth);
804 if should_queue {
805 seen_at_depth.insert(other_id.clone(), next_depth);
806 queue.push_back((other_id, next_depth));
807 }
808 }
809 }
810
811 let mut nodes = nodes.into_values().collect::<Vec<_>>();
812 nodes.sort_by(|left, right| {
813 seed_rank
814 .get(&left.id)
815 .copied()
816 .unwrap_or(usize::MAX)
817 .cmp(&seed_rank.get(&right.id).copied().unwrap_or(usize::MAX))
818 .then_with(|| {
819 node_score_by_id
820 .get(&right.id)
821 .copied()
822 .unwrap_or_default()
823 .cmp(&node_score_by_id.get(&left.id).copied().unwrap_or_default())
824 })
825 .then(left.id.cmp(&right.id))
826 });
827
828 let total_discovered = nodes.len();
829 let truncated = options.limit > 0 && nodes.len() > options.limit;
830 if truncated {
831 nodes.truncate(options.limit);
832 }
833
834 let node_ids = nodes
835 .iter()
836 .map(|node| node.id.as_str())
837 .collect::<BTreeSet<_>>();
838 let mut edges = edges
839 .into_values()
840 .filter(|edge| {
841 node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
842 })
843 .collect::<Vec<_>>();
844 edges.sort_by_key(graph_edge_id);
845
846 Ok(SemanticSeededNeighborhoodResult {
847 nodes,
848 edges,
849 skipped_by_edge_cap,
850 skipped_by_node_cap,
851 missing_seed_ids,
852 total_discovered,
853 truncated,
854 })
855 }
856 fn ranked_neighborhood(
857 &self,
858 center_id: &str,
859 options: &RankedNeighborhoodOptions,
860 ) -> Result<Option<RankedNeighborhoodResult>> {
861 let Some(center) = self.node(center_id)? else {
862 return Ok(None);
863 };
864 let ctx = LazyContext::new();
865 let initial = RankedNeighborhoodLayerState {
866 nodes: BTreeMap::from([(center_id.to_string(), center)]),
867 edges: BTreeMap::new(),
868 queue: BinaryHeap::from([ScoredQueueEntry {
869 id: center_id.to_string(),
870 depth: 0usize,
871 score: i64::MAX,
872 }]),
873 seen: BTreeSet::from([center_id.to_string()]),
874 pruned_count: 0,
875 total_discovered: 1,
876 degree_map: BTreeMap::new(),
877 };
878 let mut layer: SlotHandle<std::result::Result<RankedNeighborhoodLayerState, String>> =
879 ctx.slot(move |_| Ok(initial.clone()));
880 let mut state = ctx.get(&layer).map_err(graph_cache_error)?;
881 let options = options.clone();
882 let score_context = NeighborhoodScoreContext::from_options(&options);
883
884 while let Some(entry) = state.queue.peek().cloned() {
885 let fetched = if entry.depth < options.depth {
886 Some(fetch_ranked_neighborhood_expansion(
887 self,
888 &entry,
889 &state,
890 options.edge_kind.as_deref(),
891 )?)
892 } else {
893 None
894 };
895 let previous = layer;
896 let options = options.clone();
897 layer = ctx.slot(move |ctx| {
898 let mut state = ctx.get(&previous)?;
899 let Some(entry) = state.queue.pop() else {
900 return Ok(state);
901 };
902 let Some(fetched) = &fetched else {
903 return Ok(state);
904 };
905 let mut candidates = Vec::new();
906 for edge in &fetched.edges {
907 let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
908 state.edges.entry(edge_key).or_insert_with(|| edge.clone());
909 *state.degree_map.entry(edge.from_id.clone()).or_default() += 1;
910 *state.degree_map.entry(edge.to_id.clone()).or_default() += 1;
911 if state.seen.contains(&edge.to_id) {
912 continue;
913 }
914 let Some(neighbor) = fetched.neighbor_nodes.get(&edge.to_id) else {
915 continue;
916 };
917 let score = compute_ranked_neighborhood_score(
918 &options,
919 score_context,
920 entry.depth + 1,
921 &edge.kind,
922 neighbor,
923 &state.degree_map,
924 );
925 candidates.push((edge.to_id.clone(), neighbor.clone(), score));
926 }
927 candidates.sort_by(|a, b| b.2.cmp(&a.2).then_with(|| a.0.cmp(&b.0)));
928 for (to_id, neighbor, score) in candidates {
929 if !state.seen.insert(to_id.clone()) {
930 continue;
931 }
932 state.total_discovered += 1;
933 if state.nodes.len() > options.max_nodes {
934 state.pruned_count += 1;
935 continue;
936 }
937 state.nodes.insert(to_id.clone(), neighbor);
938 state.queue.push(ScoredQueueEntry {
939 id: to_id,
940 depth: entry.depth + 1,
941 score,
942 });
943 }
944 Ok(state)
945 });
946 state = ctx.get(&layer).map_err(graph_cache_error)?;
947 }
948
949 let node_ids: BTreeSet<_> = state.nodes.keys().cloned().collect();
950 state
951 .edges
952 .retain(|_, edge| node_ids.contains(&edge.from_id) && node_ids.contains(&edge.to_id));
953
954 Ok(Some(RankedNeighborhoodResult {
955 nodes: state.nodes.into_values().collect(),
956 edges: state.edges.into_values().collect(),
957 pruned_count: state.pruned_count,
958 total_discovered: state.total_discovered,
959 }))
960 }
961 fn reachable_nodes_by_kind(
962 &self,
963 from_id: &str,
964 kind: &str,
965 depth: usize,
966 limit: usize,
967 ) -> Result<Vec<(GraphNode, GraphPath)>> {
968 let mut rows = BTreeMap::<String, (GraphNode, GraphPath)>::new();
969 let mut seen = BTreeSet::from([from_id.to_string()]);
970 let mut queue = VecDeque::from([(from_id.to_string(), vec![from_id.to_string()])]);
971
972 while let Some((current, path)) = queue.pop_front() {
973 let current_depth = path.len().saturating_sub(1);
974 if current_depth >= depth {
975 continue;
976 }
977 for edge in self.outgoing_edges(¤t, None)? {
978 if !seen.insert(edge.to_id.clone()) {
979 continue;
980 }
981 let Some(node) = self.node(&edge.to_id)? else {
982 continue;
983 };
984 let mut next_path = path.clone();
985 next_path.push(edge.to_id.clone());
986 let graph_path = GraphPath {
987 hops: next_path.len().saturating_sub(1),
988 nodes: next_path.clone(),
989 };
990 if node.kind == kind {
991 rows.entry(node.id.clone()).or_insert((node, graph_path));
992 }
993 queue.push_back((edge.to_id, next_path));
994 }
995 }
996 let mut rows = rows.into_values().collect::<Vec<_>>();
997 rows.sort_by(|(left_node, left_path), (right_node, right_path)| {
998 left_path
999 .hops
1000 .cmp(&right_path.hops)
1001 .then(left_node.label.cmp(&right_node.label))
1002 .then(left_node.id.cmp(&right_node.id))
1003 });
1004 if limit > 0 && rows.len() > limit {
1005 rows.truncate(limit);
1006 }
1007 Ok(rows)
1008 }
1009
1010 fn reachable_nodes_by_kinds(
1011 &self,
1012 from_id: &str,
1013 kinds: &[&str],
1014 depth: usize,
1015 limit: usize,
1016 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
1017 let mut rows = BTreeMap::new();
1018 for kind in kinds {
1019 rows.insert(
1020 (*kind).to_string(),
1021 self.reachable_nodes_by_kind(from_id, kind, depth, limit)?,
1022 );
1023 }
1024 Ok(rows)
1025 }
1026
1027 fn semantic_top_candidates(
1028 &self,
1029 query_vector: &[f64],
1030 kinds: &[&str],
1031 limit: usize,
1032 ) -> Result<Vec<GraphSemanticCandidate>> {
1033 graph_semantic_top_candidates_by_property_scan(self, query_vector, kinds, limit)
1034 }
1035
1036 fn evidence_target_candidates(
1037 &self,
1038 target: &str,
1039 kinds: &[&str],
1040 preferred_path: Option<&str>,
1041 ) -> Result<Vec<GraphNode>> {
1042 let normalized = target.trim().trim_start_matches('#');
1043 let normalized_label = format!("#{normalized}");
1044 let mut rows = Vec::new();
1045 for kind in kinds {
1046 let mut candidates = self
1047 .nodes_by_kind(kind)?
1048 .into_iter()
1049 .filter(|node| {
1050 (node.properties.get("handle").map(String::as_str) == Some(target)
1051 || node.properties.get("ref_id").map(String::as_str) == Some(normalized)
1052 || node.label == target
1053 || node.label == normalized_label)
1054 && preferred_path.is_none_or(|path| {
1055 node.properties.get("path").map(String::as_str) == Some(path)
1056 })
1057 })
1058 .collect::<Vec<_>>();
1059 candidates.sort_by(|left, right| left.id.cmp(&right.id));
1060 rows.extend(candidates);
1061 }
1062 Ok(rows)
1063 }
1064
1065 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
1066 if let Some(node) = self.node(target)? {
1067 return Ok(Some(node));
1068 }
1069 Ok(self
1070 .evidence_target_candidates(target, kinds, None)?
1071 .into_iter()
1072 .next())
1073 }
1074}
1075
1076pub fn graph_semantic_top_candidates_by_property_scan<S: GraphStore + ?Sized>(
1077 store: &S,
1078 query_vector: &[f64],
1079 kinds: &[&str],
1080 limit: usize,
1081) -> Result<Vec<GraphSemanticCandidate>> {
1082 if query_vector.is_empty() || kinds.is_empty() {
1083 return Ok(Vec::new());
1084 }
1085 let mut seen_kinds = BTreeSet::new();
1086 let mut candidates = Vec::new();
1087 for kind in kinds {
1088 if !seen_kinds.insert(*kind) {
1089 continue;
1090 }
1091 for node in store.nodes_by_kind(kind)? {
1092 let Some(vector) = node
1093 .properties
1094 .get(GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY)
1095 .and_then(|value| parse_graph_semantic_vector_property(value))
1096 else {
1097 continue;
1098 };
1099 if vector.len() != query_vector.len() {
1100 continue;
1101 }
1102 candidates.push(GraphSemanticCandidate {
1103 score: graph_semantic_cosine(query_vector, &vector),
1104 node,
1105 });
1106 }
1107 }
1108 candidates.sort_by(|left, right| {
1109 right
1110 .score
1111 .partial_cmp(&left.score)
1112 .unwrap_or(std::cmp::Ordering::Equal)
1113 .then_with(|| left.node.kind.cmp(&right.node.kind))
1114 .then_with(|| left.node.label.cmp(&right.node.label))
1115 .then_with(|| left.node.id.cmp(&right.node.id))
1116 });
1117 if limit > 0 && candidates.len() > limit {
1118 candidates.truncate(limit);
1119 }
1120 Ok(candidates)
1121}
1122
1123pub fn shortest_path_using_outgoing(
1124 from_id: &str,
1125 to_id: &str,
1126 kind: Option<&str>,
1127 max_hops: Option<usize>,
1128 mut outgoing_edges: impl FnMut(&str, Option<&str>) -> Result<Vec<GraphEdge>>,
1129) -> Result<Option<GraphPath>> {
1130 if from_id == to_id {
1131 return Ok(Some(GraphPath {
1132 nodes: vec![from_id.to_string()],
1133 hops: 0,
1134 }));
1135 }
1136
1137 let mut queue = VecDeque::new();
1138 let mut parent = BTreeMap::<String, (usize, String)>::new();
1139 parent.insert(from_id.to_string(), (0, String::new()));
1140 queue.push_back(from_id.to_string());
1141
1142 while let Some(current) = queue.pop_front() {
1143 let current_depth = parent.get(¤t).map(|(d, _)| *d).unwrap_or(0);
1144 if max_hops.is_some_and(|max_hops| current_depth >= max_hops) {
1145 continue;
1146 }
1147 for edge in outgoing_edges(¤t, kind)? {
1148 if parent.contains_key(&edge.to_id) {
1149 continue;
1150 }
1151 parent.insert(edge.to_id.clone(), (current_depth + 1, current.clone()));
1152 if edge.to_id == to_id {
1153 let mut nodes = vec![to_id.to_string()];
1154 let mut cursor = to_id;
1155 while let Some((_, previous)) = parent.get(cursor) {
1156 if previous.is_empty() {
1157 break;
1158 }
1159 nodes.push(previous.clone());
1160 cursor = previous;
1161 }
1162 nodes.reverse();
1163 let hops = nodes.len().saturating_sub(1);
1164 return Ok(Some(GraphPath { nodes, hops }));
1165 }
1166 queue.push_back(edge.to_id);
1167 }
1168 }
1169
1170 Ok(None)
1171}