1use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::path::PathBuf;
13use std::sync::RwLock;
14use std::time::Instant;
15use tracing::{debug, info};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
19pub enum StorageTier {
20 Hot,
22 #[default]
24 Warm,
25 Cold,
27}
28
29#[derive(Debug, Clone)]
31pub struct TierConfig {
32 pub tier: StorageTier,
34 pub path: PathBuf,
36 pub capacity: u64,
38 pub read_speed_mbps: u32,
40 pub write_speed_mbps: u32,
42 pub enabled: bool,
44}
45
46impl TierConfig {
47 #[must_use]
49 pub fn new(tier: StorageTier, path: impl Into<PathBuf>, capacity: u64) -> Self {
50 let (read_speed, write_speed) = match tier {
51 StorageTier::Hot => (500, 400), StorageTier::Warm => (150, 100), StorageTier::Cold => (50, 30), };
55
56 Self {
57 tier,
58 path: path.into(),
59 capacity,
60 read_speed_mbps: read_speed,
61 write_speed_mbps: write_speed,
62 enabled: true,
63 }
64 }
65}
66
67#[derive(Debug, Clone)]
69pub struct TieredStorageConfig {
70 pub hot: Option<TierConfig>,
72 pub warm: TierConfig,
74 pub cold: Option<TierConfig>,
76 pub hot_promotion_threshold: u32,
78 pub hot_demotion_inactive_secs: u64,
80 pub cold_demotion_inactive_secs: u64,
82 pub rebalance_interval_secs: u64,
84 pub max_move_per_cycle: u64,
86}
87
88impl Default for TieredStorageConfig {
89 fn default() -> Self {
90 Self {
91 hot: None, warm: TierConfig::new(
93 StorageTier::Warm,
94 "/var/chie/warm",
95 100 * 1024 * 1024 * 1024,
96 ),
97 cold: None, hot_promotion_threshold: 10,
99 hot_demotion_inactive_secs: 3600, cold_demotion_inactive_secs: 7 * 24 * 3600, rebalance_interval_secs: 300, max_move_per_cycle: 1024 * 1024 * 1024, }
104 }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ContentLocation {
110 pub cid: String,
112 pub tier: StorageTier,
114 pub size: u64,
116 pub access_count: u32,
118 pub last_accessed: u64,
120 pub tier_placed_at: u64,
122}
123
124#[derive(Debug, Clone)]
126#[allow(dead_code)]
127struct AccessRecord {
128 timestamp: Instant,
129 cid: String,
130}
131
132pub struct TieredStorageManager {
134 config: TieredStorageConfig,
136 locations: RwLock<HashMap<String, ContentLocation>>,
138 tier_usage: RwLock<HashMap<StorageTier, u64>>,
140 access_history: RwLock<VecDeque<AccessRecord>>,
142 pending_moves: RwLock<Vec<PendingMove>>,
144}
145
146#[derive(Debug, Clone)]
148pub struct PendingMove {
149 pub cid: String,
151 pub from: StorageTier,
153 pub to: StorageTier,
155 pub size: u64,
157 pub priority: u32,
159}
160
161impl TieredStorageManager {
162 #[must_use]
164 pub fn new(config: TieredStorageConfig) -> Self {
165 let mut tier_usage = HashMap::new();
166 tier_usage.insert(StorageTier::Warm, 0);
167 if config.hot.is_some() {
168 tier_usage.insert(StorageTier::Hot, 0);
169 }
170 if config.cold.is_some() {
171 tier_usage.insert(StorageTier::Cold, 0);
172 }
173
174 Self {
175 config,
176 locations: RwLock::new(HashMap::new()),
177 tier_usage: RwLock::new(tier_usage),
178 access_history: RwLock::new(VecDeque::with_capacity(10000)),
179 pending_moves: RwLock::new(Vec::new()),
180 }
181 }
182
183 #[must_use]
185 pub fn register_content(&self, cid: &str, size: u64) -> StorageTier {
186 let initial_tier = self.determine_initial_tier(size);
187 let now = current_timestamp();
188
189 let location = ContentLocation {
190 cid: cid.to_string(),
191 tier: initial_tier,
192 size,
193 access_count: 0,
194 last_accessed: now,
195 tier_placed_at: now,
196 };
197
198 {
199 let mut locations = self.locations.write().unwrap();
200 locations.insert(cid.to_string(), location);
201 }
202
203 {
204 let mut usage = self.tier_usage.write().unwrap();
205 *usage.entry(initial_tier).or_insert(0) += size;
206 }
207
208 info!(
209 "Registered content {} ({} bytes) in {:?} tier",
210 cid, size, initial_tier
211 );
212 initial_tier
213 }
214
215 pub fn record_access(&self, cid: &str) {
217 let mut locations = self.locations.write().unwrap();
218 if let Some(location) = locations.get_mut(cid) {
219 location.access_count += 1;
220 location.last_accessed = current_timestamp();
221 }
222
223 let mut history = self.access_history.write().unwrap();
225 history.push_back(AccessRecord {
226 timestamp: Instant::now(),
227 cid: cid.to_string(),
228 });
229
230 while history.len() > 10000 {
232 history.pop_front();
233 }
234 }
235
236 #[must_use]
238 #[inline]
239 pub fn get_location(&self, cid: &str) -> Option<ContentLocation> {
240 let locations = self.locations.read().unwrap();
241 locations.get(cid).cloned()
242 }
243
244 #[must_use]
246 #[inline]
247 pub fn get_content_path(&self, cid: &str) -> Option<PathBuf> {
248 let locations = self.locations.read().unwrap();
249 let location = locations.get(cid)?;
250
251 let tier_config = match location.tier {
252 StorageTier::Hot => self.config.hot.as_ref(),
253 StorageTier::Warm => Some(&self.config.warm),
254 StorageTier::Cold => self.config.cold.as_ref(),
255 };
256
257 tier_config.map(|c| c.path.join(cid))
258 }
259
260 #[inline]
262 fn determine_initial_tier(&self, size: u64) -> StorageTier {
263 if size < 10 * 1024 * 1024 {
265 if let Some(hot) = &self.config.hot {
266 if hot.enabled && self.has_space(StorageTier::Hot, size) {
267 return StorageTier::Hot;
268 }
269 }
270 }
271
272 if self.has_space(StorageTier::Warm, size) {
274 return StorageTier::Warm;
275 }
276
277 if let Some(cold) = &self.config.cold {
279 if cold.enabled && self.has_space(StorageTier::Cold, size) {
280 return StorageTier::Cold;
281 }
282 }
283
284 StorageTier::Warm
286 }
287
288 #[inline]
290 fn has_space(&self, tier: StorageTier, size: u64) -> bool {
291 let capacity = match tier {
292 StorageTier::Hot => self.config.hot.as_ref().map(|c| c.capacity).unwrap_or(0),
293 StorageTier::Warm => self.config.warm.capacity,
294 StorageTier::Cold => self.config.cold.as_ref().map(|c| c.capacity).unwrap_or(0),
295 };
296
297 let usage = self.tier_usage.read().unwrap();
298 let used = *usage.get(&tier).unwrap_or(&0);
299
300 used + size <= capacity
301 }
302
303 #[must_use]
305 #[inline]
306 pub fn analyze_tier_changes(&self) -> Vec<PendingMove> {
307 let mut moves = Vec::new();
308 let now = current_timestamp();
309 let locations = self.locations.read().unwrap();
310
311 for location in locations.values() {
312 if location.tier != StorageTier::Hot
314 && location.access_count >= self.config.hot_promotion_threshold
315 && self.config.hot.is_some()
316 && self.has_space(StorageTier::Hot, location.size)
317 {
318 moves.push(PendingMove {
319 cid: location.cid.clone(),
320 from: location.tier,
321 to: StorageTier::Hot,
322 size: location.size,
323 priority: location.access_count,
324 });
325 continue;
326 }
327
328 if location.tier == StorageTier::Hot {
330 let inactive_secs = now.saturating_sub(location.last_accessed);
331 if inactive_secs > self.config.hot_demotion_inactive_secs {
332 moves.push(PendingMove {
333 cid: location.cid.clone(),
334 from: StorageTier::Hot,
335 to: StorageTier::Warm,
336 size: location.size,
337 priority: 100 - location.access_count.min(100),
338 });
339 continue;
340 }
341 }
342
343 if location.tier == StorageTier::Warm && self.config.cold.is_some() {
345 let inactive_secs = now.saturating_sub(location.last_accessed);
346 if inactive_secs > self.config.cold_demotion_inactive_secs
347 && self.has_space(StorageTier::Cold, location.size)
348 {
349 moves.push(PendingMove {
350 cid: location.cid.clone(),
351 from: StorageTier::Warm,
352 to: StorageTier::Cold,
353 size: location.size,
354 priority: 0,
355 });
356 }
357 }
358 }
359
360 moves.sort_by(|a, b| b.priority.cmp(&a.priority));
362
363 moves
364 }
365
366 pub fn execute_move(&self, cid: &str, new_tier: StorageTier) {
368 let mut locations = self.locations.write().unwrap();
369 let mut usage = self.tier_usage.write().unwrap();
370
371 if let Some(location) = locations.get_mut(cid) {
372 let old_tier = location.tier;
373 let size = location.size;
374
375 if let Some(old_usage) = usage.get_mut(&old_tier) {
377 *old_usage = old_usage.saturating_sub(size);
378 }
379 *usage.entry(new_tier).or_insert(0) += size;
380
381 location.tier = new_tier;
383 location.tier_placed_at = current_timestamp();
384
385 debug!("Moved {} from {:?} to {:?}", cid, old_tier, new_tier);
386 }
387 }
388
389 pub fn remove_content(&self, cid: &str) {
391 let mut locations = self.locations.write().unwrap();
392 let mut usage = self.tier_usage.write().unwrap();
393
394 if let Some(location) = locations.remove(cid) {
395 if let Some(tier_usage) = usage.get_mut(&location.tier) {
396 *tier_usage = tier_usage.saturating_sub(location.size);
397 }
398 }
399 }
400
401 #[must_use]
403 pub fn tier_stats(&self) -> TierStats {
404 let usage = self.tier_usage.read().unwrap();
405 let locations = self.locations.read().unwrap();
406
407 let hot_used = *usage.get(&StorageTier::Hot).unwrap_or(&0);
408 let warm_used = *usage.get(&StorageTier::Warm).unwrap_or(&0);
409 let cold_used = *usage.get(&StorageTier::Cold).unwrap_or(&0);
410
411 let hot_capacity = self.config.hot.as_ref().map(|c| c.capacity).unwrap_or(0);
412 let warm_capacity = self.config.warm.capacity;
413 let cold_capacity = self.config.cold.as_ref().map(|c| c.capacity).unwrap_or(0);
414
415 let content_by_tier = locations.values().fold(HashMap::new(), |mut acc, loc| {
416 *acc.entry(loc.tier).or_insert(0) += 1;
417 acc
418 });
419
420 TierStats {
421 hot_used,
422 hot_capacity,
423 hot_content_count: *content_by_tier.get(&StorageTier::Hot).unwrap_or(&0),
424 warm_used,
425 warm_capacity,
426 warm_content_count: *content_by_tier.get(&StorageTier::Warm).unwrap_or(&0),
427 cold_used,
428 cold_capacity,
429 cold_content_count: *content_by_tier.get(&StorageTier::Cold).unwrap_or(&0),
430 total_content: locations.len(),
431 }
432 }
433
434 #[must_use]
436 #[inline]
437 pub fn get_pending_moves(&self) -> Vec<PendingMove> {
438 self.pending_moves.read().unwrap().clone()
439 }
440
441 #[must_use]
443 #[inline]
444 pub fn get_tier_path(&self, tier: StorageTier) -> Option<PathBuf> {
445 match tier {
446 StorageTier::Hot => self.config.hot.as_ref().map(|c| c.path.clone()),
447 StorageTier::Warm => Some(self.config.warm.path.clone()),
448 StorageTier::Cold => self.config.cold.as_ref().map(|c| c.path.clone()),
449 }
450 }
451
452 #[must_use]
454 #[inline]
455 pub fn get_tier_config(&self, tier: StorageTier) -> Option<&TierConfig> {
456 match tier {
457 StorageTier::Hot => self.config.hot.as_ref(),
458 StorageTier::Warm => Some(&self.config.warm),
459 StorageTier::Cold => self.config.cold.as_ref(),
460 }
461 }
462
463 #[must_use]
465 pub fn rebalance(&self) -> RebalanceResult {
466 let moves = self.analyze_tier_changes();
467 let mut bytes_moved = 0u64;
468 let mut moves_executed = 0;
469
470 let mut pending = self.pending_moves.write().unwrap();
471 pending.clear();
472
473 for m in moves {
474 if bytes_moved + m.size > self.config.max_move_per_cycle {
475 pending.push(m);
477 } else {
478 bytes_moved += m.size;
480 moves_executed += 1;
481 }
482 }
483
484 RebalanceResult {
485 moves_executed,
486 bytes_moved,
487 pending_moves: pending.len(),
488 }
489 }
490}
491
492#[derive(Debug, Clone, Serialize, Deserialize, Default)]
494pub struct TierStats {
495 pub hot_used: u64,
497 pub hot_capacity: u64,
499 pub hot_content_count: usize,
501 pub warm_used: u64,
503 pub warm_capacity: u64,
505 pub warm_content_count: usize,
507 pub cold_used: u64,
509 pub cold_capacity: u64,
511 pub cold_content_count: usize,
513 pub total_content: usize,
515}
516
517#[derive(Debug, Clone)]
519pub struct RebalanceResult {
520 pub moves_executed: usize,
522 pub bytes_moved: u64,
524 pub pending_moves: usize,
526}
527
528fn current_timestamp() -> u64 {
530 std::time::SystemTime::now()
531 .duration_since(std::time::UNIX_EPOCH)
532 .map(|d| d.as_secs())
533 .unwrap_or(0)
534}
535
536#[cfg(test)]
537mod tests {
538 use super::*;
539
540 #[test]
541 fn test_tiered_storage_default_config() {
542 let config = TieredStorageConfig::default();
543 assert!(config.hot.is_none());
544 assert!(config.cold.is_none());
545 }
546
547 #[test]
548 fn test_register_content() {
549 let config = TieredStorageConfig::default();
550 let manager = TieredStorageManager::new(config);
551
552 let tier = manager.register_content("QmTest123", 1024 * 1024);
553 assert_eq!(tier, StorageTier::Warm);
554
555 let location = manager.get_location("QmTest123").unwrap();
556 assert_eq!(location.tier, StorageTier::Warm);
557 assert_eq!(location.size, 1024 * 1024);
558 }
559
560 #[test]
561 fn test_record_access() {
562 let config = TieredStorageConfig::default();
563 let manager = TieredStorageManager::new(config);
564
565 let _ = manager.register_content("QmTest123", 1024);
566
567 for _ in 0..5 {
568 manager.record_access("QmTest123");
569 }
570
571 let location = manager.get_location("QmTest123").unwrap();
572 assert_eq!(location.access_count, 5);
573 }
574
575 #[test]
576 fn test_tier_stats() {
577 let config = TieredStorageConfig::default();
578 let manager = TieredStorageManager::new(config);
579
580 let _ = manager.register_content("QmTest1", 1024);
581 let _ = manager.register_content("QmTest2", 2048);
582
583 let stats = manager.tier_stats();
584 assert_eq!(stats.warm_used, 3072);
585 assert_eq!(stats.total_content, 2);
586 }
587
588 #[test]
589 fn test_content_removal() {
590 let config = TieredStorageConfig::default();
591 let manager = TieredStorageManager::new(config);
592
593 let _ = manager.register_content("QmTest123", 1024);
594 assert!(manager.get_location("QmTest123").is_some());
595
596 manager.remove_content("QmTest123");
597 assert!(manager.get_location("QmTest123").is_none());
598 }
599
600 #[test]
601 fn test_hot_tier_placement() {
602 let config = TieredStorageConfig {
603 hot: Some(TierConfig::new(
604 StorageTier::Hot,
605 "/tmp/hot",
606 100 * 1024 * 1024,
607 )),
608 ..Default::default()
609 };
610
611 let manager = TieredStorageManager::new(config);
612
613 let tier = manager.register_content("QmSmall", 1024);
615 assert_eq!(tier, StorageTier::Hot);
616
617 let tier = manager.register_content("QmLarge", 50 * 1024 * 1024);
619 assert_eq!(tier, StorageTier::Warm);
620 }
621}