hirn_engine/db/
graph_runtime.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use parking_lot::Mutex;
6
7use hirn_core::id::MemoryId;
8use hirn_storage::PhysicalStore;
9
10use crate::cached_graph_store::CachedGraphStore;
11use crate::graph_store::GraphStore;
12use crate::hebbian::HebbianBuffer;
13use crate::index_advisor::{IndexAdvisor, QueryKind};
14use crate::persistent_graph::PersistentGraph;
15
16#[derive(Debug, Clone, Default)]
18pub struct PrefetchStats {
19 pub prefetched_count: u64,
21 pub bytes_estimate: u64,
23 pub cooldown_skips: u64,
25 pub budget_skips: u64,
27}
28
29pub(crate) struct GraphRuntime {
30 cached_graph: CachedGraphStore,
31 hebbian_buffer: HebbianBuffer,
32 episodic_access_buffer: Mutex<HashMap<MemoryId, usize>>,
33 semantic_access_buffer: Mutex<HashMap<MemoryId, usize>>,
34 reconsolidation_tracker: crate::consolidation::ReconsolidationTracker,
35 prefetch_cooldown: Mutex<HashMap<MemoryId, Instant>>,
36 prefetch_stats: Mutex<PrefetchStats>,
37 index_advisor: IndexAdvisor,
38 cached_community_result: Mutex<Option<crate::consolidation::CommunityResult>>,
39}
40
41impl GraphRuntime {
42 pub(crate) fn new(storage: Arc<dyn PhysicalStore>) -> Self {
43 let persistent_graph = PersistentGraph::new(storage);
44 let cached_graph = CachedGraphStore::new(Arc::new(persistent_graph));
45
46 Self {
47 cached_graph,
48 hebbian_buffer: HebbianBuffer::new(),
49 episodic_access_buffer: Mutex::new(HashMap::new()),
50 semantic_access_buffer: Mutex::new(HashMap::new()),
51 reconsolidation_tracker: crate::consolidation::ReconsolidationTracker::new(),
52 prefetch_cooldown: Mutex::new(HashMap::new()),
53 prefetch_stats: Mutex::new(PrefetchStats::default()),
54 index_advisor: IndexAdvisor::new(),
55 cached_community_result: Mutex::new(None),
56 }
57 }
58
59 pub(crate) fn cached_graph(&self) -> &CachedGraphStore {
60 &self.cached_graph
61 }
62
63 pub(crate) fn persistent_graph(&self) -> &PersistentGraph {
64 self.cached_graph.cold()
65 }
66
67 pub(crate) fn graph_store(&self) -> &dyn GraphStore {
68 &self.cached_graph as &dyn GraphStore
69 }
70
71 pub(crate) fn reconsolidation_tracker(&self) -> &crate::consolidation::ReconsolidationTracker {
72 &self.reconsolidation_tracker
73 }
74
75 pub(crate) fn open_reconsolidation_window(&self, id: MemoryId, window_secs: u64) {
76 self.reconsolidation_tracker.open_window(id, window_secs);
77 }
78
79 pub(crate) fn push_hebbian(&self, ids: Vec<MemoryId>) -> bool {
80 self.hebbian_buffer.push(ids)
81 }
82
83 pub(crate) fn reset_hebbian_counter(&self) {
84 self.hebbian_buffer.reset_counter();
85 }
86
87 pub(crate) fn pop_hebbian(&self) -> Option<Vec<MemoryId>> {
88 self.hebbian_buffer.pop()
89 }
90
91 pub(crate) fn buffer_semantic_access(&self, id: MemoryId) {
92 let mut buffer = self.semantic_access_buffer.lock();
93 *buffer.entry(id).or_insert(0) += 1;
94 }
95
96 pub(crate) fn buffer_episodic_access(&self, id: MemoryId) {
97 let mut buffer = self.episodic_access_buffer.lock();
98 *buffer.entry(id).or_insert(0) += 1;
99 }
100
101 pub(crate) fn drain_episodic_access(&self) -> HashMap<MemoryId, usize> {
102 let mut buffer = self.episodic_access_buffer.lock();
103 std::mem::take(&mut *buffer)
104 }
105
106 pub(crate) fn drain_semantic_access(&self) -> HashMap<MemoryId, usize> {
107 let mut buffer = self.semantic_access_buffer.lock();
108 std::mem::take(&mut *buffer)
109 }
110
111 pub(crate) fn take_cached_community_result(
112 &self,
113 ) -> Option<crate::consolidation::CommunityResult> {
114 self.cached_community_result.lock().take()
115 }
116
117 pub(crate) fn set_cached_community_result(
118 &self,
119 result: crate::consolidation::CommunityResult,
120 ) {
121 *self.cached_community_result.lock() = Some(result);
122 }
123
124 pub(crate) fn prefetch_stats(&self) -> PrefetchStats {
125 self.prefetch_stats.lock().clone()
126 }
127
128 pub(crate) fn index_advisor(&self) -> &IndexAdvisor {
129 &self.index_advisor
130 }
131
132 pub(crate) fn record_query(&self, dataset: &str, query_kind: QueryKind, elapsed: Duration) {
133 self.index_advisor
134 .record_query(dataset, query_kind, elapsed);
135 }
136
137 pub(crate) fn apply_prefetch_cooldown(
138 &self,
139 ids: &mut Vec<MemoryId>,
140 now: Instant,
141 cooldown: Duration,
142 ) {
143 let cooldown_map = self.prefetch_cooldown.lock();
144 let pre_len = ids.len();
145 ids.retain(|id| {
146 cooldown_map
147 .get(id)
148 .map_or(true, |&last| now.duration_since(last) >= cooldown)
149 });
150 let skipped = pre_len - ids.len();
151 if skipped > 0 {
152 self.prefetch_stats.lock().cooldown_skips += skipped as u64;
153 }
154 }
155
156 pub(crate) fn apply_prefetch_budget(&self, ids: &mut Vec<MemoryId>, max_records: usize) {
157 if ids.len() > max_records {
158 let skipped = (ids.len() - max_records) as u64;
159 ids.truncate(max_records);
160 self.prefetch_stats.lock().budget_skips += skipped;
161 }
162 }
163
164 pub(crate) fn finish_prefetch(
165 &self,
166 ids: &[MemoryId],
167 now: Instant,
168 cooldown: Duration,
169 prefetched: u64,
170 bytes: u64,
171 ) {
172 {
173 let mut cooldown_map = self.prefetch_cooldown.lock();
174 for id in ids {
175 cooldown_map.insert(*id, now);
176 }
177 cooldown_map.retain(|_, instant| now.duration_since(*instant) < cooldown * 2);
178 }
179
180 let mut stats = self.prefetch_stats.lock();
181 stats.prefetched_count += prefetched;
182 stats.bytes_estimate += bytes;
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189
190 use hirn_storage::memory_store::MemoryStore;
191
192 #[test]
193 fn new_runtime_starts_with_empty_graph_side_state() {
194 let runtime = GraphRuntime::new(Arc::new(MemoryStore::new()));
195
196 assert!(runtime.pop_hebbian().is_none());
197 assert!(runtime.drain_semantic_access().is_empty());
198 assert!(runtime.take_cached_community_result().is_none());
199
200 let stats = runtime.prefetch_stats();
201 assert_eq!(stats.prefetched_count, 0);
202 assert_eq!(stats.bytes_estimate, 0);
203 assert_eq!(stats.cooldown_skips, 0);
204 assert_eq!(stats.budget_skips, 0);
205 }
206
207 #[test]
208 fn buffers_and_prefetch_guards_update_runtime_metrics() {
209 let runtime = GraphRuntime::new(Arc::new(MemoryStore::new()));
210 let first = MemoryId::new();
211 let second = MemoryId::new();
212 let third = MemoryId::new();
213
214 runtime.buffer_semantic_access(first);
215 runtime.buffer_semantic_access(first);
216 runtime.buffer_semantic_access(second);
217
218 let access = runtime.drain_semantic_access();
219 assert_eq!(access.get(&first), Some(&2));
220 assert_eq!(access.get(&second), Some(&1));
221 assert!(runtime.drain_semantic_access().is_empty());
222
223 let now = Instant::now();
224 runtime.finish_prefetch(&[first, second], now, Duration::from_mins(1), 2, 512);
225
226 let mut cooldown_candidates = vec![first, second, third];
227 runtime.apply_prefetch_cooldown(&mut cooldown_candidates, now, Duration::from_mins(1));
228 assert_eq!(cooldown_candidates, vec![third]);
229
230 let mut budget_candidates = vec![third, MemoryId::new(), MemoryId::new()];
231 runtime.apply_prefetch_budget(&mut budget_candidates, 1);
232 assert_eq!(budget_candidates.len(), 1);
233
234 let stats = runtime.prefetch_stats();
235 assert_eq!(stats.prefetched_count, 2);
236 assert_eq!(stats.bytes_estimate, 512);
237 assert_eq!(stats.cooldown_skips, 2);
238 assert_eq!(stats.budget_skips, 2);
239 }
240}