Skip to main content

tsift_core/
store.rs

1use anyhow::Result;
2use lazily::{Context as LazyContext, SlotHandle};
3use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::types::*;
7
8pub(crate) fn graph_node_matches_filters(
9    node: &GraphNode,
10    filters: &[GraphPropertyFilter],
11) -> bool {
12    filters
13        .iter()
14        .all(|filter| node.properties.get(&filter.key) == Some(&filter.value))
15}
16
/// A node waiting in the ranked-traversal priority queue.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct ScoredQueueEntry {
    /// Id of the queued node.
    pub id: String,
    /// Depth recorded for the node when it was queued.
    pub depth: usize,
    /// Priority of the entry; larger scores compare as greater.
    pub score: i64,
}

impl Ord for ScoredQueueEntry {
    /// Orders by score ascending, then by depth descending (so the shallower
    /// of two equally scored entries is the greater one), then by id.
    ///
    /// In a max-heap such as `BinaryHeap` this pops the highest score first,
    /// preferring shallower entries on ties.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let mine = (self.score, std::cmp::Reverse(self.depth), &self.id);
        let theirs = (other.score, std::cmp::Reverse(other.depth), &other.id);
        mine.cmp(&theirs)
    }
}

impl PartialOrd for ScoredQueueEntry {
    /// Total order delegated to [`Ord::cmp`]; never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
38
39pub(crate) fn compute_neighborhood_score(
40    strategy: &NeighborhoodScoring,
41    depth: usize,
42    edge_kind: &str,
43    _node: &GraphNode,
44    degree_map: &BTreeMap<String, usize>,
45) -> i64 {
46    match strategy {
47        NeighborhoodScoring::BreadthFirst => {
48            (120i64.saturating_sub((depth as i64).saturating_mul(18))).max(0)
49        }
50        NeighborhoodScoring::EdgeKindWeighted => {
51            let depth_score = (120i64.saturating_sub((depth as i64).saturating_mul(18))).max(0);
52            let kind_score = edge_kind_weighted_score(edge_kind);
53            depth_score.saturating_add(kind_score)
54        }
55        NeighborhoodScoring::DegreeWeighted => {
56            let depth_score = (120i64.saturating_sub((depth as i64).saturating_mul(18))).max(0);
57            let degree = degree_map.values().copied().max().unwrap_or(1) as i64;
58            let degree_bonus = if degree <= 3 {
59                20
60            } else if degree <= 10 {
61                10
62            } else {
63                0
64            };
65            depth_score.saturating_add(degree_bonus)
66        }
67    }
68}
69
70#[derive(Debug, Clone, Copy)]
71pub(crate) struct NeighborhoodScoreContext {
72    pub now_unix: i64,
73}
74
75impl NeighborhoodScoreContext {
76    pub(crate) fn from_options(options: &RankedNeighborhoodOptions) -> Self {
77        Self {
78            now_unix: options.observed_at_now_unix.unwrap_or_else(unix_now),
79        }
80    }
81}
82
83pub(crate) fn compute_ranked_neighborhood_score(
84    options: &RankedNeighborhoodOptions,
85    context: NeighborhoodScoreContext,
86    depth: usize,
87    edge_kind: &str,
88    node: &GraphNode,
89    degree_map: &BTreeMap<String, usize>,
90) -> i64 {
91    compute_neighborhood_score(&options.scoring, depth, edge_kind, node, degree_map)
92        .saturating_add(observed_at_decay_bonus(
93            node,
94            context.now_unix,
95            options.observed_at_half_life_secs,
96            options.observed_at_weight,
97        ))
98        .saturating_add(memory_node_signal_bonus(node, options.memory_node_boost))
99}
100
101pub(crate) fn graph_node_observed_at_unix(node: &GraphNode) -> Option<i64> {
102    node.freshness
103        .as_ref()
104        .and_then(|freshness| freshness.observed_at_unix)
105        .or_else(|| graph_node_i64_property(node, "observed_at_unix"))
106        .or_else(|| graph_node_i64_property(node, "max_observed_at_unix"))
107}
108
109pub(crate) fn observed_at_decay_bonus(
110    node: &GraphNode,
111    now_unix: i64,
112    half_life_secs: i64,
113    weight: i64,
114) -> i64 {
115    if weight <= 0 {
116        return 0;
117    }
118    let Some(observed_at_unix) = graph_node_observed_at_unix(node) else {
119        return 0;
120    };
121    let half_life_secs = half_life_secs.max(1);
122    let age_secs = now_unix.saturating_sub(observed_at_unix).max(0);
123    match age_secs / half_life_secs {
124        0 => weight,
125        1 => weight / 2,
126        2 | 3 => weight / 4,
127        _ => 0,
128    }
129}
130
131pub(crate) fn memory_node_signal_bonus(node: &GraphNode, configured_boost: i64) -> i64 {
132    if configured_boost <= 0 {
133        return 0;
134    }
135    let provider_memory = node
136        .properties
137        .get("provider")
138        .is_some_and(|provider| provider == "tsift-memory");
139    let authored_kind = matches!(node.kind.as_str(), "finding" | "decision" | "note");
140    let memory_kind = node.kind.starts_with("memory_") || provider_memory || authored_kind;
141    if !memory_kind || node.kind == "memory_projection" {
142        return 0;
143    }
144
145    let base = match node.kind.as_str() {
146        "finding" | "decision" | "memory_event" => configured_boost,
147        "note" | "memory_session" => configured_boost / 2,
148        "source_handle" | "semantic_concept" | "semantic_vector_handle" if provider_memory => {
149            configured_boost / 2
150        }
151        _ if provider_memory => configured_boost / 2,
152        _ => configured_boost,
153    };
154
155    base.saturating_add(confidence_bonus(node, configured_boost))
156}
157
158fn confidence_bonus(node: &GraphNode, configured_boost: i64) -> i64 {
159    node.properties
160        .get("confidence")
161        .and_then(|value| value.parse::<f64>().ok())
162        .map(|confidence| (configured_boost as f64 * confidence.clamp(0.0, 1.0)).round() as i64)
163        .unwrap_or(0)
164}
165
166fn graph_node_i64_property(node: &GraphNode, key: &str) -> Option<i64> {
167    node.properties.get(key)?.parse().ok()
168}
169
/// Returns the current system time as whole seconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
176
/// Accumulated result of a plain neighborhood expansion after some number of
/// layers.
#[derive(Clone)]
pub(crate) struct NeighborhoodLayerState {
    /// Nodes collected so far, keyed by node id.
    pub nodes: BTreeMap<String, GraphNode>,
    /// Edges collected so far, keyed by `(from_id, kind, to_id)`.
    pub edges: BTreeMap<(String, String, String), GraphEdge>,
    /// Ids of the nodes added by the most recent layer; the next layer
    /// expands from these.
    pub frontier: Vec<String>,
}
183
/// Raw store data fetched for one layer of a plain neighborhood expansion.
#[derive(Clone)]
pub(crate) struct NeighborhoodFetchedLayer {
    /// Outgoing edges of every frontier node, in fetch order.
    pub edges: Vec<GraphEdge>,
    /// Nodes loaded for edge targets that were not already known.
    pub nodes: Vec<GraphNode>,
}
189
/// Accumulated state of a ranked (priority-queue driven) neighborhood
/// traversal.
#[derive(Clone)]
pub(crate) struct RankedNeighborhoodLayerState {
    /// Nodes collected so far, keyed by node id.
    pub nodes: BTreeMap<String, GraphNode>,
    /// Edges collected so far, keyed by `(from_id, kind, to_id)`.
    pub edges: BTreeMap<(String, String, String), GraphEdge>,
    /// Entries still waiting to be expanded; the greatest entry is taken first.
    pub queue: BinaryHeap<ScoredQueueEntry>,
    /// Ids for which node loading is skipped when expanding an entry.
    pub seen: BTreeSet<String>,
    /// NOTE(review): presumably the number of candidates dropped by a cap —
    /// the code that updates it is not visible here; confirm.
    pub pruned_count: usize,
    /// Starts at 1 for the center node. NOTE(review): presumably counts every
    /// node discovered by the traversal — confirm.
    pub total_discovered: usize,
    /// Degree per key, consumed by `DegreeWeighted` scoring.
    /// NOTE(review): presumably keyed by node id — confirm.
    pub degree_map: BTreeMap<String, usize>,
}
200
/// Raw store data fetched when expanding a single queue entry of a ranked
/// neighborhood traversal.
#[derive(Clone)]
pub(crate) struct RankedNeighborhoodFetchedExpansion {
    /// Outgoing edges of the expanded node.
    pub edges: Vec<GraphEdge>,
    /// Nodes loaded for edge targets that were not already seen, keyed by the
    /// target id.
    pub neighbor_nodes: BTreeMap<String, GraphNode>,
}
206
207pub(crate) fn graph_cache_error(message: String) -> anyhow::Error {
208    anyhow::anyhow!("{message}")
209}
210
211pub(crate) fn fetch_neighborhood_layer<S: GraphStore + ?Sized>(
212    store: &S,
213    frontier: &[String],
214    known_nodes: &BTreeMap<String, GraphNode>,
215    kind: Option<&str>,
216) -> Result<NeighborhoodFetchedLayer> {
217    let mut edges = Vec::new();
218    let mut missing_ids = Vec::new();
219    for current in frontier {
220        let outgoing = store.outgoing_edges(current, kind)?;
221        for edge in outgoing {
222            if !known_nodes.contains_key(&edge.to_id) {
223                missing_ids.push(edge.to_id.clone());
224            }
225            edges.push(edge);
226        }
227    }
228    missing_ids.sort();
229    missing_ids.dedup();
230    let mut nodes = Vec::new();
231    for id in missing_ids {
232        if !known_nodes.contains_key(&id)
233            && let Some(node) = store.node(&id)?
234        {
235            nodes.push(node);
236        }
237    }
238    Ok(NeighborhoodFetchedLayer { edges, nodes })
239}
240
241pub(crate) fn fetch_ranked_neighborhood_expansion<S: GraphStore + ?Sized>(
242    store: &S,
243    entry: &ScoredQueueEntry,
244    state: &RankedNeighborhoodLayerState,
245    kind: Option<&str>,
246) -> Result<RankedNeighborhoodFetchedExpansion> {
247    let edges = store.outgoing_edges(&entry.id, kind)?;
248    let mut neighbor_nodes = BTreeMap::new();
249    for edge in &edges {
250        if state.seen.contains(&edge.to_id) {
251            continue;
252        }
253        if let Some(node) = store.node(&edge.to_id)? {
254            neighbor_nodes.insert(edge.to_id.clone(), node);
255        }
256    }
257    Ok(RankedNeighborhoodFetchedExpansion {
258        edges,
259        neighbor_nodes,
260    })
261}
262
/// Returns the ranking weight associated with an edge kind.
///
/// Exact kind names are matched first. Unlisted kinds then fall back to
/// substring rules: anything containing `community` scores 20, anything
/// containing `semantic`, `concept` or `entity` scores 24, and everything
/// else scores 8. The `community` rule is checked before the others.
pub(crate) fn edge_kind_weighted_score(edge_kind: &str) -> i64 {
    const EXACT_SCORES: &[(&str, i64)] = &[
        ("semantic_relation", 34),
        ("mentions_entity", 28),
        ("mentions_concept", 28),
        ("tagged_entity", 28),
        ("tagged_concept", 28),
        ("related_concept", 28),
        ("mentions", 22),
        ("calls", 20),
        ("requests_context", 18),
        ("scopes_context", 18),
        ("scopes_source", 18),
        ("explains_result", 18),
        ("defines", 12),
        ("contains", 12),
        ("belongs_to", 12),
    ];

    if let Some(&(_, score)) = EXACT_SCORES.iter().find(|(name, _)| *name == edge_kind) {
        return score;
    }
    if edge_kind.contains("community") {
        return 20;
    }

    let semantic_like = ["semantic", "concept", "entity"]
        .iter()
        .any(|marker| edge_kind.contains(*marker));
    if semantic_like { 24 } else { 8 }
}
282
283pub(crate) fn graph_edge_matches_filters(
284    edge: &GraphEdge,
285    filters: &[GraphPropertyFilter],
286) -> bool {
287    filters
288        .iter()
289        .all(|filter| edge.properties.get(&filter.key) == Some(&filter.value))
290}
291
292pub fn apply_graph_query_page(
293    mut nodes: Vec<GraphNode>,
294    mut edges: Vec<GraphEdge>,
295    options: GraphQueryOptions,
296    mut diagnostics: Vec<String>,
297) -> GraphPagedSubgraph {
298    nodes.sort_by(|left, right| left.id.cmp(&right.id));
299    edges.sort_by(|left, right| {
300        left.from_id
301            .cmp(&right.from_id)
302            .then(left.kind.cmp(&right.kind))
303            .then(left.to_id.cmp(&right.to_id))
304    });
305
306    let before_filter = nodes.len();
307    if !options.property_filters.is_empty() {
308        nodes.retain(|node| graph_node_matches_filters(node, &options.property_filters));
309    }
310    let after_filter = nodes.len();
311
312    if let Some(cursor) = &options.cursor {
313        nodes.retain(|node| node.id > *cursor);
314    }
315
316    let before_limit = nodes.len();
317    let mut next_cursor = None;
318    if let Some(limit) = options.limit
319        && nodes.len() > limit
320    {
321        next_cursor = nodes
322            .get(limit.saturating_sub(1))
323            .map(|node| node.id.clone());
324        nodes.truncate(limit);
325    }
326
327    let node_ids = nodes
328        .iter()
329        .map(|node| node.id.as_str())
330        .collect::<BTreeSet<_>>();
331    edges.retain(|edge| {
332        node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
333    });
334
335    if after_filter != before_filter {
336        diagnostics.push(format!(
337            "property filters removed {} node(s)",
338            before_filter.saturating_sub(after_filter)
339        ));
340    }
341    if options.cursor.is_some() {
342        diagnostics.push("cursor is exclusive and ordered by node id".to_string());
343    }
344    if next_cursor.is_some() {
345        diagnostics.push(
346            "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
347        );
348    }
349
350    GraphPagedSubgraph {
351        page: GraphQueryPage {
352            cursor: options.cursor,
353            limit: options.limit,
354            next_cursor,
355            returned_nodes: nodes.len(),
356            returned_edges: edges.len(),
357            truncated: options.limit.is_some_and(|limit| before_limit > limit),
358            diagnostics,
359        },
360        nodes,
361        edges,
362    }
363}
364
365pub fn apply_graph_edge_query_page(
366    mut edges: Vec<GraphEdge>,
367    options: GraphQueryOptions,
368    mut diagnostics: Vec<String>,
369) -> GraphPagedSubgraph {
370    edges.sort_by_key(graph_edge_id);
371
372    let before_filter = edges.len();
373    if !options.property_filters.is_empty() {
374        edges.retain(|edge| graph_edge_matches_filters(edge, &options.property_filters));
375    }
376    let after_filter = edges.len();
377
378    if let Some(cursor) = &options.cursor {
379        edges.retain(|edge| graph_edge_id(edge) > *cursor);
380    }
381
382    let before_limit = edges.len();
383    let mut next_cursor = None;
384    if let Some(limit) = options.limit
385        && edges.len() > limit
386    {
387        next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
388        edges.truncate(limit);
389    }
390
391    if after_filter != before_filter {
392        diagnostics.push(format!(
393            "edge property filters removed {} edge(s)",
394            before_filter.saturating_sub(after_filter)
395        ));
396    }
397    if options.cursor.is_some() {
398        diagnostics.push("cursor is exclusive and ordered by edge id".to_string());
399    }
400    if next_cursor.is_some() {
401        diagnostics.push(
402            "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
403        );
404    }
405
406    GraphPagedSubgraph {
407        page: GraphQueryPage {
408            cursor: options.cursor,
409            limit: options.limit,
410            next_cursor,
411            returned_nodes: 0,
412            returned_edges: edges.len(),
413            truncated: options.limit.is_some_and(|limit| before_limit > limit),
414            diagnostics,
415        },
416        nodes: Vec::new(),
417        edges,
418    }
419}
420
/// Parses a comma-separated list of numbers into a vector.
///
/// Each component is trimmed before parsing. Returns `None` when any
/// component fails to parse as `f64` or is not finite (`inf`, `NaN`); an
/// empty input string therefore also yields `None`.
pub fn parse_graph_semantic_vector_property(value: &str) -> Option<Vec<f64>> {
    let mut components = Vec::new();
    for part in value.split(',') {
        let component: f64 = part.trim().parse().ok()?;
        if !component.is_finite() {
            return None;
        }
        components.push(component);
    }

    if components.is_empty() {
        None
    } else {
        Some(components)
    }
}
429
/// Returns the cosine similarity of two vectors, in `[-1.0, 1.0]`.
///
/// The dot product is divided by the product of both Euclidean norms, so the
/// result does not depend on the vectors' magnitudes. For vectors that are
/// already unit length this equals the plain dot product.
///
/// Returns `0.0` when the vectors differ in length, are empty, or when the
/// product of the norms is zero or not finite (for example an all-zero
/// vector or a `NaN` component).
pub fn graph_semantic_cosine(left: &[f64], right: &[f64]) -> f64 {
    if left.len() != right.len() || left.is_empty() {
        return 0.0;
    }

    let mut dot = 0.0f64;
    let mut left_sq = 0.0f64;
    let mut right_sq = 0.0f64;
    for (l, r) in left.iter().zip(right) {
        dot += l * r;
        left_sq += l * l;
        right_sq += r * r;
    }

    let magnitude = left_sq.sqrt() * right_sq.sqrt();
    if !(magnitude.is_finite() && magnitude > 0.0) {
        return 0.0;
    }
    // Clamp away floating-point drift just outside the valid range.
    (dot / magnitude).clamp(-1.0, 1.0)
}
439
440pub fn graph_semantic_seeded_edge_other_id<'a>(
441    edge: &'a GraphEdge,
442    current_id: &str,
443) -> Option<&'a str> {
444    if edge.from_id == current_id {
445        Some(edge.to_id.as_str())
446    } else if edge.to_id == current_id {
447        Some(edge.from_id.as_str())
448    } else {
449        None
450    }
451}
452
453pub fn graph_semantic_seeded_edge_score(edge: &GraphEdge, current_id: &str) -> i64 {
454    let mut score = edge_kind_weighted_score(&edge.kind).saturating_mul(10);
455    score += if edge.from_id == current_id { 8 } else { 4 };
456    score += match edge.kind.as_str() {
457        "mentions_concept" | "mentions_entity" | "tagged_concept" | "tagged_entity"
458        | "related_concept" => 30,
459        "semantic_relation" => 28,
460        "calls" => 24,
461        "mentions" => 22,
462        "requests_context" | "scopes_context" | "scopes_source" | "explains_result" => 18,
463        "defines" | "contains" | "belongs_to" => 12,
464        _ => 0,
465    };
466    score
467}
468
469pub trait GraphStore {
    /// Stores `node`.
    ///
    /// NOTE(review): presumably insert-or-replace keyed by node id, as the
    /// name suggests — confirm against the implementors.
    fn upsert_node(&self, node: &GraphNode) -> Result<()>;
    /// Stores `edge`.
    ///
    /// NOTE(review): presumably insert-or-replace keyed by
    /// `(from_id, kind, to_id)` — confirm against the implementors.
    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()>;
    /// Deletes the node with the given id.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of removed
    /// records — confirm against the implementors.
    fn delete_node(&self, id: &str) -> Result<usize>;
    /// Deletes the edge identified by source, target and kind.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of removed
    /// records — confirm against the implementors.
    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize>;
    /// Looks up a single node by id. The default methods of this trait treat
    /// `Ok(None)` as "no such node".
    fn node(&self, id: &str) -> Result<Option<GraphNode>>;
475    fn nodes_by_ids(&self, ids: &[String]) -> Result<Vec<GraphNode>> {
476        let mut nodes = Vec::new();
477        let mut seen = BTreeSet::new();
478        for id in ids {
479            if !seen.insert(id.clone()) {
480                continue;
481            }
482            if let Some(node) = self.node(id)? {
483                nodes.push(node);
484            }
485        }
486        nodes.sort_by(|left, right| left.id.cmp(&right.id));
487        Ok(nodes)
488    }
    /// Returns every node in the store. The default methods do not rely on
    /// any particular order.
    fn all_nodes(&self) -> Result<Vec<GraphNode>>;
    /// Returns every edge in the store. Several default methods scan this
    /// full list, so implementors may want to override them with indexed
    /// queries.
    fn all_edges(&self) -> Result<Vec<GraphEdge>>;
491    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
492        Ok(self
493            .all_edges()?
494            .into_iter()
495            .find(|edge| graph_edge_id(edge) == edge_id))
496    }
497    fn graph_counts(&self) -> Result<(usize, usize)> {
498        Ok((self.all_nodes()?.len(), self.all_edges()?.len()))
499    }
500    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
501        let mut edges = self
502            .all_edges()?
503            .into_iter()
504            .filter(|edge| edge.from_id != edge.to_id)
505            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
506            .collect::<Vec<_>>();
507        edges.sort_by(|left, right| {
508            left.from_id
509                .cmp(&right.from_id)
510                .then(left.kind.cmp(&right.kind))
511                .then(left.to_id.cmp(&right.to_id))
512        });
513        Ok(edges.into_iter().next())
514    }
515    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
516        let mut probes = Vec::new();
517        for edge in self.all_edges()? {
518            if edge.from_id == edge.to_id {
519                continue;
520            }
521            if let Some((key, value)) = edge
522                .properties
523                .iter()
524                .next()
525                .map(|(key, value)| (key.clone(), value.clone()))
526            {
527                probes.push((edge, GraphPropertyFilter { key, value }));
528            }
529        }
530        probes.sort_by(|(left_edge, left_filter), (right_edge, right_filter)| {
531            left_filter
532                .key
533                .cmp(&right_filter.key)
534                .then(left_filter.value.cmp(&right_filter.value))
535                .then_with(|| graph_edge_id(left_edge).cmp(&graph_edge_id(right_edge)))
536        });
537        Ok(probes.into_iter().next())
538    }
    /// Returns the nodes of the given kind.
    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>>;
    /// Returns the edges leaving `from_id`, optionally restricted to `kind`.
    /// This is the primitive every default traversal in this trait expands
    /// through.
    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>>;
