use bincode::{deserialize, serialize};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use sysinfo::{get_current_pid, ProcessesToUpdate, System};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Mutex;
use tracing::{debug, info, warn};
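/// Configuration for memory monitoring and cache spilling.
///
/// `spill_threshold` is the RSS / total-memory ratio above which spilling is
/// triggered, `max_cache_bytes` caps the in-memory cache size, and `cache_dir`
/// is where evicted entries are persisted.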
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryConfig {
pub spill_threshold: f64,
pub check_interval_secs: u64,
pub auto_spill: bool,
pub max_cache_bytes: usize,
pub cache_dir: PathBuf,
}
impl Default for MemoryConfig {
fn default() -> Self {
Self {
spill_threshold: 0.85,
check_interval_secs: 30,
auto_spill: true,
            max_cache_bytes: 500_000_000,
            cache_dir: PathBuf::from(".leindex/cache"),
}
}
}
pub type CacheKey = String;
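/// A single cached payload, carried as pre-serialized bytes plus a small
/// amount of metadata describing what those bytes contain.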
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CacheEntry {
PDG {
project_id: String,
node_count: usize,
edge_count: usize,
serialized_data: Vec<u8>,
},
SearchIndex {
project_id: String,
entry_count: usize,
serialized_data: Vec<u8>,
},
Analysis {
query: String,
timestamp: u64,
serialized_data: Vec<u8>,
},
Binary {
metadata: HashMap<String, String>,
serialized_data: Vec<u8>,
},
}
impl CacheEntry {
pub fn size_bytes(&self) -> usize {
match self {
CacheEntry::PDG {
serialized_data, ..
} => serialized_data.len(),
CacheEntry::SearchIndex {
serialized_data, ..
} => serialized_data.len(),
CacheEntry::Analysis {
serialized_data, ..
} => serialized_data.len(),
CacheEntry::Binary {
serialized_data, ..
} => serialized_data.len(),
}
}
pub fn description(&self) -> String {
match self {
CacheEntry::PDG {
project_id,
node_count,
..
} => {
format!("PDG for {} ({} nodes)", project_id, node_count)
}
CacheEntry::SearchIndex {
project_id,
entry_count,
..
} => {
format!("Search index for {} ({} entries)", project_id, entry_count)
}
CacheEntry::Analysis { query, .. } => {
format!("Analysis for '{}'", query)
}
CacheEntry::Binary { metadata, .. } => {
format!(
"Binary data ({})",
metadata.get("type").unwrap_or(&"unknown".to_string())
)
}
}
}
}
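/// Counters describing cache behavior: hits split by memory vs. disk,
/// misses, writes, spills to disk, and restores from disk.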
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CacheTelemetry {
pub memory_hits: usize,
pub disk_hits: usize,
pub misses: usize,
pub writes: usize,
pub spills: usize,
pub restores: usize,
}
impl CacheTelemetry {
pub fn total_hits(&self) -> usize {
self.memory_hits + self.disk_hits
}
pub fn total_lookups(&self) -> usize {
self.total_hits() + self.misses
}
pub fn hit_rate(&self) -> f64 {
let total = self.total_lookups();
if total == 0 {
0.0
} else {
self.total_hits() as f64 / total as f64
}
}
}
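/// In-memory LRU cache with byte-based eviction and an on-disk spill area.
///
/// Entries evicted to make room for new inserts are written to `cache_dir`
/// so they can be reloaded later via [`CacheStore::get_or_load`] or the
/// warming/restore paths on [`CacheSpiller`].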
pub struct CacheStore {
cache: LruCache<CacheKey, CacheEntry>,
total_bytes: usize,
max_bytes: usize,
cache_dir: PathBuf,
telemetry: CacheTelemetry,
}
impl CacheStore {
pub fn new(config: &MemoryConfig) -> Self {
if let Err(e) = std::fs::create_dir_all(&config.cache_dir) {
warn!(
"Failed to create cache directory {:?}: {}",
config.cache_dir, e
);
}
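        // The LRU is bounded to a fixed number of items; byte usage is
        // enforced separately against `max_cache_bytes` in `insert`.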
let item_capacity = NonZeroUsize::new(100).unwrap();
Self {
cache: LruCache::new(item_capacity),
total_bytes: 0,
max_bytes: config.max_cache_bytes,
cache_dir: config.cache_dir.clone(),
telemetry: CacheTelemetry::default(),
}
}
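    /// Inserts an entry, spilling least-recently-used entries to disk until
    /// the new total fits within `max_bytes`.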
pub fn insert(&mut self, key: CacheKey, entry: CacheEntry) -> Result<(), Error> {
let entry_size = entry.size_bytes();
self.telemetry.writes += 1;
while self.total_bytes + entry_size > self.max_bytes && !self.cache.is_empty() {
if let Some((evicted_key, evicted_entry)) = self.cache.pop_lru() {
let evicted_size = evicted_entry.size_bytes();
self.total_bytes = self.total_bytes.saturating_sub(evicted_size);
debug!(
"Evicted cache entry '{}' ({} bytes)",
evicted_key, evicted_size
);
if let Err(e) = self.spill_to_disk(&evicted_key, &evicted_entry) {
warn!("Failed to spill evicted entry to disk: {}", e);
}
}
}
if let Some(existing) = self.cache.get(&key) {
self.total_bytes = self.total_bytes.saturating_sub(existing.size_bytes());
}
        self.total_bytes += entry_size;
        // `push` reports an entry displaced by the item-capacity limit, which
        // `put` would drop silently; account for it and spill it to disk so the
        // byte totals stay accurate. A same-key replacement was already
        // subtracted above.
        if let Some((evicted_key, evicted_entry)) = self.cache.push(key.clone(), entry) {
            if evicted_key != key {
                self.total_bytes = self
                    .total_bytes
                    .saturating_sub(evicted_entry.size_bytes());
                if let Err(e) = self.spill_to_disk(&evicted_key, &evicted_entry) {
                    warn!("Failed to spill evicted entry to disk: {}", e);
                }
            }
        }
Ok(())
}
pub fn get(&mut self, key: &str) -> Option<CacheEntry> {
let found = self.cache.get(key).cloned();
if found.is_some() {
self.telemetry.memory_hits += 1;
} else {
self.telemetry.misses += 1;
}
found
}
pub fn peek(&self, key: &str) -> Option<&CacheEntry> {
self.cache.peek(key)
}
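    /// Looks the key up in memory first, then falls back to the on-disk spill
    /// area; a disk hit is re-inserted into the in-memory cache.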
pub fn get_or_load(&mut self, key: &str) -> Result<Option<CacheEntry>, Error> {
if let Some(entry) = self.cache.get(key).cloned() {
self.telemetry.memory_hits += 1;
return Ok(Some(entry));
}
match self.load_from_disk(key) {
Ok(entry) => {
self.telemetry.disk_hits += 1;
self.telemetry.restores += 1;
self.insert(key.to_string(), entry.clone())?;
Ok(Some(entry))
}
Err(Error::CacheNotFound(_)) => {
self.telemetry.misses += 1;
Ok(None)
}
Err(e) => {
self.telemetry.misses += 1;
Err(e)
}
}
}
pub fn remove(&mut self, key: &str) -> Option<CacheEntry> {
if let Some(entry) = self.cache.pop(key) {
self.total_bytes = self.total_bytes.saturating_sub(entry.size_bytes());
Some(entry)
} else {
None
}
}
pub fn len(&self) -> usize {
self.cache.len()
}
pub fn is_empty(&self) -> bool {
self.cache.is_empty()
}
pub fn total_bytes(&self) -> usize {
self.total_bytes
}
pub fn max_bytes(&self) -> usize {
self.max_bytes
}
pub fn cache_dir(&self) -> &PathBuf {
&self.cache_dir
}
pub fn telemetry(&self) -> &CacheTelemetry {
&self.telemetry
}
pub fn persist_key(&mut self, key: &str) -> Result<bool, Error> {
if let Some(entry) = self.cache.get(key).cloned() {
self.spill_to_disk(key, &entry)?;
Ok(true)
} else {
Ok(false)
}
}
pub fn pop_lru(&mut self) -> Option<(CacheKey, CacheEntry)> {
if let Some((key, entry)) = self.cache.pop_lru() {
self.total_bytes = self.total_bytes.saturating_sub(entry.size_bytes());
Some((key, entry))
} else {
None
}
}
pub fn clear(&mut self) -> Result<usize, Error> {
let bytes_freed = self.total_bytes;
let snapshot: Vec<(CacheKey, CacheEntry)> = self
.cache
.iter()
.map(|(key, entry)| (key.clone(), entry.clone()))
.collect();
for (key, entry) in snapshot {
if let Err(e) = self.spill_to_disk(&key, &entry) {
warn!("Failed to spill entry '{}' to disk: {}", key, e);
}
}
self.cache.clear();
self.total_bytes = 0;
Ok(bytes_freed)
}
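    /// Serializes the entry with bincode and writes it atomically
    /// (temp file + `sync_all` + rename) under `cache_dir`.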
fn spill_to_disk(&mut self, key: &str, entry: &CacheEntry) -> Result<(), Error> {
let cache_file = self.cache_dir.join(format!("{}.bin", sanitize_key(key)));
let serialized = serialize(entry)
.map_err(|e| Error::SpillFailed(format!("Serialization failed: {}", e)))?;
let temp_file = cache_file.with_extension("tmp");
let mut file = std::fs::File::create(&temp_file)
.map_err(|e| Error::SpillFailed(format!("Failed to create temp file: {}", e)))?;
file.write_all(&serialized)
.map_err(|e| Error::SpillFailed(format!("Failed to write cache file: {}", e)))?;
file.sync_all()
.map_err(|e| Error::SpillFailed(format!("Failed to sync cache file: {}", e)))?;
std::fs::rename(&temp_file, &cache_file)
.map_err(|e| Error::SpillFailed(format!("Failed to rename cache file: {}", e)))?;
debug!(
"Spilled cache entry '{}' to disk ({} bytes)",
key,
serialized.len()
);
self.telemetry.spills += 1;
Ok(())
}
pub fn load_from_disk(&self, key: &str) -> Result<CacheEntry, Error> {
let cache_file = self.cache_dir.join(format!("{}.bin", sanitize_key(key)));
if !cache_file.exists() {
            return Err(Error::CacheNotFound(key.to_string()));
}
let mut file = std::fs::File::open(&cache_file)
.map_err(|e| Error::SpillFailed(format!("Failed to open cache file: {}", e)))?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.map_err(|e| Error::SpillFailed(format!("Failed to read cache file: {}", e)))?;
let entry: CacheEntry = deserialize(&buffer)
.map_err(|e| Error::SpillFailed(format!("Deserialization failed: {}", e)))?;
debug!(
"Loaded cache entry '{}' from disk ({} bytes)",
key,
buffer.len()
);
Ok(entry)
}
pub fn list_spilled(&self) -> Result<Vec<String>, Error> {
let mut entries = Vec::new();
if !self.cache_dir.exists() {
return Ok(entries);
}
for entry in std::fs::read_dir(&self.cache_dir)
.map_err(|e| Error::SpillFailed(format!("Failed to read cache directory: {}", e)))?
{
let entry = entry
.map_err(|e| Error::SpillFailed(format!("Failed to read dir entry: {}", e)))?;
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".bin") {
let key = name.strip_suffix(".bin").unwrap_or(name);
entries.push(key.to_string());
}
}
}
Ok(entries)
}
pub fn delete_spilled(&self, key: &str) -> Result<(), Error> {
let cache_file = self.cache_dir.join(format!("{}.bin", sanitize_key(key)));
if cache_file.exists() {
std::fs::remove_file(&cache_file)
.map_err(|e| Error::SpillFailed(format!("Failed to delete cache file: {}", e)))?;
debug!("Deleted spilled cache entry '{}'", key);
}
Ok(())
}
pub fn spilled_size_bytes(&self) -> Result<usize, Error> {
if !self.cache_dir.exists() {
return Ok(0);
}
let mut total = 0;
for entry in std::fs::read_dir(&self.cache_dir)
.map_err(|e| Error::SpillFailed(format!("Failed to read cache directory: {}", e)))?
{
let entry = entry
.map_err(|e| Error::SpillFailed(format!("Failed to read dir entry: {}", e)))?;
total += entry.metadata().map(|m| m.len() as usize).unwrap_or(0);
}
Ok(total)
}
pub fn validate_entry(&self, key: &str) -> Result<ValidationResult, Error> {
let cache_file = self.cache_dir.join(format!("{}.bin", sanitize_key(key)));
if !cache_file.exists() {
return Ok(ValidationResult {
is_valid: false,
entry_type: None,
size_bytes: 0,
error: Some("File not found".to_string()),
});
}
match self.load_from_disk(key) {
Ok(entry) => {
let entry_type = match &entry {
CacheEntry::PDG { .. } => Some("PDG".to_string()),
CacheEntry::SearchIndex { .. } => Some("SearchIndex".to_string()),
CacheEntry::Analysis { .. } => Some("Analysis".to_string()),
CacheEntry::Binary { .. } => Some("Binary".to_string()),
};
Ok(ValidationResult {
is_valid: true,
entry_type,
size_bytes: entry.size_bytes(),
error: None,
})
}
Err(e) => Ok(ValidationResult {
is_valid: false,
entry_type: None,
size_bytes: 0,
error: Some(e.to_string()),
}),
}
}
pub fn restore_spilled(&mut self, keys: &[String]) -> Result<RestoreResult, Error> {
let mut restored = 0;
let mut failed = Vec::new();
for key in keys {
match self.load_from_disk(key) {
Ok(entry) => {
if self.insert(key.clone(), entry).is_ok() {
restored += 1;
} else {
failed.push((key.clone(), "Insert failed".to_string()));
}
}
Err(e) => {
failed.push((key.clone(), e.to_string()));
}
}
}
self.telemetry.restores += restored;
Ok(RestoreResult {
entries_restored: restored,
entries_failed: failed,
})
}
}
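/// Maps a cache key to a filesystem-safe name by replacing anything that is
/// not alphanumeric, `-`, or `_` with `_`. Note that distinct keys can
/// collapse to the same file name (e.g. `a:b` and `a/b`).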
fn sanitize_key(key: &str) -> String {
key.chars()
.map(|c| {
if c.is_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect()
}
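/// Warming priority for a key: lower values are restored first. Both the
/// `prefix:` form and its sanitized `prefix_` form are recognized.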
fn cache_key_priority(key: &str) -> u8 {
if key.starts_with("pdg:") || key.starts_with("pdg_") {
0
} else if key.starts_with("search:") || key.starts_with("search_") {
1
} else if key.starts_with("project_scan:") || key.starts_with("project_scan_") {
2
} else if key.starts_with("analysis:") || key.starts_with("analysis_") {
3
} else {
4
}
}
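/// Couples a [`CacheStore`] with a [`MemoryManager`] so cache contents can be
/// spilled to disk when process memory crosses the configured threshold and
/// warmed back in later.
///
/// A minimal usage sketch (the project id and sizes are illustrative):
///
/// ```ignore
/// let config = MemoryConfig::default();
/// let mut spiller = CacheSpiller::new(config)?;
///
/// // Cache a serialized PDG for a project.
/// let key = pdg_cache_key("myproject");
/// let entry = create_pdg_entry("myproject".to_string(), 1_000, 5_000, &[0u8; 64]);
/// spiller.store_mut().insert(key, entry)?;
///
/// // Periodically relieve memory pressure, then warm the cache back up.
/// let _spill = spiller.check_and_spill()?;
/// let _warm = spiller.warm_cache(WarmStrategy::RecentFirst)?;
/// ```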
pub struct CacheSpiller {
store: CacheStore,
memory_manager: MemoryManager,
}
impl CacheSpiller {
pub fn new(config: MemoryConfig) -> Result<Self, Error> {
let store = CacheStore::new(&config);
let memory_manager = MemoryManager::new(config.clone())?;
Ok(Self {
store,
memory_manager,
})
}
pub fn store_mut(&mut self) -> &mut CacheStore {
&mut self.store
}
pub fn store(&self) -> &CacheStore {
&self.store
}
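    /// If the memory threshold is exceeded, pops roughly 20% of the cached
    /// entries (least-recently-used first), persisting each to disk.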
pub fn check_and_spill(&mut self) -> Result<SpillResult, Error> {
if !self.memory_manager.is_threshold_exceeded()? {
return Ok(SpillResult {
memory_freed: 0,
caches_cleared: Vec::new(),
entries_spilled: 0,
});
}
info!("Memory threshold exceeded, spilling cache...");
let rss_before = self.memory_manager.get_rss_bytes()?;
let entries_before = self.store.len();
let target_entries = (entries_before as f64 * 0.2).ceil() as usize;
let mut spilled_keys = Vec::new();
        for _ in 0..target_entries {
            if let Some((key, entry)) = self.store.pop_lru() {
                // Persist the evicted entry so it can be restored later rather
                // than being dropped outright.
                if let Err(e) = self.store.spill_to_disk(&key, &entry) {
                    warn!("Failed to spill entry '{}' to disk: {}", key, e);
                }
                spilled_keys.push(key);
            }
        }
        // RSS is resampled here; the allocator may not return freed pages to
        // the OS immediately, so this can under-report the memory released.
        let bytes_freed = rss_before.saturating_sub(self.memory_manager.get_rss_bytes()?);
info!(
"Spilled {} cache entries, freed {} bytes",
spilled_keys.len(),
bytes_freed
);
Ok(SpillResult {
memory_freed: bytes_freed,
caches_cleared: vec!["lru_cache".to_string()],
entries_spilled: spilled_keys.len(),
})
}
pub fn spill_all(&mut self) -> Result<SpillResult, Error> {
let rss_before = self.memory_manager.get_rss_bytes()?;
let entries_before = self.store.len();
let _bytes_freed = self.store.clear()?;
let rss_after = self.memory_manager.get_rss_bytes()?;
Ok(SpillResult {
memory_freed: rss_before.saturating_sub(rss_after),
caches_cleared: vec!["lru_cache".to_string()],
entries_spilled: entries_before,
})
}
pub fn memory_stats(&self) -> Result<MemoryStats, Error> {
let telemetry = self.store.telemetry();
Ok(MemoryStats {
rss_bytes: self.memory_manager.get_rss_bytes()?,
total_bytes: self.memory_manager.get_total_memory()?,
cache_entries: self.store.len(),
cache_bytes: self.store.total_bytes(),
spilled_entries: self.store.list_spilled()?.len(),
spilled_bytes: self.store.spilled_size_bytes()?,
cache_hits: telemetry.total_hits(),
cache_memory_hits: telemetry.memory_hits,
cache_disk_hits: telemetry.disk_hits,
cache_misses: telemetry.misses,
cache_hit_rate: telemetry.hit_rate(),
cache_writes: telemetry.writes,
cache_spills: telemetry.spills,
cache_restores: telemetry.restores,
})
}
pub fn is_threshold_exceeded(&self) -> Result<bool, Error> {
self.memory_manager.is_threshold_exceeded()
}
pub fn memory_manager(&self) -> &MemoryManager {
&self.memory_manager
}
pub fn validate_all_spilled(&self) -> Result<Vec<(String, ValidationResult)>, Error> {
let spilled_keys = self.store.list_spilled()?;
let mut results = Vec::new();
for key in spilled_keys {
let validation = self.store.validate_entry(&key)?;
results.push((key, validation));
}
Ok(results)
}
pub fn restore_keys(&mut self, keys: &[String]) -> Result<RestoreResult, Error> {
self.store.restore_spilled(keys)
}
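    /// Reloads spilled entries from disk according to `strategy`, stopping
    /// once the remaining in-memory byte budget would be exceeded.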
pub fn warm_cache(&mut self, strategy: WarmStrategy) -> Result<WarmResult, Error> {
let spilled_keys = self.store.list_spilled()?;
if spilled_keys.is_empty() {
return Ok(WarmResult {
entries_warmed: 0,
entries_skipped: 0,
warming_strategy: strategy,
});
}
let prioritized_keys = self.prioritize_keys_for_warming(&spilled_keys, strategy)?;
let current_bytes = self.store.total_bytes();
let max_bytes = self.store.max_bytes;
let available_bytes = max_bytes.saturating_sub(current_bytes);
let mut warmed = 0;
let mut skipped = 0;
let mut used_bytes = 0;
for key in prioritized_keys {
if let Ok(validation) = self.store.validate_entry(&key) {
if validation.is_valid {
if used_bytes + validation.size_bytes > available_bytes {
skipped += 1;
continue;
}
match self.store.load_from_disk(&key) {
Ok(entry) => {
if self.store.insert(key.clone(), entry).is_ok() {
warmed += 1;
used_bytes += validation.size_bytes;
} else {
skipped += 1;
}
}
Err(_) => {
skipped += 1;
}
}
} else {
skipped += 1;
}
} else {
skipped += 1;
}
}
info!(
"Cache warming complete: {} entries warmed, {} skipped",
warmed, skipped
);
self.store.telemetry.restores += warmed;
Ok(WarmResult {
entries_warmed: warmed,
entries_skipped: skipped,
warming_strategy: strategy,
})
}
fn prioritize_keys_for_warming(
&self,
keys: &[String],
strategy: WarmStrategy,
) -> Result<Vec<String>, Error> {
match strategy {
WarmStrategy::All => {
let mut prioritized = keys.to_vec();
prioritized.sort_by_key(|key| cache_key_priority(key));
Ok(prioritized)
}
WarmStrategy::PDGOnly => Ok(keys
.iter()
.filter(|k| k.starts_with("pdg:"))
.cloned()
.collect()),
WarmStrategy::SearchIndexOnly => Ok(keys
.iter()
.filter(|k| k.starts_with("search:"))
.cloned()
.collect()),
WarmStrategy::RecentFirst => {
let mut keyed: Vec<(String, std::time::SystemTime)> = Vec::new();
for key in keys {
let cache_file = self
.store
.cache_dir()
.join(format!("{}.bin", sanitize_key(key)));
if let Ok(metadata) = std::fs::metadata(&cache_file) {
if let Ok(modified) = metadata.modified() {
keyed.push((key.clone(), modified));
}
}
}
keyed.sort_by(|a, b| {
cache_key_priority(&a.0)
.cmp(&cache_key_priority(&b.0))
.then_with(|| b.1.cmp(&a.1))
});
Ok(keyed.into_iter().map(|(k, _)| k).collect())
}
}
}
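    /// Restores spilled entries using the `RecentFirst` strategy, limited by
    /// the available in-memory byte budget.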
pub fn auto_restore(&mut self) -> Result<RestoreResult, Error> {
info!("Auto-restoring cache from disk...");
let spilled_keys = self.store.list_spilled()?;
if spilled_keys.is_empty() {
info!("No spilled cache entries to restore");
return Ok(RestoreResult {
entries_restored: 0,
entries_failed: Vec::new(),
});
}
let prioritized =
self.prioritize_keys_for_warming(&spilled_keys, WarmStrategy::RecentFirst)?;
let current_bytes = self.store.total_bytes();
let max_bytes = self.store.max_bytes;
let available_bytes = max_bytes.saturating_sub(current_bytes);
let mut restored = 0;
let mut failed = Vec::new();
let mut used_bytes = 0;
for key in prioritized {
if let Ok(validation) = self.store.validate_entry(&key) {
if validation.is_valid && used_bytes + validation.size_bytes <= available_bytes {
match self.store.load_from_disk(&key) {
Ok(entry) => {
if self.store.insert(key.clone(), entry).is_ok() {
restored += 1;
used_bytes += validation.size_bytes;
} else {
failed.push((key, "Insert failed".to_string()));
}
}
Err(e) => {
failed.push((key, e.to_string()));
}
}
}
}
}
info!(
"Auto-restore complete: {} entries restored, {} failed",
restored,
failed.len()
);
self.store.telemetry.restores += restored;
Ok(RestoreResult {
entries_restored: restored,
entries_failed: failed,
})
}
}
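/// Reads process RSS and total system memory via `sysinfo` and decides when
/// the configured spill threshold has been exceeded.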
pub struct MemoryManager {
config: MemoryConfig,
system: Mutex<System>,
current_pid: sysinfo::Pid,
}
impl MemoryManager {
pub fn new(config: MemoryConfig) -> Result<Self, Error> {
let current_pid = get_current_pid().map_err(|e| Error::ProcessAccess(e.to_string()))?;
let system = System::new();
Ok(Self {
config,
system: Mutex::new(system),
current_pid,
})
}
pub fn get_rss_bytes(&self) -> Result<usize, Error> {
let mut system = self
.system
.lock()
.map_err(|e| Error::MemoryInfo(format!("System lock poisoned: {}", e)))?;
let pid_list = [self.current_pid];
system.refresh_processes(ProcessesToUpdate::Some(&pid_list), true);
system
.process(self.current_pid)
.map(|process| process.memory() as usize)
.ok_or_else(|| {
Error::ProcessAccess(format!(
"Current process {} is not available",
self.current_pid
))
})
}
pub fn get_total_memory(&self) -> Result<usize, Error> {
let mut system = self
.system
.lock()
.map_err(|e| Error::MemoryInfo(format!("System lock poisoned: {}", e)))?;
system.refresh_memory();
Ok(system.total_memory() as usize)
}
pub fn is_threshold_exceeded(&self) -> Result<bool, Error> {
let rss = self.get_rss_bytes()?;
let total = self.get_total_memory()?;
let ratio = rss as f64 / total as f64;
Ok(ratio > self.config.spill_threshold)
}
pub fn config(&self) -> &MemoryConfig {
&self.config
}
}
impl Default for MemoryManager {
    fn default() -> Self {
        Self::new(MemoryConfig::default())
            .expect("MemoryManager::default(): could not resolve the current process PID")
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpillResult {
pub memory_freed: usize,
pub caches_cleared: Vec<String>,
pub entries_spilled: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
pub rss_bytes: usize,
pub total_bytes: usize,
pub cache_entries: usize,
pub cache_bytes: usize,
pub spilled_entries: usize,
pub spilled_bytes: usize,
#[serde(default)]
pub cache_hits: usize,
#[serde(default)]
pub cache_memory_hits: usize,
#[serde(default)]
pub cache_disk_hits: usize,
#[serde(default)]
pub cache_misses: usize,
#[serde(default)]
pub cache_hit_rate: f64,
#[serde(default)]
pub cache_writes: usize,
#[serde(default)]
pub cache_spills: usize,
#[serde(default)]
pub cache_restores: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationResult {
pub is_valid: bool,
pub entry_type: Option<String>,
pub size_bytes: usize,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestoreResult {
pub entries_restored: usize,
pub entries_failed: Vec<(String, String)>,
}
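/// Which spilled entries [`CacheSpiller::warm_cache`] brings back into memory:
/// all keys in priority order, PDG entries only, search indexes only, or keys
/// ordered by priority and most-recent file modification time.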
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WarmStrategy {
All,
PDGOnly,
SearchIndexOnly,
RecentFirst,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WarmResult {
pub entries_warmed: usize,
pub entries_skipped: usize,
pub warming_strategy: WarmStrategy,
}
impl MemoryStats {
    pub fn memory_percent(&self) -> f64 {
        if self.total_bytes == 0 {
            0.0
        } else {
            (self.rss_bytes as f64 / self.total_bytes as f64) * 100.0
        }
    }
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Process access failed: {0}")]
ProcessAccess(String),
#[error("Failed to get memory info: {0}")]
MemoryInfo(String),
#[error("Spill operation failed: {0}")]
SpillFailed(String),
#[error("Cache entry not found: {0}")]
CacheNotFound(String),
}
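/// Strategy used by [`MemoryMonitor`] when the memory threshold is exceeded.
/// `ClearAll` spills the whole cache; `NonActiveProjects` and `LRU` both
/// currently perform LRU-based partial spilling.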
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpillStrategy {
ClearAll,
NonActiveProjects,
LRU,
}
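/// Wraps a [`CacheSpiller`] with a [`SpillStrategy`], spilling only when the
/// memory threshold check reports pressure.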
pub struct MemoryMonitor {
spiller: CacheSpiller,
strategy: SpillStrategy,
}
impl MemoryMonitor {
pub fn new(spiller: CacheSpiller, strategy: SpillStrategy) -> Self {
Self { spiller, strategy }
}
pub fn check_and_spill(&mut self) -> Result<Option<SpillResult>, Error> {
if self.spiller.memory_manager.is_threshold_exceeded()? {
match self.strategy {
SpillStrategy::ClearAll => Ok(Some(self.spiller.spill_all()?)),
SpillStrategy::NonActiveProjects => Ok(Some(self.spiller.check_and_spill()?)),
SpillStrategy::LRU => Ok(Some(self.spiller.check_and_spill()?)),
}
} else {
Ok(None)
}
}
pub fn stats(&self) -> Result<MemoryStats, Error> {
self.spiller.memory_stats()
}
}
pub fn create_pdg_entry(
project_id: String,
node_count: usize,
edge_count: usize,
pdg_data: &[u8],
) -> CacheEntry {
CacheEntry::PDG {
project_id,
node_count,
edge_count,
serialized_data: pdg_data.to_vec(),
}
}
pub fn create_search_entry(
project_id: String,
entry_count: usize,
index_data: &[u8],
) -> CacheEntry {
CacheEntry::SearchIndex {
project_id,
entry_count,
serialized_data: index_data.to_vec(),
}
}
pub fn pdg_cache_key(project_id: &str) -> String {
format!("pdg:{}", project_id)
}
pub fn search_cache_key(project_id: &str) -> String {
format!("search:{}", project_id)
}
pub fn project_scan_cache_key(project_id: &str) -> String {
format!("project_scan:{}", project_id)
}
pub fn analysis_cache_key(query: &str) -> String {
format!("analysis:{}", sanitize_key(query))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_memory_manager_creation() {
let manager = MemoryManager::new(MemoryConfig::default());
assert!(manager.is_ok());
}
#[test]
fn test_memory_info() {
let manager = MemoryManager::new(MemoryConfig::default()).unwrap();
let rss = manager.get_rss_bytes();
assert!(rss.is_ok());
assert!(rss.unwrap() > 0);
}
#[test]
fn test_spill_config_default() {
let config = MemoryConfig::default();
assert_eq!(config.spill_threshold, 0.85);
assert_eq!(config.check_interval_secs, 30);
assert!(config.auto_spill);
assert_eq!(config.max_cache_bytes, 500_000_000);
}
#[test]
fn test_cache_entry_size() {
let entry = CacheEntry::Binary {
metadata: HashMap::new(),
serialized_data: vec![0u8; 1024],
};
assert_eq!(entry.size_bytes(), 1024);
}
#[test]
fn test_cache_store_insert_get() {
let config = MemoryConfig {
max_cache_bytes: 10_000,
..Default::default()
};
let mut store = CacheStore::new(&config);
let entry = CacheEntry::Binary {
metadata: HashMap::new(),
serialized_data: vec![0u8; 100],
};
store.insert("test_key".to_string(), entry.clone()).unwrap();
let retrieved = store.get("test_key");
assert!(retrieved.is_some());
assert_eq!(store.telemetry().memory_hits, 1);
assert_eq!(store.telemetry().misses, 0);
}
#[test]
fn test_cache_store_get_or_load_from_disk() {
let temp_dir = tempfile::tempdir().unwrap();
let config = MemoryConfig {
cache_dir: temp_dir.path().join("cache"),
max_cache_bytes: 10_000,
..Default::default()
};
let mut store = CacheStore::new(&config);
let entry = CacheEntry::Binary {
metadata: HashMap::new(),
serialized_data: vec![1u8; 128],
};
store.insert("persist_me".to_string(), entry).unwrap();
assert!(store.persist_key("persist_me").unwrap());
let _ = store.remove("persist_me");
let loaded = store.get_or_load("persist_me").unwrap();
assert!(loaded.is_some());
assert_eq!(store.telemetry().disk_hits, 1);
assert_eq!(store.telemetry().restores, 1);
}
#[test]
fn test_cache_key_sanitization() {
assert_eq!(sanitize_key("test/key"), "test_key");
assert_eq!(sanitize_key("test:key"), "test_key");
assert_eq!(sanitize_key("test key"), "test_key");
}
#[test]
fn test_cache_key_generation() {
assert_eq!(pdg_cache_key("myproject"), "pdg:myproject");
assert_eq!(search_cache_key("myproject"), "search:myproject");
assert_eq!(
project_scan_cache_key("myproject"),
"project_scan:myproject"
);
assert!(analysis_cache_key("how does auth work").starts_with("analysis:"));
}
#[test]
fn test_memory_stats() {
let stats = MemoryStats {
rss_bytes: 1_000_000,
total_bytes: 8_000_000_000,
cache_entries: 10,
cache_bytes: 50_000,
spilled_entries: 5,
spilled_bytes: 25_000,
cache_hits: 80,
cache_memory_hits: 60,
cache_disk_hits: 20,
cache_misses: 20,
cache_hit_rate: 0.80,
cache_writes: 40,
cache_spills: 5,
cache_restores: 7,
};
let percent = stats.memory_percent();
assert!(percent > 0.0 && percent < 100.0);
assert!(stats.cache_hit_rate >= 0.0 && stats.cache_hit_rate <= 1.0);
}
#[test]
fn test_validation_result() {
let result = ValidationResult {
is_valid: true,
entry_type: Some("PDG".to_string()),
size_bytes: 1024,
error: None,
};
assert!(result.is_valid);
assert_eq!(result.entry_type.unwrap(), "PDG");
assert_eq!(result.size_bytes, 1024);
}
#[test]
fn test_restore_result() {
let result = RestoreResult {
entries_restored: 5,
entries_failed: vec![("key1".to_string(), "error".to_string())],
};
assert_eq!(result.entries_restored, 5);
assert_eq!(result.entries_failed.len(), 1);
}
#[test]
fn test_warm_strategy() {
assert_eq!(WarmStrategy::All, WarmStrategy::All);
assert_eq!(WarmStrategy::PDGOnly, WarmStrategy::PDGOnly);
assert_eq!(WarmStrategy::SearchIndexOnly, WarmStrategy::SearchIndexOnly);
assert_eq!(WarmStrategy::RecentFirst, WarmStrategy::RecentFirst);
}
#[test]
fn test_warm_result() {
let result = WarmResult {
entries_warmed: 10,
entries_skipped: 2,
warming_strategy: WarmStrategy::RecentFirst,
};
assert_eq!(result.entries_warmed, 10);
assert_eq!(result.entries_skipped, 2);
assert_eq!(result.warming_strategy, WarmStrategy::RecentFirst);
}
#[test]
fn test_cache_store_max_bytes() {
let config = MemoryConfig {
max_cache_bytes: 100_000,
..Default::default()
};
let store = CacheStore::new(&config);
assert_eq!(store.max_bytes(), 100_000);
}
#[test]
fn test_cache_store_pop_lru() {
let config = MemoryConfig {
max_cache_bytes: 10_000,
..Default::default()
};
let mut store = CacheStore::new(&config);
let entry = CacheEntry::Binary {
metadata: HashMap::new(),
serialized_data: vec![0u8; 100],
};
store.insert("test_key".to_string(), entry.clone()).unwrap();
let popped = store.pop_lru();
assert!(popped.is_some());
assert_eq!(popped.unwrap().0, "test_key");
assert!(store.pop_lru().is_none());
}
}