Skip to main content

offline_intelligence/context_engine/
tier_manager.rs

1//! Manages the three-tier memory system with robust persistence and indexing
2
3use crate::memory::Message;
4use crate::memory_db::{MemoryDatabase, StoredMessage, Summary as DbSummary, SessionMetadata};
5use moka::sync::Cache;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::{debug, info};
9
/// Tunable limits and switches for the tiered memory manager.
#[derive(Debug, Clone)]
pub struct TierManagerConfig {
    /// Maximum number of messages kept per session when content is written to
    /// the tier-1 cache; older messages beyond this count are dropped.
    pub tier1_max_messages: usize,
    /// Intended cap on tier-2 summaries per session.
    /// NOTE(review): nothing in this file reads this value — confirm where it is enforced.
    pub tier2_max_summaries: usize,
    /// Idle timeout, in seconds, applied to entries of the tier-2 summary cache.
    pub tier2_cache_ttl_seconds: u64,
    /// When `false`, storing tier-3 content is a no-op and nothing is persisted.
    pub enable_tier3_persistence: bool,
}

impl Default for TierManagerConfig {
    /// Returns the stock configuration: 50 cached messages, 20 summaries,
    /// a one-hour tier-2 idle timeout, and persistence switched on.
    fn default() -> Self {
        const ONE_HOUR_SECONDS: u64 = 60 * 60;

        TierManagerConfig {
            tier1_max_messages: 50,
            tier2_max_summaries: 20,
            tier2_cache_ttl_seconds: ONE_HOUR_SECONDS,
            enable_tier3_persistence: true,
        }
    }
}
29
/// Per-session snapshot of how many items each memory tier currently holds.
///
/// All counters start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct TierStats {
    /// Number of messages found in the tier-1 cache for the session.
    pub tier1_count: usize,
    /// Number of summaries available for the session in tier 2.
    pub tier2_count: usize,
    /// Number of messages read back from the database (tier 3) for the session.
    pub tier3_count: usize,
}
37
38pub struct TierManager {
39    database: Arc<MemoryDatabase>,
40    tier1_cache: Cache<String, (Vec<Message>, Instant)>,
41    tier2_cache: Cache<String, (Vec<DbSummary>, Instant)>,
42    pub config: TierManagerConfig,
43}
44
45impl TierManager {
46    pub fn new(
47        database: Arc<MemoryDatabase>, 
48        config: TierManagerConfig
49    ) -> Self {
50        Self {
51            database,
52            tier1_cache: Cache::builder()
53                .max_capacity(1000)
54                .time_to_idle(Duration::from_secs(3600))
55                .build(),
56            tier2_cache: Cache::builder()
57                .max_capacity(500)
58                .time_to_idle(Duration::from_secs(config.tier2_cache_ttl_seconds))
59                .build(),
60            config,
61        }
62    }
63
64    // --- Tier 1 (Cache) Methods ---
65
66    pub async fn store_tier1_content(&self, session_id: &str, messages: &[Message]) {
67        // Apply tier1 max messages limit
68        let messages_to_store = if messages.len() > self.config.tier1_max_messages {
69            &messages[messages.len() - self.config.tier1_max_messages..]
70        } else {
71            messages
72        };
73        
74        self.tier1_cache.insert(session_id.to_string(), (messages_to_store.to_vec(), Instant::now()));
75    }
76
77    pub async fn get_tier1_content(&self, session_id: &str) -> Option<Vec<Message>> {
78        self.tier1_cache.get(session_id).map(|(m, _)| m)
79    }
80
81    // --- Tier 2 (Summary) Methods ---
82
83    pub async fn get_tier2_content(&self, session_id: &str) -> Option<Vec<DbSummary>> {
84        // Check cache first
85        if let Some((summaries, _)) = self.tier2_cache.get(session_id) {
86            return Some(summaries);
87        }
88        
89        // Fall back to database
90        match self.database.summaries.get_session_summaries(session_id) {
91            Ok(summaries) => {
92                // Cache the results
93                if !summaries.is_empty() {
94                    self.tier2_cache.insert(session_id.to_string(), (summaries.clone(), Instant::now()));
95                }
96                Some(summaries)
97            }
98            Err(e) => {
99                debug!("Error getting summaries from database: {}", e);
100                None
101            }
102        }
103    }
104
105    // --- Tier 3 (Database) Methods ---
106
107    pub async fn get_tier3_content(
108        &self, 
109        session_id: &str, 
110        limit: Option<i32>, 
111        offset: Option<i32>
112    ) -> anyhow::Result<Vec<StoredMessage>> {
113        self.database.conversations.get_session_messages(session_id, limit, offset)
114    }
115
116    pub async fn search_tier3_content(
117        &self, 
118        session_id: &str, 
119        query: &str, 
120        limit: usize
121    ) -> anyhow::Result<Vec<StoredMessage>> {
122        let messages = self.database.conversations.get_session_messages(session_id, Some(1000), None)?;
123        let query_lower = query.to_lowercase();
124        
125        let filtered = messages.into_iter()
126            .filter(|m| m.content.to_lowercase().contains(&query_lower))
127            .take(limit)
128            .collect();
129        
130        Ok(filtered)
131    }
132
133    pub async fn store_tier3_content(&self, session_id: &str, messages: &[Message]) -> anyhow::Result<()> {
134        if !self.config.enable_tier3_persistence || messages.is_empty() {
135            return Ok(());
136        }
137        
138        // Ensure session exists in database
139        self.ensure_session_exists(session_id, None).await?;
140        
141        // Get existing messages to find the next index AND check for duplicates
142        let existing_messages = self.database.conversations.get_session_messages(
143            session_id, Some(10000), Some(0)
144        ).unwrap_or_else(|_| vec![]);
145        
146        // Filter out messages that already exist (simple content-based deduplication)
147        let new_messages: Vec<&Message> = messages.iter()
148            .filter(|new_msg| {
149                !existing_messages.iter().any(|existing| {
150                    existing.content == new_msg.content && 
151                    existing.role == new_msg.role
152                })
153            })
154            .collect();
155        
156        if new_messages.is_empty() {
157            debug!("No new messages to save, all already exist in database");
158            return Ok(()); // Nothing new to save
159        }
160        
161        let start_index = existing_messages.len() as i32;
162        
163        // Create batch data for ONLY new messages
164        let batch_data: Vec<(String, String, i32, i32, f32)> = new_messages
165            .iter()
166            .enumerate()
167            .map(|(offset, m)| (
168                m.role.clone(), 
169                m.content.clone(),
170                start_index + offset as i32, // Ensure unique index
171                (m.content.len() / 4) as i32, 
172                0.5
173            ))
174            .collect();
175        
176        if !batch_data.is_empty() {
177            self.database.conversations.store_messages_batch(session_id, &batch_data)?;
178            info!("📝 Stored {} new messages to database for session {}", batch_data.len(), session_id);
179        }
180        
181        Ok(())
182    }
183
184    // --- Cross-Session Content Methods ---
185
186    /// Searches across all sessions except the current one based on keyword extraction
187    pub async fn search_cross_session_content(
188        &self,
189        current_session_id: &str,
190        query: &str,
191        limit: usize,
192    ) -> anyhow::Result<Vec<StoredMessage>> {
193        // Extract keywords from query
194        let keywords = self.extract_keywords(query);
195        
196        if keywords.is_empty() {
197            return Ok(vec![]);
198        }
199
200        // Search across ALL sessions except current one
201        self.database.conversations.search_messages_by_topic_across_sessions(
202            &keywords,
203            limit,
204            Some(current_session_id), // Exclude current session
205        ).await
206    }
207
208    fn extract_keywords(&self, text: &str) -> Vec<String> {
209        let words: Vec<&str> = text.split_whitespace().collect();
210        words.iter()
211            .filter(|w| w.len() > 3)
212            .map(|w| w.to_lowercase())
213            .filter(|w| !self.is_stop_word(w))
214            .collect()
215    }
216
217    fn is_stop_word(&self, word: &str) -> bool {
218        let stop_words = [
219            "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
220            "of", "with", "by", "is", "am", "are", "was", "were", "be", "been",
221            "being", "have", "has", "had", "do", "does", "did", "will", "would",
222            "shall", "should", "may", "might", "must", "can", "could",
223        ];
224        stop_words.contains(&word)
225    }
226
227    // --- Maintenance & Stats ---
228
229    pub async fn get_tier_stats(&self, session_id: &str) -> TierStats {
230        let tier1_count = self.get_tier1_content(session_id).await
231            .map(|m| m.len())
232            .unwrap_or(0);
233        
234        let tier2_count = self.get_tier2_content(session_id).await
235            .map(|s| s.len())
236            .unwrap_or(0);
237        
238        let tier3_count = match self.database.conversations.get_session_messages(session_id, Some(10000), None) {
239            Ok(messages) => messages.len(),
240            Err(_) => 0,
241        };
242
243        TierStats { 
244            tier1_count, 
245            tier2_count, 
246            tier3_count 
247        }
248    }
249
250    pub async fn cleanup_cache(&self, _older_than_seconds: u64) -> usize {
251        let count = self.tier1_cache.entry_count() + self.tier2_cache.entry_count();
252        
253        // Invalidate entries older than threshold
254        // Note: Moka automatically handles TTL, but we force cleanup
255        self.tier1_cache.invalidate_all();
256        self.tier2_cache.invalidate_all();
257        
258        count as usize
259    }
260
261    /// Chat persistence: Ensure session exists in database with provided ID (no auto-generated placeholders)
262    pub async fn ensure_session_exists(
263        &self, 
264        session_id: &str, 
265        title: Option<String>
266    ) -> anyhow::Result<()> {
267        let exists = self.database.conversations.get_session(session_id)?;
268        if exists.is_none() {
269            // Create session with null title initially - title set via API after generation
270            let metadata = SessionMetadata {
271                title, // None initially; title updated later via update_conversation_title API
272                ..Default::default()
273            };
274            self.database.conversations.create_session_with_id(session_id, Some(metadata))?;
275        }
276        Ok(())
277    }
278}
279
280impl Clone for TierManager {
281    fn clone(&self) -> Self {
282        Self {
283            database: self.database.clone(),
284            tier1_cache: Cache::builder()
285                .max_capacity(1000)
286                .time_to_idle(Duration::from_secs(3600))
287                .build(),
288            tier2_cache: Cache::builder()
289                .max_capacity(500)
290                .time_to_idle(Duration::from_secs(self.config.tier2_cache_ttl_seconds))
291                .build(),
292            config: self.config.clone(),
293        }
294    }
295}