541    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
542        let mut edges = self
543            .all_edges()?
544            .into_iter()
545            .filter(|edge| edge.from_id == node_id || edge.to_id == node_id)
546            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
547            .collect::<Vec<_>>();
548        edges.sort_by_key(graph_edge_id);
549        Ok(edges)
550    }
551    fn semantic_seeded_expansion_edges(
552        &self,
553        current_id: &str,
554        options: &SemanticSeededNeighborhoodOptions,
555    ) -> Result<SemanticSeededNeighborhoodExpansion> {
556        let mut expansion_edges_by_key = BTreeMap::<String, GraphEdge>::new();
557        for edge in self.outgoing_edges(current_id, None)? {
558            expansion_edges_by_key
559                .entry(graph_edge_id(&edge))
560                .or_insert(edge);
561        }
562        for edge in self.incident_edges(current_id, None)? {
563            expansion_edges_by_key
564                .entry(graph_edge_id(&edge))
565                .or_insert(edge);
566        }
567        let mut edges = expansion_edges_by_key.into_values().collect::<Vec<_>>();
568        edges.sort_by(|left, right| {
569            graph_semantic_seeded_edge_score(right, current_id)
570                .cmp(&graph_semantic_seeded_edge_score(left, current_id))
571                .then_with(|| graph_edge_id(left).cmp(&graph_edge_id(right)))
572        });
573        let mut skipped_by_edge_cap = 0usize;
574        if options.edge_scan_cap > 0 && edges.len() > options.edge_scan_cap {
575            skipped_by_edge_cap = edges.len() - options.edge_scan_cap;
576            edges.truncate(options.edge_scan_cap);
577        }
578        Ok(SemanticSeededNeighborhoodExpansion {
579            edges,
580            skipped_by_edge_cap,
581        })
582    }
583    fn paged_edges(
584        &self,
585        kind: Option<&str>,
586        options: GraphQueryOptions,
587    ) -> Result<GraphPagedSubgraph> {
588        let edges = self
589            .all_edges()?
590            .into_iter()
591            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
592            .collect::<Vec<_>>();
593        Ok(apply_graph_edge_query_page(edges, options, Vec::new()))
594    }
595    fn paged_incident_edges(
596        &self,
597        node_id: &str,
598        kind: Option<&str>,
599        options: GraphQueryOptions,
600    ) -> Result<GraphPagedSubgraph> {
601        Ok(apply_graph_edge_query_page(
602            self.incident_edges(node_id, kind)?,
603            options,
604            Vec::new(),
605        ))
606    }
607    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
608        let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
609        for from_id in node_ids {
610            for edge in self.outgoing_edges(from_id, None)? {
611                if node_ids.contains(&edge.to_id) {
612                    edges
613                        .entry((edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
614                        .or_insert(edge);
615                }
616            }
617        }
618        Ok(edges.into_values().collect())
619    }
    /// Finds a shortest path from `from_id` to `to_id`, optionally following
    /// only edges of `kind`. `Ok(None)` is available to signal that no path
    /// was found.
    ///
    /// NOTE(review): whether edge direction is respected and how ties between
    /// equally short paths are broken is implementor-defined — confirm.
    fn shortest_path(
        &self,
        from_id: &str,
        to_id: &str,
        kind: Option<&str>,
    ) -> Result<Option<GraphPath>>;
626    fn shortest_path_with_max_hops(
627        &self,
628        from_id: &str,
629        to_id: &str,
630        kind: Option<&str>,
631        max_hops: Option<usize>,
632    ) -> Result<Option<GraphPath>> {
633        shortest_path_using_outgoing(from_id, to_id, kind, max_hops, |current, kind| {
634            self.outgoing_edges(current, kind)
635        })
636    }
    /// Collects the subgraph reachable from `center_id` within `depth`
    /// outgoing hops, optionally following only edges of `kind`.
    ///
    /// Returns `Ok(None)` when the center node does not exist. Otherwise the
    /// result contains the center, every node reached, and every outgoing edge
    /// fetched from an expanded node, passed through `GraphSubgraph::sorted`.
    ///
    /// # Errors
    ///
    /// Propagates store errors; errors reported by the lazy slot context are
    /// converted with [`graph_cache_error`].
    fn neighborhood(
        &self,
        center_id: &str,
        depth: usize,
        kind: Option<&str>,
    ) -> Result<Option<GraphSubgraph>> {
        let Some(center) = self.node(center_id)? else {
            return Ok(None);
        };
        let ctx = LazyContext::new();
        // Layer zero: only the center node, which is also the first frontier.
        let initial = NeighborhoodLayerState {
            nodes: BTreeMap::from([(center_id.to_string(), center)]),
            edges: BTreeMap::new(),
            frontier: vec![center_id.to_string()],
        };
        // Each layer is a slot derived from the previous one.
        // NOTE(review): presumably `lazily` memoizes slot results — confirm
        // against the crate documentation.
        let mut layer: SlotHandle<std::result::Result<NeighborhoodLayerState, String>> =
            ctx.slot(move |_| Ok(initial.clone()));
        let mut state = ctx.get(&layer).map_err(graph_cache_error)?;

        for _ in 0..depth {
            // Nothing new was discovered by the last layer: stop early.
            if state.frontier.is_empty() {
                break;
            }
            // Store access happens here, outside the slot closure, so the
            // closure below only merges data that was already fetched.
            let fetched = fetch_neighborhood_layer(self, &state.frontier, &state.nodes, kind)?;
            let previous = layer;
            layer = ctx.slot(move |ctx| {
                let mut state = ctx.get(&previous)?;
                for edge in &fetched.edges {
                    let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
                    state.edges.entry(edge_key).or_insert_with(|| edge.clone());
                }
                // The new frontier is exactly the set of nodes first seen in
                // this layer.
                state.frontier.clear();
                for node in &fetched.nodes {
                    if !state.nodes.contains_key(&node.id) {
                        state.frontier.push(node.id.clone());
                        state.nodes.insert(node.id.clone(), node.clone());
                    }
                }
                Ok(state)
            });
            state = ctx.get(&layer).map_err(graph_cache_error)?;
        }

        Ok(Some(
            GraphSubgraph {
                nodes: state.nodes.into_values().collect(),
                edges: state.edges.into_values().collect(),
            }
            .sorted(),
        ))
    }
688    fn paged_nodes_by_kind(
689        &self,
690        kind: &str,
691        options: GraphQueryOptions,
692    ) -> Result<GraphPagedSubgraph> {
693        Ok(apply_graph_query_page(
694            self.nodes_by_kind(kind)?,
695            Vec::new(),
696            options,
697            Vec::new(),
698        ))
699    }
700    fn paged_neighborhood(
701        &self,
702        center_id: &str,
703        depth: usize,
704        kind: Option<&str>,
705        options: GraphQueryOptions,
706    ) -> Result<Option<GraphPagedSubgraph>> {
707        Ok(self.neighborhood(center_id, depth, kind)?.map(|subgraph| {
708            apply_graph_query_page(subgraph.nodes, subgraph.edges, options, Vec::new())
709        }))
710    }
    /// Expands a neighborhood breadth-first from several seed nodes at once.
    ///
    /// Seeds found in the store are always part of the discovered set and rank
    /// ahead of everything else, in the order given by `seed_ids`. Other nodes
    /// are discovered by following edges in either direction up to
    /// `options.depth` hops and are ranked by the best edge score that reached
    /// them. The result reports seeds missing from the store, how many
    /// candidates were skipped by the edge and node caps, the number of nodes
    /// discovered before truncation, and whether `options.limit` cut the list.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the store.
    fn semantic_seeded_neighborhood(
        &self,
        seed_ids: &[String],
        options: &SemanticSeededNeighborhoodOptions,
    ) -> Result<SemanticSeededNeighborhoodResult> {
        // Seed id -> position in `seed_ids`; for a repeated id the last
        // position wins because later map entries overwrite earlier ones.
        let seed_rank = seed_ids
            .iter()
            .enumerate()
            .map(|(idx, seed)| (seed.clone(), idx))
            .collect::<BTreeMap<_, _>>();
        let seed_nodes_by_id = self
            .nodes_by_ids(seed_ids)?
            .into_iter()
            .map(|node| (node.id.clone(), node))
            .collect::<BTreeMap<_, _>>();
        let mut nodes = BTreeMap::<String, GraphNode>::new();
        // Keyed by edge id.
        let mut edges = BTreeMap::<String, GraphEdge>::new();
        let mut node_score_by_id = BTreeMap::<String, i64>::new();
        // Breadth-first work list of `(node id, depth)`.
        let mut queue = VecDeque::<(String, usize)>::new();
        let mut seen_at_depth = BTreeMap::<String, usize>::new();
        let mut missing_seed_ids = Vec::new();
        let mut skipped_by_edge_cap = 0usize;
        let mut skipped_by_node_cap = 0usize;

        for (idx, seed_id) in seed_ids.iter().enumerate() {
            if let Some(node) = seed_nodes_by_id.get(seed_id) {
                nodes.entry(seed_id.clone()).or_insert_with(|| node.clone());
                // Earlier seeds score slightly higher; the first occurrence of
                // a repeated seed keeps its score.
                node_score_by_id
                    .entry(seed_id.clone())
                    .or_insert(1_000_000i64.saturating_sub(idx as i64));
                if !seen_at_depth.contains_key(seed_id) {
                    queue.push_back((seed_id.clone(), 0));
                    seen_at_depth.insert(seed_id.clone(), 0);
                }
            } else {
                missing_seed_ids.push(seed_id.clone());
            }
        }

        while let Some((current_id, current_depth)) = queue.pop_front() {
            // Nodes at the maximum depth are kept but not expanded further.
            if current_depth >= options.depth {
                continue;
            }

            let expansion = self.semantic_seeded_expansion_edges(&current_id, options)?;
            skipped_by_edge_cap = skipped_by_edge_cap.saturating_add(expansion.skipped_by_edge_cap);
            let mut candidates = Vec::new();
            let mut missing_candidate_ids = Vec::new();
            for edge in expansion.edges {
                let Some(other_id) = graph_semantic_seeded_edge_other_id(&edge, &current_id) else {
                    continue;
                };
                let other_id = other_id.to_string();
                // Edges found closer to a seed get a bonus of 5 per remaining
                // hop.
                let edge_score = graph_semantic_seeded_edge_score(&edge, &current_id)
                    .saturating_add(
                        (options.depth.saturating_sub(current_depth) as i64).saturating_mul(5),
                    );
                if !nodes.contains_key(&other_id) {
                    missing_candidate_ids.push(other_id.clone());
                }
                candidates.push((edge, other_id, edge_score));
            }

            // Load all unknown neighbors of this node in one batch.
            missing_candidate_ids.sort();
            missing_candidate_ids.dedup();
            let fetched_nodes_by_id = self
                .nodes_by_ids(&missing_candidate_ids)?
                .into_iter()
                .map(|node| (node.id.clone(), node))
                .collect::<BTreeMap<_, _>>();

            for (edge, other_id, edge_score) in candidates {
                let other_known = nodes.contains_key(&other_id);
                // The cap only blocks new nodes; edges between already known
                // nodes are still recorded.
                if !other_known && nodes.len() >= options.node_discovery_cap {
                    skipped_by_node_cap = skipped_by_node_cap.saturating_add(1);
                    continue;
                }
                // A node keeps the best score of any edge that reached it.
                node_score_by_id
                    .entry(other_id.clone())
                    .and_modify(|score| *score = (*score).max(edge_score))
                    .or_insert(edge_score);
                let edge_key = graph_edge_id(&edge);
                edges.entry(edge_key).or_insert(edge);
                if !other_known && let Some(node) = fetched_nodes_by_id.get(&other_id) {
                    nodes.insert(other_id.clone(), node.clone());
                }
                // The neighbor could not be loaded: its edge stays recorded
                // for now but is removed by the endpoint filter below.
                if !nodes.contains_key(&other_id) {
                    continue;
                }
                let next_depth = current_depth + 1;
                // Queue a node on first sight or when reached by a shorter
                // route than before.
                let should_queue = seen_at_depth
                    .get(&other_id)
                    .is_none_or(|seen_depth| next_depth < *seen_depth);
                if should_queue {
                    seen_at_depth.insert(other_id.clone(), next_depth);
                    queue.push_back((other_id, next_depth));
                }
            }
        }

        // Seeds first (in seed order), then descending score, then id.
        let mut nodes = nodes.into_values().collect::<Vec<_>>();
        nodes.sort_by(|left, right| {
            seed_rank
                .get(&left.id)
                .copied()
                .unwrap_or(usize::MAX)
                .cmp(&seed_rank.get(&right.id).copied().unwrap_or(usize::MAX))
                .then_with(|| {
                    node_score_by_id
                        .get(&right.id)
                        .copied()
                        .unwrap_or_default()
                        .cmp(&node_score_by_id.get(&left.id).copied().unwrap_or_default())
                })
                .then(left.id.cmp(&right.id))
        });

        // Counted before truncation; a limit of 0 means "no limit".
        let total_discovered = nodes.len();
        let truncated = options.limit > 0 && nodes.len() > options.limit;
        if truncated {
            nodes.truncate(options.limit);
        }

        // Keep only edges whose endpoints both survived ranking and
        // truncation.
        let node_ids = nodes
            .iter()
            .map(|node| node.id.as_str())
            .collect::<BTreeSet<_>>();
        let mut edges = edges
            .into_values()
            .filter(|edge| {
                node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
            })
            .collect::<Vec<_>>();
        edges.sort_by_key(graph_edge_id);

        Ok(SemanticSeededNeighborhoodResult {
            nodes,
            edges,
            skipped_by_edge_cap,
            skipped_by_node_cap,
            missing_seed_ids,
            total_discovered,
            truncated,
        })
    }
856    fn ranked_neighborhood(
857        &self,
858        center_id: &str,
859        options: &RankedNeighborhoodOptions,
860    ) -> Result<Option<RankedNeighborhoodResult>> {
861        let Some(center) = self.node(center_id)? else {
862            return Ok(None);
863        };
864        let ctx = LazyContext::new();
865        let initial = RankedNeighborhoodLayerState {
866            nodes: BTreeMap::from([(center_id.to_string(), center)]),
867            edges: BTreeMap::new(),
868            queue: BinaryHeap::from([ScoredQueueEntry {
869                id: center_id.to_string(),
870                depth: 0usize,
871                score: i64::MAX,
872            }]),
873            seen: BTreeSet::from([center_id.to_string()]),
874            pruned_count: 0,
875            total_discovered: 1,
876            degree_map: BTreeMap::new(),
877        };
878        let mut layer: SlotHandle<std::result::Result<RankedNeighborhoodLayerState, String>> =
879            ctx.slot(move |_| Ok(initial.clone()));
880        let mut state = ctx.get(&layer).map_err(graph_cache_error)?;
881        let options = options.clone();
882        let score_context = NeighborhoodScoreContext::from_options(&options);
883
884        while let Some(entry) = state.queue.peek().cloned() {
885            let fetched = if entry.depth < options.depth {
886                Some(fetch_ranked_neighborhood_expansion(
887                    self,
888                    &entry,
889                    &state,
890                    options.edge_kind.as_deref(),
891                )?)
892            } else {
893                None
894            };
895            let previous = layer;
896            let options = options.clone();
897            layer = ctx.slot(move |ctx| {
898                let mut state = ctx.get(&previous)?;
899                let Some(entry) = state.queue.pop() else {
900                    return Ok(state);
901                };
902                let Some(fetched) = &fetched else {
903                    return Ok(state);
904                };
905                let mut candidates = Vec::new();
906                for edge in &fetched.edges {
907                    let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
908                    state.edges.entry(edge_key).or_insert_with(|| edge.clone());
909                    *state.degree_map.entry(edge.from_id.clone()).or_default() += 1;
910                    *state.degree_map.entry(edge.to_id.clone()).or_default() += 1;
911                    if state.seen.contains(&edge.to_id) {
912                        continue;
913                    }
914                    let Some(neighbor) = fetched.neighbor_nodes.get(&edge.to_id) else {
915                        continue;
916                    };
917                    let score = compute_ranked_neighborhood_score(
918                        &options,
919                        score_context,
920                        entry.depth + 1,
921                        &edge.kind,
922                        neighbor,
923                        &state.degree_map,
924                    );
925                    candidates.push((edge.to_id.clone(), neighbor.clone(), score));
926                }
927                candidates.sort_by(|a, b| b.2.cmp(&a.2).then_with(|| a.0.cmp(&b.0)));
928                for (to_id, neighbor, score) in candidates {
929                    if !state.seen.insert(to_id.clone()) {
930                        continue;
931                    }
932                    state.total_discovered += 1;
933                    if state.nodes.len() > options.max_nodes {
934                        state.pruned_count += 1;
935                        continue;
936                    }
937                    state.nodes.insert(to_id.clone(), neighbor);
938                    state.queue.push(ScoredQueueEntry {
939                        id: to_id,
940                        depth: entry.depth + 1,
941                        score,
942                    });
943                }
944                Ok(state)
945            });
946            state = ctx.get(&layer).map_err(graph_cache_error)?;
947        }
948
949        let node_ids: BTreeSet<_> = state.nodes.keys().cloned().collect();
950        state
951            .edges
952            .retain(|_, edge| node_ids.contains(&edge.from_id) && node_ids.contains(&edge.to_id));
953
954        Ok(Some(RankedNeighborhoodResult {
955            nodes: state.nodes.into_values().collect(),
956            edges: state.edges.into_values().collect(),
957            pruned_count: state.pruned_count,
958            total_discovered: state.total_discovered,
959        }))
960    }
961    fn reachable_nodes_by_kind(
962        &self,
963        from_id: &str,
964        kind: &str,
965        depth: usize,
966        limit: usize,
967    ) -> Result<Vec<(GraphNode, GraphPath)>> {
968        let mut rows = BTreeMap::<String, (GraphNode, GraphPath)>::new();
969        let mut seen = BTreeSet::from([from_id.to_string()]);
970        let mut queue = VecDeque::from([(from_id.to_string(), vec![from_id.to_string()])]);
971
972        while let Some((current, path)) = queue.pop_front() {
973            let current_depth = path.len().saturating_sub(1);
974            if current_depth >= depth {
975                continue;
976            }
977            for edge in self.outgoing_edges(&current, None)? {
978                if !seen.insert(edge.to_id.clone()) {
979                    continue;
980                }
981                let Some(node) = self.node(&edge.to_id)? else {
982                    continue;
983                };
984                let mut next_path = path.clone();
985                next_path.push(edge.to_id.clone());
986                let graph_path = GraphPath {
987                    hops: next_path.len().saturating_sub(1),
988                    nodes: next_path.clone(),
989                };
990                if node.kind == kind {
991                    rows.entry(node.id.clone()).or_insert((node, graph_path));
992                }
993                queue.push_back((edge.to_id, next_path));
994            }
995        }
996        let mut rows = rows.into_values().collect::<Vec<_>>();
997        rows.sort_by(|(left_node, left_path), (right_node, right_path)| {
998            left_path
999                .hops
1000                .cmp(&right_path.hops)
1001                .then(left_node.label.cmp(&right_node.label))
1002                .then(left_node.id.cmp(&right_node.id))
1003        });
1004        if limit > 0 && rows.len() > limit {
1005            rows.truncate(limit);
1006        }
1007        Ok(rows)
1008    }
1009
1010    fn reachable_nodes_by_kinds(
1011        &self,
1012        from_id: &str,
1013        kinds: &[&str],
1014        depth: usize,
1015        limit: usize,
1016    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
1017        let mut rows = BTreeMap::new();
1018        for kind in kinds {
1019            rows.insert(
1020                (*kind).to_string(),
1021                self.reachable_nodes_by_kind(from_id, kind, depth, limit)?,
1022            );
1023        }
1024        Ok(rows)
1025    }
1026
1027    fn semantic_top_candidates(
1028        &self,
1029        query_vector: &[f64],
1030        kinds: &[&str],
1031        limit: usize,
1032    ) -> Result<Vec<GraphSemanticCandidate>> {
1033        graph_semantic_top_candidates_by_property_scan(self, query_vector, kinds, limit)
1034    }
1035
1036    fn evidence_target_candidates(
1037        &self,
1038        target: &str,
1039        kinds: &[&str],
1040        preferred_path: Option<&str>,
1041    ) -> Result<Vec<GraphNode>> {
1042        let normalized = target.trim().trim_start_matches('#');
1043        let normalized_label = format!("#{normalized}");
1044        let mut rows = Vec::new();
1045        for kind in kinds {
1046            let mut candidates = self
1047                .nodes_by_kind(kind)?
1048                .into_iter()
1049                .filter(|node| {
1050                    (node.properties.get("handle").map(String::as_str) == Some(target)
1051                        || node.properties.get("ref_id").map(String::as_str) == Some(normalized)
1052                        || node.label == target
1053                        || node.label == normalized_label)
1054                        && preferred_path.is_none_or(|path| {
1055                            node.properties.get("path").map(String::as_str) == Some(path)
1056                        })
1057                })
1058                .collect::<Vec<_>>();
1059            candidates.sort_by(|left, right| left.id.cmp(&right.id));
1060            rows.extend(candidates);
1061        }
1062        Ok(rows)
1063    }
1064
1065    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
1066        if let Some(node) = self.node(target)? {
1067            return Ok(Some(node));
1068        }
1069        Ok(self
1070            .evidence_target_candidates(target, kinds, None)?
1071            .into_iter()
1072            .next())
1073    }
1074}
1075
1076pub fn graph_semantic_top_candidates_by_property_scan<S: GraphStore + ?Sized>(
1077    store: &S,
1078    query_vector: &[f64],
1079    kinds: &[&str],
1080    limit: usize,
1081) -> Result<Vec<GraphSemanticCandidate>> {
1082    if query_vector.is_empty() || kinds.is_empty() {
1083        return Ok(Vec::new());
1084    }
1085    let mut seen_kinds = BTreeSet::new();
1086    let mut candidates = Vec::new();
1087    for kind in kinds {
1088        if !seen_kinds.insert(*kind) {
1089            continue;
1090        }
1091        for node in store.nodes_by_kind(kind)? {
1092            let Some(vector) = node
1093                .properties
1094                .get(GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY)
1095                .and_then(|value| parse_graph_semantic_vector_property(value))
1096            else {
1097                continue;
1098            };
1099            if vector.len() != query_vector.len() {
1100                continue;
1101            }
1102            candidates.push(GraphSemanticCandidate {
1103                score: graph_semantic_cosine(query_vector, &vector),
1104                node,
1105            });
1106        }
1107    }
1108    candidates.sort_by(|left, right| {
1109        right
1110            .score
1111            .partial_cmp(&left.score)
1112            .unwrap_or(std::cmp::Ordering::Equal)
1113            .then_with(|| left.node.kind.cmp(&right.node.kind))
1114            .then_with(|| left.node.label.cmp(&right.node.label))
1115            .then_with(|| left.node.id.cmp(&right.node.id))
1116    });
1117    if limit > 0 && candidates.len() > limit {
1118        candidates.truncate(limit);
1119    }
1120    Ok(candidates)
1121}
1122
1123pub fn shortest_path_using_outgoing(
1124    from_id: &str,
1125    to_id: &str,
1126    kind: Option<&str>,
1127    max_hops: Option<usize>,
1128    mut outgoing_edges: impl FnMut(&str, Option<&str>) -> Result<Vec<GraphEdge>>,
1129) -> Result<Option<GraphPath>> {
1130    if from_id == to_id {
1131        return Ok(Some(GraphPath {
1132            nodes: vec![from_id.to_string()],
1133            hops: 0,
1134        }));
1135    }
1136
1137    let mut queue = VecDeque::new();
1138    let mut parent = BTreeMap::<String, (usize, String)>::new();
1139    parent.insert(from_id.to_string(), (0, String::new()));
1140    queue.push_back(from_id.to_string());
1141
1142    while let Some(current) = queue.pop_front() {
1143        let current_depth = parent.get(&current).map(|(d, _)| *d).unwrap_or(0);
1144        if max_hops.is_some_and(|max_hops| current_depth >= max_hops) {
1145            continue;
1146        }
1147        for edge in outgoing_edges(&current, kind)? {
1148            if parent.contains_key(&edge.to_id) {
1149                continue;
1150            }
1151            parent.insert(edge.to_id.clone(), (current_depth + 1, current.clone()));
1152            if edge.to_id == to_id {
1153                let mut nodes = vec![to_id.to_string()];
1154                let mut cursor = to_id;
1155                while let Some((_, previous)) = parent.get(cursor) {
1156                    if previous.is_empty() {
1157                        break;
1158                    }
1159                    nodes.push(previous.clone());
1160                    cursor = previous;
1161                }
1162                nodes.reverse();
1163                let hops = nodes.len().saturating_sub(1);
1164                return Ok(Some(GraphPath { nodes, hops }));
1165            }
1166            queue.push_back(edge.to_id);
1167        }
1168    }
1169
1170    Ok(None)
1171}