// offline_intelligence/cache_management/llama_cache_interface.rs
use crate::cache_management::cache_extractor::KVEntry;
8use crate::cache_management::cache_scorer::{CacheEntryScorer, CacheEntryParams, CacheScoringConfig};
9use tracing::{debug, info, warn};
10use serde::{Deserialize, Serialize};
11
12#[derive(Debug, Deserialize)]
15struct SlotInfo {
16 id: i32,
17 #[serde(default)]
19 state: i32,
20 #[serde(default)]
22 n_ctx: usize,
23 #[serde(rename = "n_past", default)]
25 n_past: usize,
26}
27
/// JSON request body for `POST /slots/0` actions (used with "restore" and
/// "erase" in this file).
#[derive(Debug, Serialize)]
struct SlotActionRequest {
    /// Action verb the backend should perform, e.g. "restore" or "erase".
    action: String,
    /// Cache file name for "restore"; omitted from the JSON when `None`
    /// (e.g. for "erase").
    #[serde(skip_serializing_if = "Option::is_none")]
    filename: Option<String>,
}
34
/// Snapshot of the backend's KV cache state, as estimated by
/// `LlamaKVCacheInterface::get_current_cache_state`.
#[derive(Debug, Clone)]
pub struct LlamaKVCacheState {
    // Model geometry fields below are hardcoded by the producer (32/32/128),
    // not read from the backend — NOTE(review): confirm against the model.
    pub layer_count: usize,
    pub head_count: usize,
    pub kv_dim: usize,
    // Context window size of slot 0, in tokens (0 when backend unreachable).
    pub context_size: usize,
    // Tokens currently held in slot 0's KV cache.
    pub current_tokens: usize,
    // Estimated bytes: current_tokens * (layers * kv_dim * 2 * 4).
    pub used_memory_bytes: usize,
    // current_tokens / context_size as a fraction (0.0 when context is 0).
    pub capacity_percentage: f32,
}
48
/// HTTP-based view onto a llama.cpp-style server's KV cache (slot 0 only).
///
/// When no backend URL is configured, every operation degrades to a no-op or
/// a default value rather than returning an error.
pub struct LlamaKVCacheInterface {
    /// Base URL of the backend server; `None` means "not wired" and all HTTP
    /// requests are skipped.
    backend_url: Option<String>,
    /// Reusable HTTP client, built with a 10-second request timeout.
    http_client: reqwest::Client,
}
58
59impl LlamaKVCacheInterface {
60 pub fn new() -> Self {
61 Self {
62 backend_url: None,
63 http_client: reqwest::Client::builder()
64 .timeout(std::time::Duration::from_secs(10))
65 .build()
66 .unwrap_or_default(),
67 }
68 }
69
70 pub fn with_backend(backend_url: String) -> Self {
72 info!("LlamaKVCacheInterface wired to backend: {}", backend_url);
73 Self {
74 backend_url: Some(backend_url),
75 http_client: reqwest::Client::builder()
76 .timeout(std::time::Duration::from_secs(10))
77 .build()
78 .unwrap_or_default(),
79 }
80 }
81
82 fn url(&self, path: &str) -> Option<String> {
84 self.backend_url.as_ref().map(|base| format!("{}{}", base.trim_end_matches('/'), path))
85 }
86
87 async fn fetch_slots(&self) -> Vec<SlotInfo> {
91 let Some(url) = self.url("/slots") else { return Vec::new() };
92 match self.http_client.get(&url).send().await {
93 Ok(resp) if resp.status().is_success() => {
94 resp.json::<Vec<SlotInfo>>().await.unwrap_or_default()
95 }
96 Ok(resp) => {
97 debug!("GET /slots returned {}: non-fatal", resp.status());
98 Vec::new()
99 }
100 Err(e) => {
101 debug!("GET /slots unreachable: {} — using defaults", e);
102 Vec::new()
103 }
104 }
105 }
106
107 pub async fn get_current_cache_state(&self) -> anyhow::Result<LlamaKVCacheState> {
116 let slots = self.fetch_slots().await;
117 let slot = slots.into_iter().find(|s| s.id == 0);
118
119 let (n_ctx, n_past) = slot
120 .map(|s| (s.n_ctx, s.n_past))
121 .unwrap_or((0, 0));
122
123 let capacity_percentage = if n_ctx > 0 {
124 n_past as f32 / n_ctx as f32
125 } else {
126 0.0
127 };
128
129 let bytes_per_token = 32 * 128 * 2 * 4;
133 let used_memory_bytes = n_past * bytes_per_token;
134
135 debug!(
136 "KV cache state: {}/{} tokens ({:.1}%)",
137 n_past, n_ctx,
138 capacity_percentage * 100.0
139 );
140
141 Ok(LlamaKVCacheState {
142 layer_count: 32,
143 head_count: 32,
144 kv_dim: 128,
145 context_size: n_ctx,
146 current_tokens: n_past,
147 used_memory_bytes,
148 capacity_percentage,
149 })
150 }
151
152 pub async fn extract_current_kv_entries(&self) -> anyhow::Result<Vec<KVEntry>> {
160 let slots = self.fetch_slots().await;
161 let slot = match slots.into_iter().find(|s| s.id == 0) {
162 Some(s) if s.n_past > 0 => s,
163 _ => return Ok(Vec::new()),
164 };
165
166 let scorer = CacheEntryScorer::new(CacheScoringConfig::default());
167 let mut entries = Vec::new();
168
169 let bucket_size = 64usize;
172 let n_buckets = (slot.n_past + bucket_size - 1) / bucket_size;
173
174 for bucket_idx in 0..n_buckets {
175 let token_start = bucket_idx * bucket_size;
176 let token_end = (token_start + bucket_size).min(slot.n_past);
177
178 let layer_index = (bucket_idx % 32) as i32;
180 let head_index = (bucket_idx % 8) as i32;
181
182 let position_fraction = token_start as f32 / slot.n_past as f32;
184 let last_accessed_seconds_ago = position_fraction * 3600.0;
186
187 let key_hash = format!("slot0_bucket{}_tokens{}-{}", bucket_idx, token_start, token_end);
188
189 let importance = scorer.score_entry(CacheEntryParams {
190 key_hash: &key_hash,
191 key_data: None,
192 key_type: "attention_key",
193 layer_index,
194 head_index: Some(head_index),
195 access_count: 1,
196 last_accessed_seconds_ago,
197 value_size_bytes: (token_end - token_start) * 128,
198 });
199
200 entries.push(KVEntry {
201 key_hash,
202 key_data: None,
203 value_data: Vec::new(),
204 key_type: "attention_key".to_string(),
205 layer_index,
206 head_index: Some(head_index),
207 importance_score: importance,
208 access_count: 1,
209 last_accessed: chrono::Utc::now(),
210 token_positions: Some((token_start as u32..token_end as u32).collect()),
211 embedding: None,
212 size_bytes: (token_end - token_start) * 128,
213 is_persistent: false,
214 });
215 }
216
217 debug!(
218 "Extracted {} KV bucket entries from slot 0 ({} tokens)",
219 entries.len(), slot.n_past
220 );
221 Ok(entries)
222 }
223
224 pub async fn inject_kv_entries(&self, entries: &[KVEntry]) -> anyhow::Result<()> {
228 if entries.is_empty() {
229 return Ok(());
230 }
231
232 let filename = format!(
236 "kvcache_{}.bin",
237 entries.first().map(|e| e.key_hash.as_str()).unwrap_or("default")
238 );
239
240 let Some(url) = self.url("/slots/0") else {
241 debug!("inject_kv_entries: no backend URL configured, skipping");
242 return Ok(());
243 };
244
245 let body = SlotActionRequest {
246 action: "restore".to_string(),
247 filename: Some(filename.clone()),
248 };
249
250 match self.http_client.post(&url).json(&body).send().await {
251 Ok(resp) if resp.status().is_success() => {
252 info!("Restored KV cache from {} ({} entries)", filename, entries.len());
253 }
254 Ok(resp) => {
255 warn!("KV restore returned {}: continuing without restored cache", resp.status());
257 }
258 Err(e) => {
259 warn!("KV restore request failed: {} — continuing without restored cache", e);
260 }
261 }
262
263 Ok(())
264 }
265
266 pub async fn clear_cache_entries(
272 &self,
273 layer_indices: &[i32],
274 _head_indices: &[Option<i32>],
275 ) -> anyhow::Result<()> {
276 let Some(url) = self.url("/slots/0") else {
277 debug!("clear_cache_entries: no backend URL configured, skipping");
278 return Ok(());
279 };
280
281 let body = SlotActionRequest {
282 action: "erase".to_string(),
283 filename: None,
284 };
285
286 match self.http_client.post(&url).json(&body).send().await {
287 Ok(resp) if resp.status().is_success() => {
288 info!(
289 "Erased KV cache slot 0 (requested {} layer(s))",
290 layer_indices.len()
291 );
292 }
293 Ok(resp) => {
294 warn!("KV erase returned {}: slot may already be empty", resp.status());
295 }
296 Err(e) => {
297 warn!("KV erase request failed: {}", e);
298 }
299 }
300
301 Ok(())
302 }
303
304 pub async fn get_cache_memory_usage(&self) -> anyhow::Result<usize> {
308 let slots = self.fetch_slots().await;
309 let n_past = slots.iter().find(|s| s.id == 0).map(|s| s.n_past).unwrap_or(0);
310 let bytes_per_token = 32 * 128 * 2 * 4; Ok(n_past * bytes_per_token)
312 }
313
314 pub async fn estimate_cache_capacity(&self) -> anyhow::Result<f32> {
316 let slots = self.fetch_slots().await;
317 if let Some(slot) = slots.iter().find(|s| s.id == 0) {
318 if slot.n_ctx > 0 {
319 return Ok((slot.n_past as f32 / slot.n_ctx as f32).clamp(0.0, 1.0));
320 }
321 }
322 Ok(0.0)
323 }
324}
325
326impl Default for LlamaKVCacheInterface {
327 fn default() -> Self {
328 Self::new()
329 }
330}