// hirn_engine/db/graph_runtime.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use parking_lot::Mutex;
6
7use hirn_core::id::MemoryId;
8use hirn_storage::PhysicalStore;
9
10use crate::cached_graph_store::CachedGraphStore;
11use crate::graph_store::GraphStore;
12use crate::hebbian::HebbianBuffer;
13use crate::index_advisor::{IndexAdvisor, QueryKind};
14use crate::persistent_graph::PersistentGraph;
15
/// Counters describing predictive prefetch activity.
///
/// Values of this type are plain snapshots: they can be cloned, compared for
/// equality, and default to all-zero counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PrefetchStats {
    /// Total number of records prefetched.
    pub prefetched_count: u64,
    /// Approximate number of bytes prefetched (estimated from record count).
    pub bytes_estimate: u64,
    /// Number of candidates skipped because they were still in cooldown.
    pub cooldown_skips: u64,
    /// Number of candidates skipped because the prefetch budget was exceeded.
    pub budget_skips: u64,
}
28
29pub(crate) struct GraphRuntime {
30    cached_graph: CachedGraphStore,
31    hebbian_buffer: HebbianBuffer,
32    episodic_access_buffer: Mutex<HashMap<MemoryId, usize>>,
33    semantic_access_buffer: Mutex<HashMap<MemoryId, usize>>,
34    reconsolidation_tracker: crate::consolidation::ReconsolidationTracker,
35    prefetch_cooldown: Mutex<HashMap<MemoryId, Instant>>,
36    prefetch_stats: Mutex<PrefetchStats>,
37    index_advisor: IndexAdvisor,
38    cached_community_result: Mutex<Option<crate::consolidation::CommunityResult>>,
39}
40
41impl GraphRuntime {
42    pub(crate) fn new(storage: Arc<dyn PhysicalStore>) -> Self {
43        let persistent_graph = PersistentGraph::new(storage);
44        let cached_graph = CachedGraphStore::new(Arc::new(persistent_graph));
45
46        Self {
47            cached_graph,
48            hebbian_buffer: HebbianBuffer::new(),
49            episodic_access_buffer: Mutex::new(HashMap::new()),
50            semantic_access_buffer: Mutex::new(HashMap::new()),
51            reconsolidation_tracker: crate::consolidation::ReconsolidationTracker::new(),
52            prefetch_cooldown: Mutex::new(HashMap::new()),
53            prefetch_stats: Mutex::new(PrefetchStats::default()),
54            index_advisor: IndexAdvisor::new(),
55            cached_community_result: Mutex::new(None),
56        }
57    }
58
59    pub(crate) fn cached_graph(&self) -> &CachedGraphStore {
60        &self.cached_graph
61    }
62
63    pub(crate) fn persistent_graph(&self) -> &PersistentGraph {
64        self.cached_graph.cold()
65    }
66
67    pub(crate) fn graph_store(&self) -> &dyn GraphStore {
68        &self.cached_graph as &dyn GraphStore
69    }
70
71    pub(crate) fn reconsolidation_tracker(&self) -> &crate::consolidation::ReconsolidationTracker {
72        &self.reconsolidation_tracker
73    }
74
75    pub(crate) fn open_reconsolidation_window(&self, id: MemoryId, window_secs: u64) {
76        self.reconsolidation_tracker.open_window(id, window_secs);
77    }
78
79    pub(crate) fn push_hebbian(&self, ids: Vec<MemoryId>) -> bool {
80        self.hebbian_buffer.push(ids)
81    }
82
83    pub(crate) fn reset_hebbian_counter(&self) {
84        self.hebbian_buffer.reset_counter();
85    }
86
87    pub(crate) fn pop_hebbian(&self) -> Option<Vec<MemoryId>> {
88        self.hebbian_buffer.pop()
89    }
90
91    pub(crate) fn buffer_semantic_access(&self, id: MemoryId) {
92        let mut buffer = self.semantic_access_buffer.lock();
93        *buffer.entry(id).or_insert(0) += 1;
94    }
95
96    pub(crate) fn buffer_episodic_access(&self, id: MemoryId) {
97        let mut buffer = self.episodic_access_buffer.lock();
98        *buffer.entry(id).or_insert(0) += 1;
99    }
100
101    pub(crate) fn drain_episodic_access(&self) -> HashMap<MemoryId, usize> {
102        let mut buffer = self.episodic_access_buffer.lock();
103        std::mem::take(&mut *buffer)
104    }
105
106    pub(crate) fn drain_semantic_access(&self) -> HashMap<MemoryId, usize> {
107        let mut buffer = self.semantic_access_buffer.lock();
108        std::mem::take(&mut *buffer)
109    }
110
111    pub(crate) fn take_cached_community_result(
112        &self,
113    ) -> Option<crate::consolidation::CommunityResult> {
114        self.cached_community_result.lock().take()
115    }
116
117    pub(crate) fn set_cached_community_result(
118        &self,
119        result: crate::consolidation::CommunityResult,
120    ) {
121        *self.cached_community_result.lock() = Some(result);
122    }
123
124    pub(crate) fn prefetch_stats(&self) -> PrefetchStats {
125        self.prefetch_stats.lock().clone()
126    }
127
128    pub(crate) fn index_advisor(&self) -> &IndexAdvisor {
129        &self.index_advisor
130    }
131
132    pub(crate) fn record_query(&self, dataset: &str, query_kind: QueryKind, elapsed: Duration) {
133        self.index_advisor
134            .record_query(dataset, query_kind, elapsed);
135    }
136
137    pub(crate) fn apply_prefetch_cooldown(
138        &self,
139        ids: &mut Vec<MemoryId>,
140        now: Instant,
141        cooldown: Duration,
142    ) {
143        let cooldown_map = self.prefetch_cooldown.lock();
144        let pre_len = ids.len();
145        ids.retain(|id| {
146            cooldown_map
147                .get(id)
148                .map_or(true, |&last| now.duration_since(last) >= cooldown)
149        });
150        let skipped = pre_len - ids.len();
151        if skipped > 0 {
152            self.prefetch_stats.lock().cooldown_skips += skipped as u64;
153        }
154    }
155
156    pub(crate) fn apply_prefetch_budget(&self, ids: &mut Vec<MemoryId>, max_records: usize) {
157        if ids.len() > max_records {
158            let skipped = (ids.len() - max_records) as u64;
159            ids.truncate(max_records);
160            self.prefetch_stats.lock().budget_skips += skipped;
161        }
162    }
163
164    pub(crate) fn finish_prefetch(
165        &self,
166        ids: &[MemoryId],
167        now: Instant,
168        cooldown: Duration,
169        prefetched: u64,
170        bytes: u64,
171    ) {
172        {
173            let mut cooldown_map = self.prefetch_cooldown.lock();
174            for id in ids {
175                cooldown_map.insert(*id, now);
176            }
177            cooldown_map.retain(|_, instant| now.duration_since(*instant) < cooldown * 2);
178        }
179
180        let mut stats = self.prefetch_stats.lock();
181        stats.prefetched_count += prefetched;
182        stats.bytes_estimate += bytes;
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    use hirn_storage::memory_store::MemoryStore;

    /// A freshly built runtime exposes no buffered work and all-zero prefetch counters.
    #[test]
    fn new_runtime_starts_with_empty_graph_side_state() {
        let runtime = GraphRuntime::new(Arc::new(MemoryStore::new()));

        assert!(runtime.pop_hebbian().is_none());
        assert!(runtime.drain_semantic_access().is_empty());
        assert!(runtime.drain_episodic_access().is_empty());
        assert!(runtime.take_cached_community_result().is_none());

        let stats = runtime.prefetch_stats();
        assert_eq!(stats.prefetched_count, 0);
        assert_eq!(stats.bytes_estimate, 0);
        assert_eq!(stats.cooldown_skips, 0);
        assert_eq!(stats.budget_skips, 0);
    }

    /// Access buffers count per id and empty on drain; the cooldown and budget
    /// guards filter candidates and are reflected in the prefetch counters.
    #[test]
    fn buffers_and_prefetch_guards_update_runtime_metrics() {
        let runtime = GraphRuntime::new(Arc::new(MemoryStore::new()));
        let first = MemoryId::new();
        let second = MemoryId::new();
        let third = MemoryId::new();

        runtime.buffer_semantic_access(first);
        runtime.buffer_semantic_access(first);
        runtime.buffer_semantic_access(second);

        // The episodic buffer is independent of the semantic one.
        runtime.buffer_episodic_access(third);
        runtime.buffer_episodic_access(third);

        let access = runtime.drain_semantic_access();
        assert_eq!(access.get(&first), Some(&2));
        assert_eq!(access.get(&second), Some(&1));
        assert_eq!(access.get(&third), None);
        assert!(runtime.drain_semantic_access().is_empty());

        let episodic = runtime.drain_episodic_access();
        assert_eq!(episodic.len(), 1);
        assert_eq!(episodic.get(&third), Some(&2));
        assert!(runtime.drain_episodic_access().is_empty());

        let now = Instant::now();
        runtime.finish_prefetch(&[first, second], now, Duration::from_mins(1), 2, 512);

        let mut cooldown_candidates = vec![first, second, third];
        runtime.apply_prefetch_cooldown(&mut cooldown_candidates, now, Duration::from_mins(1));
        assert_eq!(cooldown_candidates, vec![third]);

        let mut budget_candidates = vec![third, MemoryId::new(), MemoryId::new()];
        runtime.apply_prefetch_budget(&mut budget_candidates, 1);
        assert_eq!(budget_candidates.len(), 1);

        let stats = runtime.prefetch_stats();
        assert_eq!(stats.prefetched_count, 2);
        assert_eq!(stats.bytes_estimate, 512);
        assert_eq!(stats.cooldown_skips, 2);
        assert_eq!(stats.budget_skips, 2);
    }
}