/// Three-tier store for TDG records: a lock-free in-memory hot cache in
/// front of persistent warm and cold backends.
pub struct TieredStore {
    /// Hot tier: in-memory summaries keyed by content hash.
    hot: Arc<DashMap<Blake3Hash, HotCacheEntry>>,
    /// Warm tier: recent full records, written lz4-compressed (see `store`).
    warm_backend: Box<dyn StorageBackend>,
    /// Cold tier: archived full records, written as raw JSON (see `archive_to_cold`).
    cold_backend: Box<dyn StorageBackend>,
    /// Records older than this many days are migrated from warm to cold.
    archive_after_days: u32,
}
impl TieredStore {
    /// Creates a tiered store rooted at `db_path`, placing the warm and
    /// cold libSQL databases under `<db_path>/.pmat/`.
    ///
    /// # Errors
    /// Returns an error if either storage backend fails to initialize.
    pub fn new(db_path: impl AsRef<Path>) -> Result<Self> {
        let base = db_path.as_ref();
        let warm_config = StorageConfig {
            backend_type: crate::tdg::storage_backend::StorageBackendType::Libsql,
            path: Some(base.join(".pmat/tdg-warm.db")),
            cache_size_mb: Some(128),
            compression: true,
        };
        let cold_config = StorageConfig {
            backend_type: crate::tdg::storage_backend::StorageBackendType::Libsql,
            path: Some(base.join(".pmat/tdg-cold.db")),
            cache_size_mb: Some(64),
            // Cold records are read rarely and written pre-serialized;
            // skip backend-level compression.
            compression: false,
        };
        Self::with_config(warm_config, cold_config)
    }

    /// Creates a tiered store from explicit warm/cold backend configs.
    /// Records are archived to cold after 30 days by default.
    ///
    /// # Errors
    /// Returns an error if either backend cannot be created.
    pub fn with_config(warm_config: StorageConfig, cold_config: StorageConfig) -> Result<Self> {
        let warm_backend = StorageBackendFactory::create_from_config(&warm_config)?;
        let cold_backend = StorageBackendFactory::create_from_config(&cold_config)?;
        Ok(Self {
            hot: Arc::new(DashMap::new()),
            warm_backend,
            cold_backend,
            archive_after_days: 30,
        })
    }

    /// Creates a fully in-memory store (no persistence); intended for tests.
    #[must_use]
    pub fn in_memory() -> Self {
        Self {
            hot: Arc::new(DashMap::new()),
            warm_backend: StorageBackendFactory::create_in_memory(),
            cold_backend: StorageBackendFactory::create_in_memory(),
            archive_after_days: 30,
        }
    }

    /// Stores a record: a summary always goes into the hot cache, the
    /// full record is written lz4-compressed into the warm tier, and —
    /// when the record is already past the archive threshold — it is
    /// then moved to the cold tier.
    ///
    /// # Errors
    /// Returns an error if serialization or a backend write fails.
    pub async fn store(&self, record: FullTdgRecord) -> Result<()> {
        let hash = record.identity.content_hash;
        let hot_entry = HotCacheEntry::from_record(&record);
        self.hot.insert(hash, hot_entry);
        let serialized = serde_json::to_vec(&record)?;
        let compressed = compress_prepend_size(&serialized);
        self.warm_backend.put(hash.as_bytes(), &compressed)?;
        if self.should_archive(&record) {
            self.archive_to_cold(record).await?;
        }
        Ok(())
    }

    /// Returns a copy of the hot-cache entry for `hash`, if present.
    #[must_use]
    pub fn get_hot(&self, hash: &Blake3Hash) -> Option<HotCacheEntry> {
        // Copy the entry out so the DashMap shard guard is released
        // before returning.
        self.hot.get(hash).map(|entry| *entry.value())
    }

    /// Fetches the full record, checking warm (lz4-compressed JSON) then
    /// cold (raw JSON). The hot cache holds only summaries and is not
    /// consulted here.
    ///
    /// # Errors
    /// Returns an error on backend read, decompression, or JSON failure.
    pub async fn retrieve_full(&self, hash: &Blake3Hash) -> Result<Option<FullTdgRecord>> {
        if let Some(compressed) = self.warm_backend.get(hash.as_bytes())? {
            let decompressed = decompress_size_prepended(&compressed)?;
            return Ok(Some(serde_json::from_slice(&decompressed)?));
        }
        // Cold entries are written uncompressed (see `archive_to_cold`).
        if let Some(archived) = self.cold_backend.get(hash.as_bytes())? {
            return Ok(Some(serde_json::from_slice(&archived)?));
        }
        Ok(None)
    }

