use crate::types::{CompactKey, TIER_COLD, TIER_HOT, TIER_WARM};
/// Tuning parameters for [`TierMigrator`].
///
/// The thresholds are compared against an entry's `lfu_counter` with a strict
/// `<` during [`TierMigrator::scan_and_collect`], after that scan has applied
/// its decay step to the entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TierConfig {
    /// An entry in the hot tier whose LFU counter is strictly below this value
    /// is reported as a candidate for demotion to the warm tier.
    pub warm_threshold: u8,
    /// An entry in the warm tier whose LFU counter is strictly below this value
    /// is reported as a candidate for demotion to the cold tier.
    pub cold_threshold: u8,
    /// Upper bound on the number of entries visited by one scan call; the scan
    /// additionally clamps it to the number of entries in the store.
    pub scan_batch_size: usize,
}
impl Default for TierConfig {
fn default() -> Self {
Self {
warm_threshold: 3,
cold_threshold: 1,
scan_batch_size: 100,
}
}
}
/// Counters describing the outcome of tier-migration work.
///
/// NOTE(review): nothing in this file constructs or updates this type; the
/// field meanings below are taken from the field names and should be confirmed
/// against whichever caller fills them in. `Default` yields all-zero counters.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct MigrationStats {
    /// Presumably the number of entries moved to the warm tier.
    pub demoted_to_warm: usize,
    /// Presumably the number of entries moved to the cold tier.
    pub demoted_to_cold: usize,
    /// Presumably the number of entries visited while scanning.
    pub keys_scanned: usize,
}
/// Batch scanner that walks a shard store's entries, applies an LFU decay step
/// to each visited entry, and reports which entries should be demoted to a
/// lower tier (see `scan_and_collect`).
pub struct TierMigrator {
// Thresholds and batch size used by each scan.
config: TierConfig,
// Offset into the store's iteration order at which the next scan starts;
// it is reduced modulo the store length before use.
scan_cursor: usize,
}
impl TierMigrator {
    /// Creates a migrator that uses `config` and begins scanning at offset 0.
    pub fn new(config: TierConfig) -> Self {
        Self {
            config,
            scan_cursor: 0,
        }
    }

    /// Returns the configuration currently in effect.
    pub fn config(&self) -> &TierConfig {
        &self.config
    }

    /// Replaces the configuration. The scan cursor is left untouched, so the
    /// next scan continues from where the previous one stopped.
    pub fn set_config(&mut self, config: TierConfig) {
        self.config = config;
    }

