Skip to main content

kora_core/shard/
migration.rs

1//! Automatic tier migration: hot (RAM) → warm (mmap) → cold (LZ4 disk).
2//!
3//! The [`TierMigrator`] scans keys in a shard and identifies candidates for
4//! demotion based on their LFU counter. Promotion happens synchronously on
5//! read via [`ShardStore::promote`].
6
7use crate::types::{CompactKey, TIER_COLD, TIER_HOT, TIER_WARM};
8
9/// Configuration for automatic tier migration thresholds.
10pub struct TierConfig {
11    /// LFU counter below this threshold triggers demotion from hot to warm.
12    pub warm_threshold: u8,
13    /// LFU counter below this threshold triggers demotion from warm to cold.
14    pub cold_threshold: u8,
15    /// Number of keys to scan per migration cycle.
16    pub scan_batch_size: usize,
17}
18
19impl Default for TierConfig {
20    fn default() -> Self {
21        Self {
22            warm_threshold: 3,
23            cold_threshold: 1,
24            scan_batch_size: 100,
25        }
26    }
27}
28
29/// Statistics returned from a single migration scan cycle.
30pub struct MigrationStats {
31    /// Number of keys demoted from hot to warm.
32    pub demoted_to_warm: usize,
33    /// Number of keys demoted from warm to cold.
34    pub demoted_to_cold: usize,
35    /// Total keys scanned in this cycle.
36    pub keys_scanned: usize,
37}
38
39/// Drives background tier migration for a single shard.
40///
41/// Call [`TierMigrator::scan_and_collect`] periodically to identify keys
42/// eligible for demotion. The actual data movement is performed by the
43/// server/engine layer which has access to the warm and cold backends.
44pub struct TierMigrator {
45    config: TierConfig,
46    scan_cursor: usize,
47}
48
49impl TierMigrator {
50    /// Create a new migrator with the given configuration.
51    pub fn new(config: TierConfig) -> Self {
52        Self {
53            config,
54            scan_cursor: 0,
55        }
56    }
57
58    /// Get a reference to the tier configuration.
59    pub fn config(&self) -> &TierConfig {
60        &self.config
61    }
62
63    /// Set a new tier configuration.
64    pub fn set_config(&mut self, config: TierConfig) {
65        self.config = config;
66    }
67
68    /// Scan keys and return those eligible for demotion.
69    ///
70    /// Returns a vec of `(key, serialized_value, target_tier)` tuples.
71    /// The caller is responsible for storing the data in the target tier
72    /// and then calling [`super::ShardStore::mark_demoted`].
73    pub fn scan_and_collect(
74        &mut self,
75        store: &mut super::ShardStore,
76    ) -> Vec<(CompactKey, Vec<u8>, u8)> {
77        let total = store.len();
78        if total == 0 {
79            return Vec::new();
80        }
81
82        let batch = self.config.scan_batch_size.min(total);
83        let start = self.scan_cursor % total;
84
85        let mut candidates = Vec::new();
86        let mut scanned = 0;
87
88        let keys: Vec<CompactKey> = store
89            .entries_iter()
90            .skip(start)
91            .take(batch)
92            .map(|(k, _)| k.clone())
93            .collect();
94
95        for key in &keys {
96            scanned += 1;
97            if let Some(entry) = store.get_entry_mut(key) {
98                entry.decay_lfu(1);
99
100                let tier = entry.tier();
101                let lfu = entry.lfu_counter;
102
103                if tier == TIER_HOT && lfu < self.config.warm_threshold {
104                    let serialized = entry.value.to_bytes();
105                    candidates.push((key.clone(), serialized, TIER_WARM));
106                } else if tier == TIER_WARM && lfu < self.config.cold_threshold {
107                    candidates.push((key.clone(), Vec::new(), TIER_COLD));
108                }
109            }
110        }
111
112        if scanned > 0 {
113            self.scan_cursor = (start + scanned) % total;
114        }
115
116        candidates
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123    use crate::shard::ShardStore;
124    use crate::types::{CompactKey, KeyEntry, Value};
125
126    fn make_store_with_keys(keys: &[(&[u8], u8)]) -> ShardStore {
127        let mut store = ShardStore::new(0);
128        for (key, lfu) in keys {
129            let compact = CompactKey::new(key);
130            let mut entry = KeyEntry::new(compact.clone(), Value::from_bytes(b"value"));
131            entry.lfu_counter = *lfu;
132            store.insert_entry(compact, entry);
133        }
134        store
135    }
136
137    #[test]
138    fn test_tier_config_defaults() {
139        let config = TierConfig::default();
140        assert_eq!(config.warm_threshold, 3);
141        assert_eq!(config.cold_threshold, 1);
142        assert_eq!(config.scan_batch_size, 100);
143    }
144
145    #[test]
146    fn test_scan_identifies_low_lfu_keys() {
147        let mut store = make_store_with_keys(&[(b"hot-key", 10), (b"cold-key", 1)]);
148        let mut migrator = TierMigrator::new(TierConfig {
149            warm_threshold: 3,
150            cold_threshold: 1,
151            scan_batch_size: 100,
152        });
153
154        let candidates = migrator.scan_and_collect(&mut store);
155
156        let demoted_keys: Vec<&[u8]> = candidates.iter().map(|(k, _, _)| k.as_bytes()).collect();
157
158        assert!(demoted_keys.contains(&b"cold-key".as_slice()));
159
160        for (key, _, target) in &candidates {
161            if key.as_bytes() == b"cold-key" {
162                assert_eq!(*target, TIER_WARM);
163            }
164        }
165    }
166
167    #[test]
168    fn test_hot_keys_not_demoted() {
169        let mut store = make_store_with_keys(&[(b"hot1", 10), (b"hot2", 20), (b"hot3", 255)]);
170        let mut migrator = TierMigrator::new(TierConfig::default());
171
172        let candidates = migrator.scan_and_collect(&mut store);
173        assert!(candidates.is_empty());
174    }
175
176    #[test]
177    fn test_mark_demoted_replaces_with_warm_ref() {
178        let mut store = make_store_with_keys(&[(b"mykey", 1)]);
179        let key = CompactKey::new(b"mykey");
180
181        store.mark_demoted(&key, TIER_WARM, 0xDEAD);
182
183        let entry = store.get_entry(&key).expect("entry should exist");
184        assert_eq!(entry.tier(), TIER_WARM);
185        assert!(matches!(entry.value, Value::WarmRef(0xDEAD)));
186    }
187
188    #[test]
189    fn test_mark_demoted_replaces_with_cold_ref() {
190        let mut store = make_store_with_keys(&[(b"mykey", 0)]);
191        let key = CompactKey::new(b"mykey");
192
193        store.mark_demoted(&key, TIER_COLD, 0xBEEF);
194
195        let entry = store.get_entry(&key).expect("entry should exist");
196        assert_eq!(entry.tier(), TIER_COLD);
197        assert!(matches!(entry.value, Value::ColdRef(0xBEEF)));
198    }
199
200    #[test]
201    fn test_promote_restores_value() {
202        let mut store = make_store_with_keys(&[(b"mykey", 1)]);
203        let key = CompactKey::new(b"mykey");
204
205        store.mark_demoted(&key, TIER_WARM, 0xDEAD);
206
207        let restored_value = Value::from_bytes(b"restored");
208        store.promote(&key, restored_value.clone());
209
210        let entry = store.get_entry(&key).expect("entry should exist");
211        assert_eq!(entry.tier(), TIER_HOT);
212        assert_eq!(entry.value, restored_value);
213        assert_eq!(entry.lfu_counter, 5);
214    }
215
216    #[test]
217    fn test_empty_store_scan() {
218        let mut store = ShardStore::new(0);
219        let mut migrator = TierMigrator::new(TierConfig::default());
220        let candidates = migrator.scan_and_collect(&mut store);
221        assert!(candidates.is_empty());
222    }
223}