    /// Whether `record` is old enough to belong in the cold tier.
    fn should_archive(&self, record: &FullTdgRecord) -> bool {
        // A clock skew that puts the timestamp in the future yields an
        // age of zero (`unwrap_or_default`) rather than an error.
        let age_days = record
            .metadata
            .analysis_timestamp
            .elapsed()
            .unwrap_or_default()
            .as_secs()
            / (24 * 60 * 60);
        age_days > u64::from(self.archive_after_days)
    }

    /// Moves `record` into cold storage (uncompressed JSON) and removes
    /// its warm-tier copy.
    async fn archive_to_cold(&self, record: FullTdgRecord) -> Result<()> {
        let hash = record.identity.content_hash;
        let serialized = serde_json::to_vec(&record)?;
        self.cold_backend.put(hash.as_bytes(), &serialized)?;
        self.warm_backend.delete(hash.as_bytes())?;
        Ok(())
    }

    /// Evicts hot-cache entries older than `max_age_seconds` and returns
    /// how many were removed.
    #[must_use]
    pub fn cleanup_hot_cache(&self, max_age_seconds: u64) -> usize {
        // Seconds-since-epoch fits in i64 for the foreseeable future;
        // the cast matches the signed `entry.timestamp` field.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as i64;
        let mut removed = 0;
        self.hot.retain(|_, entry| {
            let age = now - entry.timestamp;
            if age > max_age_seconds as i64 {
                removed += 1;
                false
            } else {
                true
            }
        });
        removed
    }

    /// Copies all warm and cold data into freshly created backends and
    /// swaps them in.
    ///
    /// # Errors
    /// Returns an error if a backend cannot be created or if iterating
    /// or copying any entry fails. On error the original backends are
    /// left in place, though the new backends may hold a partial copy.
    pub async fn migrate_backend(
        &mut self,
        new_warm_config: StorageConfig,
        new_cold_config: StorageConfig,
    ) -> Result<()> {
        let new_warm = StorageBackendFactory::create_from_config(&new_warm_config)?;
        let new_cold = StorageBackendFactory::create_from_config(&new_cold_config)?;
        // Propagate iteration errors: the previous `if let Ok(iter)`
        // silently skipped an entire tier when `iter()` failed, dropping
        // that tier's data on migration.
        for result in self.warm_backend.iter()? {
            let (key, value) = result?;
            new_warm.put(&key, &value)?;
        }
        for result in self.cold_backend.iter()? {
            let (key, value) = result?;
            new_cold.put(&key, &value)?;
        }
        self.warm_backend = new_warm;
        self.cold_backend = new_cold;
        Ok(())
    }

    /// Flushes both persistent tiers to durable storage.
    ///
    /// # Errors
    /// Returns the first backend flush error encountered.
    pub fn flush(&self) -> Result<()> {
        self.warm_backend.flush()?;
        self.cold_backend.flush()?;
        Ok(())
    }

    /// Snapshot of entry counts and memory usage across all three tiers.
    #[must_use]
    pub fn get_statistics(&self) -> StorageStatistics {
        let hot_entries = self.hot.len();
        // Rough estimate: counts entry payloads only, not DashMap
        // bucket/key overhead.
        let hot_memory_kb = (hot_entries * std::mem::size_of::<HotCacheEntry>()) / 1024;
        let warm_stats = self.warm_backend.get_stats();
        let cold_stats = self.cold_backend.get_stats();
        let warm_entries = warm_stats
            .get("entry_count")
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(0);
        let cold_entries = cold_stats
            .get("entry_count")
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(0);
        // Report the backends' actual types instead of the previously
        // hard-coded "sled", which disagreed with the libSQL configs
        // built in `new`. NOTE(review): assumes backends expose a
        // "backend_type" stat — confirm against StorageBackend impls.
        let warm_backend = warm_stats
            .get("backend_type")
            .cloned()
            .unwrap_or_else(|| "unknown".to_string());
        let cold_backend = cold_stats
            .get("backend_type")
            .cloned()
            .unwrap_or_else(|| "unknown".to_string());
        let total_entries = hot_entries + warm_entries + cold_entries;
        let mut backend_stats = HashMap::new();
        backend_stats.insert("warm".to_string(), warm_stats);
        backend_stats.insert("cold".to_string(), cold_stats);
        StorageStatistics {
            hot_entries,
            warm_entries,
            cold_entries,
            total_entries,
            hot_memory_kb,
            // NOTE(review): fixed estimate, not measured — TODO derive
            // from actual compressed vs. uncompressed byte counts.
            compression_ratio: 0.33,
            warm_backend,
            cold_backend,
            backend_stats,
        }
    }
}