kora_core/shard/
migration.rs1use crate::types::{CompactKey, TIER_COLD, TIER_HOT, TIER_WARM};
8
9pub struct TierConfig {
11 pub warm_threshold: u8,
13 pub cold_threshold: u8,
15 pub scan_batch_size: usize,
17}
18
19impl Default for TierConfig {
20 fn default() -> Self {
21 Self {
22 warm_threshold: 3,
23 cold_threshold: 1,
24 scan_batch_size: 100,
25 }
26 }
27}
28
29pub struct MigrationStats {
31 pub demoted_to_warm: usize,
33 pub demoted_to_cold: usize,
35 pub keys_scanned: usize,
37}
38
39pub struct TierMigrator {
45 config: TierConfig,
46 scan_cursor: usize,
47}
48
49impl TierMigrator {
50 pub fn new(config: TierConfig) -> Self {
52 Self {
53 config,
54 scan_cursor: 0,
55 }
56 }
57
58 pub fn config(&self) -> &TierConfig {
60 &self.config
61 }
62
63 pub fn set_config(&mut self, config: TierConfig) {
65 self.config = config;
66 }
67
68 pub fn scan_and_collect(
74 &mut self,
75 store: &mut super::ShardStore,
76 ) -> Vec<(CompactKey, Vec<u8>, u8)> {
77 let total = store.len();
78 if total == 0 {
79 return Vec::new();
80 }
81
82 let batch = self.config.scan_batch_size.min(total);
83 let start = self.scan_cursor % total;
84
85 let mut candidates = Vec::new();
86 let mut scanned = 0;
87
88 let keys: Vec<CompactKey> = store
89 .entries_iter()
90 .skip(start)
91 .take(batch)
92 .map(|(k, _)| k.clone())
93 .collect();
94
95 for key in &keys {
96 scanned += 1;
97 if let Some(entry) = store.get_entry_mut(key) {
98 entry.decay_lfu(1);
99
100 let tier = entry.tier();
101 let lfu = entry.lfu_counter;
102
103 if tier == TIER_HOT && lfu < self.config.warm_threshold {
104 let serialized = entry.value.to_bytes();
105 candidates.push((key.clone(), serialized, TIER_WARM));
106 } else if tier == TIER_WARM && lfu < self.config.cold_threshold {
107 candidates.push((key.clone(), Vec::new(), TIER_COLD));
108 }
109 }
110 }
111
112 if scanned > 0 {
113 self.scan_cursor = (start + scanned) % total;
114 }
115
116 candidates
117 }
118}
119
120#[cfg(test)]
121mod tests {
122 use super::*;
123 use crate::shard::ShardStore;
124 use crate::types::{CompactKey, KeyEntry, Value};
125
126 fn make_store_with_keys(keys: &[(&[u8], u8)]) -> ShardStore {
127 let mut store = ShardStore::new(0);
128 for (key, lfu) in keys {
129 let compact = CompactKey::new(key);
130 let mut entry = KeyEntry::new(compact.clone(), Value::from_bytes(b"value"));
131 entry.lfu_counter = *lfu;
132 store.insert_entry(compact, entry);
133 }
134 store
135 }
136
137 #[test]
138 fn test_tier_config_defaults() {
139 let config = TierConfig::default();
140 assert_eq!(config.warm_threshold, 3);
141 assert_eq!(config.cold_threshold, 1);
142 assert_eq!(config.scan_batch_size, 100);
143 }
144
145 #[test]
146 fn test_scan_identifies_low_lfu_keys() {
147 let mut store = make_store_with_keys(&[(b"hot-key", 10), (b"cold-key", 1)]);
148 let mut migrator = TierMigrator::new(TierConfig {
149 warm_threshold: 3,
150 cold_threshold: 1,
151 scan_batch_size: 100,
152 });
153
154 let candidates = migrator.scan_and_collect(&mut store);
155
156 let demoted_keys: Vec<&[u8]> = candidates.iter().map(|(k, _, _)| k.as_bytes()).collect();
157
158 assert!(demoted_keys.contains(&b"cold-key".as_slice()));
159
160 for (key, _, target) in &candidates {
161 if key.as_bytes() == b"cold-key" {
162 assert_eq!(*target, TIER_WARM);
163 }
164 }
165 }
166
167 #[test]
168 fn test_hot_keys_not_demoted() {
169 let mut store = make_store_with_keys(&[(b"hot1", 10), (b"hot2", 20), (b"hot3", 255)]);
170 let mut migrator = TierMigrator::new(TierConfig::default());
171
172 let candidates = migrator.scan_and_collect(&mut store);
173 assert!(candidates.is_empty());
174 }
175
176 #[test]
177 fn test_mark_demoted_replaces_with_warm_ref() {
178 let mut store = make_store_with_keys(&[(b"mykey", 1)]);
179 let key = CompactKey::new(b"mykey");
180
181 store.mark_demoted(&key, TIER_WARM, 0xDEAD);
182
183 let entry = store.get_entry(&key).expect("entry should exist");
184 assert_eq!(entry.tier(), TIER_WARM);
185 assert!(matches!(entry.value, Value::WarmRef(0xDEAD)));
186 }
187
188 #[test]
189 fn test_mark_demoted_replaces_with_cold_ref() {
190 let mut store = make_store_with_keys(&[(b"mykey", 0)]);
191 let key = CompactKey::new(b"mykey");
192
193 store.mark_demoted(&key, TIER_COLD, 0xBEEF);
194
195 let entry = store.get_entry(&key).expect("entry should exist");
196 assert_eq!(entry.tier(), TIER_COLD);
197 assert!(matches!(entry.value, Value::ColdRef(0xBEEF)));
198 }
199
200 #[test]
201 fn test_promote_restores_value() {
202 let mut store = make_store_with_keys(&[(b"mykey", 1)]);
203 let key = CompactKey::new(b"mykey");
204
205 store.mark_demoted(&key, TIER_WARM, 0xDEAD);
206
207 let restored_value = Value::from_bytes(b"restored");
208 store.promote(&key, restored_value.clone());
209
210 let entry = store.get_entry(&key).expect("entry should exist");
211 assert_eq!(entry.tier(), TIER_HOT);
212 assert_eq!(entry.value, restored_value);
213 assert_eq!(entry.lfu_counter, 5);
214 }
215
216 #[test]
217 fn test_empty_store_scan() {
218 let mut store = ShardStore::new(0);
219 let mut migrator = TierMigrator::new(TierConfig::default());
220 let candidates = migrator.scan_and_collect(&mut store);
221 assert!(candidates.is_empty());
222 }
223}