Skip to main content

haystack_server/session/
affinity.rs

1use std::collections::HashMap;
2use std::time::Instant;
3
4/// Tracks per-session connector hit counts for reranking optimization.
5pub struct ConnectorAffinity {
6    /// Connector name → hit count since last rerank.
7    hits: HashMap<String, u64>,
8    /// Ranked connector names (most-accessed first).
9    ranked: Vec<String>,
10    /// Last rerank timestamp.
11    last_rerank: Instant,
12    /// Rerank interval in seconds.
13    rerank_interval_secs: u64,
14    /// Cache of entity_id → owning connector name for fast hisRead routing.
15    ownership_cache: HashMap<String, String>,
16}
17
18impl ConnectorAffinity {
19    pub fn new() -> Self {
20        Self {
21            hits: HashMap::new(),
22            ranked: Vec::new(),
23            last_rerank: Instant::now(),
24            rerank_interval_secs: 60,
25            ownership_cache: HashMap::new(),
26        }
27    }
28
29    /// Record a hit on a connector.
30    pub fn record_hit(&mut self, connector: &str) {
31        *self.hits.entry(connector.to_string()).or_insert(0) += 1;
32        self.maybe_rerank();
33    }
34
35    /// Record entity ownership for fast routing.
36    pub fn record_ownership(&mut self, entity_id: &str, connector: &str) {
37        self.ownership_cache
38            .insert(entity_id.to_string(), connector.to_string());
39    }
40
41    /// Get the ranked connector order (most-accessed first).
42    pub fn ranked_connectors(&self) -> &[String] {
43        &self.ranked
44    }
45
46    /// Look up which connector owns an entity (cached).
47    pub fn owner_of(&self, entity_id: &str) -> Option<&str> {
48        self.ownership_cache.get(entity_id).map(|s| s.as_str())
49    }
50
51    fn maybe_rerank(&mut self) {
52        if self.last_rerank.elapsed().as_secs() >= self.rerank_interval_secs {
53            self.rerank();
54        }
55    }
56
57    fn rerank(&mut self) {
58        let mut pairs: Vec<_> = self.hits.iter().collect();
59        pairs.sort_by(|a, b| b.1.cmp(a.1));
60        self.ranked = pairs.into_iter().map(|(k, _)| k.clone()).collect();
61        self.hits.clear();
62        self.last_rerank = Instant::now();
63    }
64}
65
66impl Default for ConnectorAffinity {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75
76    #[test]
77    fn record_hit_and_ownership() {
78        let mut aff = ConnectorAffinity::new();
79        aff.record_hit("conn_a");
80        aff.record_hit("conn_b");
81        aff.record_ownership("entity1", "conn_a");
82        assert_eq!(aff.owner_of("entity1"), Some("conn_a"));
83        assert_eq!(aff.owner_of("missing"), None);
84    }
85
86    #[test]
87    fn rerank_orders_by_hits() {
88        let mut aff = ConnectorAffinity::new();
89        // Accumulate hits without triggering auto-rerank
90        aff.record_hit("low");
91        aff.record_hit("high");
92        aff.record_hit("high");
93        aff.record_hit("high");
94
95        // Force a rerank
96        aff.rerank();
97
98        let ranked = aff.ranked_connectors();
99        assert_eq!(ranked.len(), 2);
100        assert_eq!(ranked[0], "high");
101        assert_eq!(ranked[1], "low");
102    }
103
104    #[test]
105    fn ownership_cache_overwrites() {
106        let mut aff = ConnectorAffinity::new();
107        aff.record_ownership("e1", "conn_a");
108        assert_eq!(aff.owner_of("e1"), Some("conn_a"));
109        aff.record_ownership("e1", "conn_b");
110        assert_eq!(aff.owner_of("e1"), Some("conn_b"));
111    }
112
113    #[test]
114    fn default_creates_empty() {
115        let aff = ConnectorAffinity::default();
116        assert!(aff.ranked_connectors().is_empty());
117        assert_eq!(aff.owner_of("any"), None);
118    }
119}