vtcode_core/core/agent/
engine.rs1use crate::core::agent::config::CompactionConfig;
4use crate::core::agent::semantic::SemanticAnalyzer;
5use crate::core::agent::types::{
6 CompactedMessage, CompactionResult, CompactionStatistics, CompactionSuggestion,
7 EnhancedMessage, MessagePriority, MessageType, Urgency,
8};
9use crate::gemini::Content;
10use anyhow::Result;
11use std::collections::{HashMap, VecDeque};
12use std::sync::Arc;
13use std::time::{Instant, SystemTime, UNIX_EPOCH};
14use tokio::sync::RwLock;
15
16#[derive(Debug)]
18pub struct CompactionEngine {
19 config: Arc<RwLock<CompactionConfig>>,
20 message_history: Arc<RwLock<VecDeque<CompactedMessage>>>,
21 enhanced_messages: Arc<RwLock<Vec<EnhancedMessage>>>,
22 semantic_analyzer: SemanticAnalyzer,
23 last_compaction: Arc<RwLock<u64>>,
24 compaction_count: Arc<RwLock<u64>>,
25 start_time: Instant,
26}
27
28impl CompactionEngine {
29 pub fn new() -> Self {
31 Self {
32 config: Arc::new(RwLock::new(CompactionConfig::default())),
33 message_history: Arc::new(RwLock::new(VecDeque::new())),
34 enhanced_messages: Arc::new(RwLock::new(Vec::new())),
35 semantic_analyzer: SemanticAnalyzer::new(),
36 last_compaction: Arc::new(RwLock::new(0)),
37 compaction_count: Arc::new(RwLock::new(0)),
38 start_time: Instant::now(),
39 }
40 }
41
42 pub fn with_config(config: CompactionConfig) -> Self {
44 Self {
45 config: Arc::new(RwLock::new(config)),
46 message_history: Arc::new(RwLock::new(VecDeque::new())),
47 enhanced_messages: Arc::new(RwLock::new(Vec::new())),
48 semantic_analyzer: SemanticAnalyzer::new(),
49 last_compaction: Arc::new(RwLock::new(0)),
50 compaction_count: Arc::new(RwLock::new(0)),
51 start_time: Instant::now(),
52 }
53 }
54
55 pub async fn add_message(&self, content: &Content, message_type: MessageType) -> Result<()> {
57 let text_content = self.extract_text_content(content)?;
59
60 let compacted = CompactedMessage {
62 timestamp: std::time::SystemTime::now()
63 .duration_since(std::time::UNIX_EPOCH)
64 .unwrap()
65 .as_secs(),
66 message_type: message_type.clone(),
67 summary: self.generate_message_summary(&text_content, &message_type)?,
68 key_info: self.extract_key_information(&text_content, &message_type)?,
69 compression_ratio: 1.0,
70 original_size: text_content.len(),
71 };
72
73 let priority = self
75 .semantic_analyzer
76 .analyze_message_priority(&text_content, &message_type);
77 let semantic_tags = self.semantic_analyzer.extract_semantic_tags(&text_content);
78
79 let enhanced = EnhancedMessage {
81 base_message: compacted.clone(),
82 priority,
83 semantic_tags,
84 context_references: Vec::new(),
85 conversation_turn: 0,
86 related_messages: Vec::new(),
87 };
88
89 let mut history = self.message_history.write().await;
91 history.push_back(compacted);
92
93 let mut enhanced_history = self.enhanced_messages.write().await;
94 enhanced_history.push(enhanced);
95
96 Ok(())
97 }
98
99 fn extract_text_content(&self, content: &Content) -> Result<String> {
100 let mut text = String::new();
101 for part in &content.parts {
102 if let Some(text_part) = part.as_text() {
103 text.push_str(text_part);
104 text.push(' ');
105 }
106 }
107 Ok(text.trim().to_string())
108 }
109
110 fn generate_message_summary(
111 &self,
112 content: &str,
113 _message_type: &MessageType,
114 ) -> Result<String> {
115 if content.len() <= 100 {
116 Ok(content.to_string())
117 } else {
118 Ok(format!("{}...", &content[..100]))
119 }
120 }
121
122 fn extract_key_information(
123 &self,
124 content: &str,
125 _message_type: &MessageType,
126 ) -> Result<Vec<String>> {
127 let mut key_info = Vec::new();
128
129 if content.contains("error") {
131 key_info.push("error".to_string());
132 }
133 if content.contains("success") {
134 key_info.push("success".to_string());
135 }
136 if content.contains("function") {
137 key_info.push("function".to_string());
138 }
139
140 Ok(key_info)
141 }
142
143 pub async fn get_compaction_suggestions(&self) -> Result<Vec<CompactionSuggestion>> {
145 let config = self.config.read().await.clone();
146 let history = self.message_history.read().await;
147 let now = SystemTime::now()
148 .duration_since(UNIX_EPOCH)
149 .unwrap_or_default()
150 .as_secs();
151 let mut suggestions = Vec::new();
152
153 for msg in history.iter() {
154 let too_old = now.saturating_sub(msg.timestamp) > config.max_message_age_seconds;
155 let over_limit = history.len() > config.max_uncompressed_messages;
156 if too_old || over_limit {
157 let urgency = if over_limit {
158 Urgency::High
159 } else {
160 Urgency::Medium
161 };
162 suggestions.push(CompactionSuggestion {
163 action: "compact_message".to_string(),
164 urgency,
165 estimated_savings: msg.original_size,
166 reasoning: "message exceeds configured thresholds".to_string(),
167 });
168 }
169 }
170
171 Ok(suggestions)
172 }
173
174 pub async fn get_statistics(&self) -> Result<CompactionStatistics> {
176 let history = self.message_history.read().await;
177 let enhanced = self.enhanced_messages.read().await;
178 let total_messages = history.len();
179 let total_memory_usage: usize = history.iter().map(|m| m.original_size).sum();
180 let average_message_size = if total_messages > 0 {
181 total_memory_usage / total_messages
182 } else {
183 0
184 };
185 let mut messages_by_priority: HashMap<MessagePriority, usize> = HashMap::new();
186 for msg in enhanced.iter() {
187 *messages_by_priority
188 .entry(msg.priority.clone())
189 .or_insert(0) += 1;
190 }
191 let last_compaction_timestamp = *self.last_compaction.read().await;
192 let elapsed_hours = self.start_time.elapsed().as_secs() as f64 / 3600.0;
193 let count = *self.compaction_count.read().await as f64;
194 let compaction_frequency = if elapsed_hours > 0.0 {
195 count / elapsed_hours
196 } else {
197 0.0
198 };
199
200 Ok(CompactionStatistics {
201 total_messages,
202 messages_by_priority,
203 total_memory_usage,
204 average_message_size,
205 last_compaction_timestamp,
206 compaction_frequency,
207 })
208 }
209
210 pub async fn should_compact(&self) -> Result<bool> {
212 let config = self.config.read().await.clone();
213 let history = self.message_history.read().await;
214 let total_memory: usize = history.iter().map(|m| m.original_size).sum();
215 let now = SystemTime::now()
216 .duration_since(UNIX_EPOCH)
217 .unwrap_or_default()
218 .as_secs();
219 let last = *self.last_compaction.read().await;
220
221 if history.len() > config.max_uncompressed_messages {
222 return Ok(true);
223 }
224 if total_memory > config.max_memory_mb * 1_000_000 {
225 return Ok(true);
226 }
227 if config.auto_compaction_enabled
228 && now.saturating_sub(last) > config.compaction_interval_seconds
229 {
230 return Ok(true);
231 }
232 Ok(false)
233 }
234
235 pub async fn compact_messages_intelligently(&self) -> Result<CompactionResult> {
237 let start = Instant::now();
238 let config = self.config.read().await.clone();
239 let mut history = self.message_history.write().await;
240 let mut enhanced = self.enhanced_messages.write().await;
241 let mut messages_compacted = 0usize;
242 let mut original_size = 0usize;
243
244 while history.len() > config.max_uncompressed_messages {
245 if let Some(msg) = history.pop_front() {
246 original_size += msg.original_size;
247 messages_compacted += 1;
248 if !enhanced.is_empty() {
249 enhanced.remove(0);
250 }
251 }
252 }
253
254 let processing_time_ms = start.elapsed().as_millis() as u64;
255 if messages_compacted > 0 {
256 let mut last = self.last_compaction.write().await;
257 *last = SystemTime::now()
258 .duration_since(UNIX_EPOCH)
259 .unwrap_or_default()
260 .as_secs();
261 let mut count = self.compaction_count.write().await;
262 *count += 1;
263 }
264
265 Ok(CompactionResult {
266 messages_processed: messages_compacted,
267 messages_compacted,
268 original_size,
269 compacted_size: 0,
270 compression_ratio: if original_size > 0 { 0.0 } else { 1.0 },
271 processing_time_ms,
272 })
273 }
274
275 pub async fn compact_context(
277 &self,
278 _context_key: &str,
279 context_data: &mut std::collections::HashMap<String, serde_json::Value>,
280 ) -> Result<CompactionResult> {
281 let start = Instant::now();
282 let config = self.config.read().await.clone();
283
284 let original_size: usize = context_data
285 .values()
286 .filter_map(|v| v.as_str().map(|s| s.len()))
287 .sum();
288 let initial_len = context_data.len();
289
290 context_data.retain(|_, v| {
291 v.get("confidence")
292 .and_then(|c| c.as_f64())
293 .map(|c| c >= config.min_context_confidence)
294 .unwrap_or(true)
295 });
296
297 let compacted_size: usize = context_data
298 .values()
299 .filter_map(|v| v.as_str().map(|s| s.len()))
300 .sum();
301 let messages_compacted = initial_len - context_data.len();
302
303 if messages_compacted > 0 {
304 let mut last = self.last_compaction.write().await;
305 *last = SystemTime::now()
306 .duration_since(UNIX_EPOCH)
307 .unwrap_or_default()
308 .as_secs();
309 let mut count = self.compaction_count.write().await;
310 *count += 1;
311 }
312
313 Ok(CompactionResult {
314 messages_processed: initial_len,
315 messages_compacted,
316 original_size,
317 compacted_size,
318 compression_ratio: if original_size > 0 {
319 compacted_size as f64 / original_size as f64
320 } else {
321 1.0
322 },
323 processing_time_ms: start.elapsed().as_millis() as u64,
324 })
325 }
326}
327
328impl Default for CompactionEngine {
329 fn default() -> Self {
330 Self::new()
331 }
332}