offline_intelligence/context_engine/
tier_manager.rs1use crate::memory::Message;
4use crate::memory_db::{MemoryDatabase, StoredMessage, Summary as DbSummary, SessionMetadata};
5use moka::sync::Cache;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::{debug, info};
9
/// Tunable limits for the three storage tiers handled by `TierManager`.
#[derive(Debug, Clone)]
pub struct TierManagerConfig {
    /// Upper bound on the number of messages kept per session in the tier 1
    /// cache; when more are supplied only the most recent ones are retained.
    pub tier1_max_messages: usize,
    /// Intended cap on the summaries held per session in tier 2.
    // NOTE(review): no code in this file reads this field yet — confirm where
    // (or whether) the limit is enforced.
    pub tier2_max_summaries: usize,
    /// Number of seconds handed to the tier 2 cache as its idle timeout.
    pub tier2_cache_ttl_seconds: u64,
    /// When `false`, requests to persist messages to the database are skipped.
    pub enable_tier3_persistence: bool,
}
18
19impl Default for TierManagerConfig {
20 fn default() -> Self {
21 Self {
22 tier1_max_messages: 50,
23 tier2_max_summaries: 20,
24 tier2_cache_ttl_seconds: 3600,
25 enable_tier3_persistence: true,
26 }
27 }
28}
29
/// Per-session item counts for each storage tier.
///
/// All counters start at zero when the value is created via `Default`.
#[derive(Debug, Clone, Default)]
pub struct TierStats {
    /// Number of messages counted for tier 1.
    pub tier1_count: usize,
    /// Number of summaries counted for tier 2.
    pub tier2_count: usize,
    /// Number of messages counted for tier 3.
    pub tier3_count: usize,
}
37
38pub struct TierManager {
39 database: Arc<MemoryDatabase>,
40 tier1_cache: Cache<String, (Vec<Message>, Instant)>,
41 tier2_cache: Cache<String, (Vec<DbSummary>, Instant)>,
42 pub config: TierManagerConfig,
43}
44
45impl TierManager {
46 pub fn new(
47 database: Arc<MemoryDatabase>,
48 config: TierManagerConfig
49 ) -> Self {
50 Self {
51 database,
52 tier1_cache: Cache::builder()
53 .max_capacity(1000)
54 .time_to_idle(Duration::from_secs(3600))
55 .build(),
56 tier2_cache: Cache::builder()
57 .max_capacity(500)
58 .time_to_idle(Duration::from_secs(config.tier2_cache_ttl_seconds))
59 .build(),
60 config,
61 }
62 }
63
64 pub async fn store_tier1_content(&self, session_id: &str, messages: &[Message]) {
67 let messages_to_store = if messages.len() > self.config.tier1_max_messages {
69 &messages[messages.len() - self.config.tier1_max_messages..]
70 } else {
71 messages
72 };
73
74 self.tier1_cache.insert(session_id.to_string(), (messages_to_store.to_vec(), Instant::now()));
75 }
76
77 pub async fn get_tier1_content(&self, session_id: &str) -> Option<Vec<Message>> {
78 self.tier1_cache.get(session_id).map(|(m, _)| m)
79 }
80
81 pub async fn get_tier2_content(&self, session_id: &str) -> Option<Vec<DbSummary>> {
84 if let Some((summaries, _)) = self.tier2_cache.get(session_id) {
86 return Some(summaries);
87 }
88
89 match self.database.summaries.get_session_summaries(session_id) {
91 Ok(summaries) => {
92 if !summaries.is_empty() {
94 self.tier2_cache.insert(session_id.to_string(), (summaries.clone(), Instant::now()));
95 }
96 Some(summaries)
97 }
98 Err(e) => {
99 debug!("Error getting summaries from database: {}", e);
100 None
101 }
102 }
103 }
104
105 pub async fn get_tier3_content(
108 &self,
109 session_id: &str,
110 limit: Option<i32>,
111 offset: Option<i32>
112 ) -> anyhow::Result<Vec<StoredMessage>> {
113 self.database.conversations.get_session_messages(session_id, limit, offset)
114 }
115
116 pub async fn search_tier3_content(
117 &self,
118 session_id: &str,
119 query: &str,
120 limit: usize
121 ) -> anyhow::Result<Vec<StoredMessage>> {
122 let messages = self.database.conversations.get_session_messages(session_id, Some(1000), None)?;
123 let query_lower = query.to_lowercase();
124
125 let filtered = messages.into_iter()
126 .filter(|m| m.content.to_lowercase().contains(&query_lower))
127 .take(limit)
128 .collect();
129
130 Ok(filtered)
131 }
132
133 pub async fn store_tier3_content(&self, session_id: &str, messages: &[Message]) -> anyhow::Result<()> {
134 if !self.config.enable_tier3_persistence || messages.is_empty() {
135 return Ok(());
136 }
137
138 self.ensure_session_exists(session_id, None).await?;
140
141 let existing_messages = self.database.conversations.get_session_messages(
143 session_id, Some(10000), Some(0)
144 ).unwrap_or_else(|_| vec![]);
145
146 let new_messages: Vec<&Message> = messages.iter()
148 .filter(|new_msg| {
149 !existing_messages.iter().any(|existing| {
150 existing.content == new_msg.content &&
151 existing.role == new_msg.role
152 })
153 })
154 .collect();
155
156 if new_messages.is_empty() {
157 debug!("No new messages to save, all already exist in database");
158 return Ok(()); }
160
161 let start_index = existing_messages.len() as i32;
162
163 let batch_data: Vec<(String, String, i32, i32, f32)> = new_messages
165 .iter()
166 .enumerate()
167 .map(|(offset, m)| (
168 m.role.clone(),
169 m.content.clone(),
170 start_index + offset as i32, (m.content.len() / 4) as i32,
172 0.5
173 ))
174 .collect();
175
176 if !batch_data.is_empty() {
177 self.database.conversations.store_messages_batch(session_id, &batch_data)?;
178 info!("📝 Stored {} new messages to database for session {}", batch_data.len(), session_id);
179 }
180
181 Ok(())
182 }
183
184 pub async fn search_cross_session_content(
188 &self,
189 current_session_id: &str,
190 query: &str,
191 limit: usize,
192 ) -> anyhow::Result<Vec<StoredMessage>> {
193 let keywords = self.extract_keywords(query);
195
196 if keywords.is_empty() {
197 return Ok(vec![]);
198 }
199
200 self.database.conversations.search_messages_by_topic_across_sessions(
202 &keywords,
203 limit,
204 Some(current_session_id), ).await
206 }
207
208 fn extract_keywords(&self, text: &str) -> Vec<String> {
209 let words: Vec<&str> = text.split_whitespace().collect();
210 words.iter()
211 .filter(|w| w.len() > 3)
212 .map(|w| w.to_lowercase())
213 .filter(|w| !self.is_stop_word(w))
214 .collect()
215 }
216
217 fn is_stop_word(&self, word: &str) -> bool {
218 let stop_words = [
219 "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
220 "of", "with", "by", "is", "am", "are", "was", "were", "be", "been",
221 "being", "have", "has", "had", "do", "does", "did", "will", "would",
222 "shall", "should", "may", "might", "must", "can", "could",
223 ];
224 stop_words.contains(&word)
225 }
226
227 pub async fn get_tier_stats(&self, session_id: &str) -> TierStats {
230 let tier1_count = self.get_tier1_content(session_id).await
231 .map(|m| m.len())
232 .unwrap_or(0);
233
234 let tier2_count = self.get_tier2_content(session_id).await
235 .map(|s| s.len())
236 .unwrap_or(0);
237
238 let tier3_count = match self.database.conversations.get_session_messages(session_id, Some(10000), None) {
239 Ok(messages) => messages.len(),
240 Err(_) => 0,
241 };
242
243 TierStats {
244 tier1_count,
245 tier2_count,
246 tier3_count
247 }
248 }
249
250 pub async fn cleanup_cache(&self, _older_than_seconds: u64) -> usize {
251 let count = self.tier1_cache.entry_count() + self.tier2_cache.entry_count();
252
253 self.tier1_cache.invalidate_all();
256 self.tier2_cache.invalidate_all();
257
258 count as usize
259 }
260
261 pub async fn ensure_session_exists(
263 &self,
264 session_id: &str,
265 title: Option<String>
266 ) -> anyhow::Result<()> {
267 let exists = self.database.conversations.get_session(session_id)?;
268 if exists.is_none() {
269 let metadata = SessionMetadata {
271 title, ..Default::default()
273 };
274 self.database.conversations.create_session_with_id(session_id, Some(metadata))?;
275 }
276 Ok(())
277 }
278}
279
280impl Clone for TierManager {
281 fn clone(&self) -> Self {
282 Self {
283 database: self.database.clone(),
284 tier1_cache: Cache::builder()
285 .max_capacity(1000)
286 .time_to_idle(Duration::from_secs(3600))
287 .build(),
288 tier2_cache: Cache::builder()
289 .max_capacity(500)
290 .time_to_idle(Duration::from_secs(self.config.tier2_cache_ttl_seconds))
291 .build(),
292 config: self.config.clone(),
293 }
294 }
295}