    /// Scans one batch of entries and returns the ones that should be demoted.
    ///
    /// At most `scan_batch_size` entries (clamped to `store.len()`) are
    /// visited, starting at the saved cursor taken modulo the store length.
    /// Every visited entry first has `decay_lfu(1)` applied; its tier and LFU
    /// counter are then checked:
    ///
    /// * tier `TIER_HOT` and counter `< warm_threshold`: reported with target
    ///   `TIER_WARM` and the bytes produced by `entry.value.to_bytes()`;
    /// * tier `TIER_WARM` and counter `< cold_threshold`: reported with target
    ///   `TIER_COLD` and an empty payload;
    /// * anything else is not reported.
    ///
    /// Each returned tuple is `(key, payload, target_tier)`. Apart from the
    /// decay step this method does not modify entries; acting on the returned
    /// candidates is left to the caller.
    ///
    /// A batch is taken from a single pass over `entries_iter()`, so it never
    /// wraps past the end of the iteration order; a scan that starts near the
    /// end may therefore visit fewer than `scan_batch_size` entries. The cursor
    /// then advances by the number of entries visited, modulo the store length.
    /// If nothing was visited the cursor is left unchanged.
    ///
    /// An empty store yields an empty result and leaves the cursor as is.
    pub fn scan_and_collect(
        &mut self,
        store: &mut super::ShardStore,
    ) -> Vec<(CompactKey, Vec<u8>, u8)> {
        let total = store.len();
        if total == 0 {
            return Vec::new();
        }

        let batch = self.config.scan_batch_size.min(total);
        let start = self.scan_cursor % total;

        // Snapshot the keys of this batch up front: the iterator borrows the
        // store, and the entries are mutated (decayed) in the loop below.
        let keys: Vec<CompactKey> = store
            .entries_iter()
            .skip(start)
            .take(batch)
            .map(|(k, _)| k.clone())
            .collect();
        let scanned = keys.len();

        let warm_threshold = self.config.warm_threshold;
        let cold_threshold = self.config.cold_threshold;

        let mut candidates = Vec::new();
        // Consume the snapshot so each key is moved into the result instead of
        // being cloned a second time.
        for key in keys {
            // Decide while the entry is borrowed, push after the borrow ends.
            let demotion = match store.get_entry_mut(&key) {
                Some(entry) => {
                    entry.decay_lfu(1);
                    let tier = entry.tier();
                    let lfu = entry.lfu_counter;
                    if tier == TIER_HOT && lfu < warm_threshold {
                        Some((entry.value.to_bytes(), TIER_WARM))
                    } else if tier == TIER_WARM && lfu < cold_threshold {
                        // No payload is produced for a warm -> cold demotion.
                        Some((Vec::new(), TIER_COLD))
                    } else {
                        None
                    }
                }
                None => None,
            };

            if let Some((payload, target)) = demotion {
                candidates.push((key, payload, target));
            }
        }

        if scanned > 0 {
            self.scan_cursor = (start + scanned) % total;
        }
        candidates
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::shard::ShardStore;
    use crate::types::{CompactKey, KeyEntry, Value};

    /// Builds a store via `ShardStore::new(0)` holding one entry per
    /// `(key, lfu)` pair; every entry carries the payload `b"value"` and has
    /// its `lfu_counter` overwritten with the given number.
    fn make_store_with_keys(keys: &[(&[u8], u8)]) -> ShardStore {
        let mut store = ShardStore::new(0);
        for (raw_key, initial_lfu) in keys {
            let compact = CompactKey::new(raw_key);
            let mut entry = KeyEntry::new(compact.clone(), Value::from_bytes(b"value"));
            entry.lfu_counter = *initial_lfu;
            store.insert_entry(compact, entry);
        }
        store
    }

    /// The `Default` impl must yield thresholds 3 / 1 and a batch size of 100.
    #[test]
    fn test_tier_config_defaults() {
        let TierConfig {
            warm_threshold,
            cold_threshold,
            scan_batch_size,
        } = TierConfig::default();
        assert_eq!(warm_threshold, 3);
        assert_eq!(cold_threshold, 1);
        assert_eq!(scan_batch_size, 100);
    }

    /// A key with a low LFU counter must be reported, targeting the warm tier.
    #[test]
    fn test_scan_identifies_low_lfu_keys() {
        let config = TierConfig {
            warm_threshold: 3,
            cold_threshold: 1,
            scan_batch_size: 100,
        };
        let mut migrator = TierMigrator::new(config);
        let mut store = make_store_with_keys(&[(b"hot-key", 10), (b"cold-key", 1)]);

        let candidates = migrator.scan_and_collect(&mut store);

        let mentions_cold_key = candidates
            .iter()
            .any(|(key, _, _)| key.as_bytes() == b"cold-key");
        assert!(mentions_cold_key);

        for (_, _, target) in candidates
            .iter()
            .filter(|(key, _, _)| key.as_bytes() == b"cold-key")
        {
            assert_eq!(*target, TIER_WARM);
        }
    }

    /// Keys with high LFU counters must not show up as candidates.
    #[test]
    fn test_hot_keys_not_demoted() {
        let mut migrator = TierMigrator::new(TierConfig::default());
        let mut store = make_store_with_keys(&[(b"hot1", 10), (b"hot2", 20), (b"hot3", 255)]);

        assert!(migrator.scan_and_collect(&mut store).is_empty());
    }

    /// `mark_demoted` with the warm tier must leave a `WarmRef` behind.
    #[test]
    fn test_mark_demoted_replaces_with_warm_ref() {
        let key = CompactKey::new(b"mykey");
        let mut store = make_store_with_keys(&[(b"mykey", 1)]);

        store.mark_demoted(&key, TIER_WARM, 0xDEAD);

        let demoted = store.get_entry(&key).expect("entry should exist");
        assert_eq!(demoted.tier(), TIER_WARM);
        assert!(matches!(demoted.value, Value::WarmRef(0xDEAD)));
    }

    /// `mark_demoted` with the cold tier must leave a `ColdRef` behind.
    #[test]
    fn test_mark_demoted_replaces_with_cold_ref() {
        let key = CompactKey::new(b"mykey");
        let mut store = make_store_with_keys(&[(b"mykey", 0)]);

        store.mark_demoted(&key, TIER_COLD, 0xBEEF);

        let demoted = store.get_entry(&key).expect("entry should exist");
        assert_eq!(demoted.tier(), TIER_COLD);
        assert!(matches!(demoted.value, Value::ColdRef(0xBEEF)));
    }

    /// Promoting a demoted key must put the supplied value back, report the
    /// hot tier, and leave the LFU counter at 5.
    #[test]
    fn test_promote_restores_value() {
        let key = CompactKey::new(b"mykey");
        let mut store = make_store_with_keys(&[(b"mykey", 1)]);
        store.mark_demoted(&key, TIER_WARM, 0xDEAD);

        let restored_value = Value::from_bytes(b"restored");
        store.promote(&key, restored_value.clone());

        let promoted = store.get_entry(&key).expect("entry should exist");
        assert_eq!(promoted.tier(), TIER_HOT);
        assert_eq!(promoted.value, restored_value);
        assert_eq!(promoted.lfu_counter, 5);
    }

    /// Scanning a store with no entries must return no candidates.
    #[test]
    fn test_empty_store_scan() {
        let mut migrator = TierMigrator::new(TierConfig::default());
        let mut store = ShardStore::new(0);

        assert!(migrator.scan_and_collect(&mut store).is_empty());
    }
}