1use async_trait::async_trait;
9use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use crate::traits::VectorStorage;
17
18#[derive(Debug, Clone)]
20pub struct TieredStorageConfig {
21 pub hot_tier_capacity: usize,
23 pub hot_to_warm_threshold: Duration,
25 pub warm_to_cold_threshold: Duration,
27 pub auto_tier_enabled: bool,
29 pub tier_check_interval: Duration,
31}
32
33impl Default for TieredStorageConfig {
34 fn default() -> Self {
35 Self {
36 hot_tier_capacity: 100_000,
37 hot_to_warm_threshold: Duration::from_secs(3600), warm_to_cold_threshold: Duration::from_secs(86400), auto_tier_enabled: true,
40 tier_check_interval: Duration::from_secs(300), }
42 }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47pub enum StorageTier {
48 Hot,
50 Warm,
52 Cold,
54}
55
56impl StorageTier {
57 pub fn as_str(&self) -> &'static str {
58 match self {
59 StorageTier::Hot => "hot",
60 StorageTier::Warm => "warm",
61 StorageTier::Cold => "cold",
62 }
63 }
64}
65
66#[derive(Debug, Clone)]
68struct AccessInfo {
69 last_access: Instant,
71 access_count: u64,
73 tier: StorageTier,
75}
76
77impl Default for AccessInfo {
78 fn default() -> Self {
79 Self {
80 last_access: Instant::now(),
81 access_count: 0,
82 tier: StorageTier::Hot,
83 }
84 }
85}
86
87#[derive(Debug, Clone, Default)]
89pub struct TieredStorageStats {
90 pub hot_count: u64,
92 pub warm_count: u64,
94 pub cold_count: u64,
96 pub hot_hits: u64,
98 pub warm_hits: u64,
100 pub cold_hits: u64,
102 pub promotions_to_hot: u64,
104 pub demotions_to_warm: u64,
106 pub demotions_to_cold: u64,
108}
109
110pub struct TieredStorage<H, W, C> {
112 config: TieredStorageConfig,
114 hot_storage: H,
116 warm_storage: W,
118 cold_storage: C,
120 access_info: RwLock<HashMap<(NamespaceId, VectorId), AccessInfo>>,
122 stats: TieredStorageStatsInner,
124}
125
126struct TieredStorageStatsInner {
127 hot_count: AtomicU64,
128 warm_count: AtomicU64,
129 cold_count: AtomicU64,
130 hot_hits: AtomicU64,
131 warm_hits: AtomicU64,
132 cold_hits: AtomicU64,
133 promotions_to_hot: AtomicU64,
134 demotions_to_warm: AtomicU64,
135 demotions_to_cold: AtomicU64,
136}
137
138impl Default for TieredStorageStatsInner {
139 fn default() -> Self {
140 Self {
141 hot_count: AtomicU64::new(0),
142 warm_count: AtomicU64::new(0),
143 cold_count: AtomicU64::new(0),
144 hot_hits: AtomicU64::new(0),
145 warm_hits: AtomicU64::new(0),
146 cold_hits: AtomicU64::new(0),
147 promotions_to_hot: AtomicU64::new(0),
148 demotions_to_warm: AtomicU64::new(0),
149 demotions_to_cold: AtomicU64::new(0),
150 }
151 }
152}
153
154impl<H, W, C> TieredStorage<H, W, C>
155where
156 H: VectorStorage,
157 W: VectorStorage,
158 C: VectorStorage,
159{
160 pub fn new(
162 config: TieredStorageConfig,
163 hot_storage: H,
164 warm_storage: W,
165 cold_storage: C,
166 ) -> Self {
167 Self {
168 config,
169 hot_storage,
170 warm_storage,
171 cold_storage,
172 access_info: RwLock::new(HashMap::new()),
173 stats: TieredStorageStatsInner::default(),
174 }
175 }
176
177 pub fn config(&self) -> &TieredStorageConfig {
179 &self.config
180 }
181
182 fn record_access(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
184 let key = (namespace.clone(), id.clone());
185 let mut access_map = self.access_info.write();
186
187 let info = access_map.entry(key).or_default();
188 info.last_access = Instant::now();
189 info.access_count += 1;
190 info.tier = tier;
191
192 match tier {
194 StorageTier::Hot => self.stats.hot_hits.fetch_add(1, Ordering::Relaxed),
195 StorageTier::Warm => self.stats.warm_hits.fetch_add(1, Ordering::Relaxed),
196 StorageTier::Cold => self.stats.cold_hits.fetch_add(1, Ordering::Relaxed),
197 };
198 }
199
200 fn get_tier(&self, namespace: &NamespaceId, id: &VectorId) -> Option<StorageTier> {
202 let access_map = self.access_info.read();
203 access_map
204 .get(&(namespace.clone(), id.clone()))
205 .map(|info| info.tier)
206 }
207
208 pub async fn promote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
210 let current_tier = self.get_tier(namespace, id);
211
212 match current_tier {
213 Some(StorageTier::Warm) => {
214 let vectors = self
216 .warm_storage
217 .get(namespace, std::slice::from_ref(id))
218 .await?;
219 if !vectors.is_empty() {
220 self.hot_storage.upsert(namespace, vectors).await?;
221 self.warm_storage
222 .delete(namespace, std::slice::from_ref(id))
223 .await?;
224
225 self.update_tier(namespace, id, StorageTier::Hot);
226 self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
227 self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
228 self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
229
230 return Ok(true);
231 }
232 }
233 Some(StorageTier::Cold) => {
234 let vectors = self
236 .cold_storage
237 .get(namespace, std::slice::from_ref(id))
238 .await?;
239 if !vectors.is_empty() {
240 let should_be_hot = {
242 let access_map = self.access_info.read();
243 access_map
244 .get(&(namespace.clone(), id.clone()))
245 .map(|info| info.access_count > 10)
246 .unwrap_or(false)
247 };
248
249 if should_be_hot {
250 self.hot_storage.upsert(namespace, vectors).await?;
251 self.update_tier(namespace, id, StorageTier::Hot);
252 self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
253 self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
254 } else {
255 self.warm_storage.upsert(namespace, vectors).await?;
256 self.update_tier(namespace, id, StorageTier::Warm);
257 self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
258 }
259 return Ok(true);
264 }
265 }
266 _ => {}
267 }
268
269 Ok(false)
270 }
271
272 pub async fn demote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
274 let current_tier = self.get_tier(namespace, id);
275
276 match current_tier {
277 Some(StorageTier::Hot) => {
278 let vectors = self
280 .hot_storage
281 .get(namespace, std::slice::from_ref(id))
282 .await?;
283 if !vectors.is_empty() {
284 self.warm_storage.upsert(namespace, vectors).await?;
285 self.hot_storage
286 .delete(namespace, std::slice::from_ref(id))
287 .await?;
288
289 self.update_tier(namespace, id, StorageTier::Warm);
290 self.stats.demotions_to_warm.fetch_add(1, Ordering::Relaxed);
291 self.stats.hot_count.fetch_sub(1, Ordering::Relaxed);
292 self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
293
294 return Ok(true);
295 }
296 }
297 Some(StorageTier::Warm) => {
298 let vectors = self
300 .warm_storage
301 .get(namespace, std::slice::from_ref(id))
302 .await?;
303 if !vectors.is_empty() {
304 self.cold_storage.upsert(namespace, vectors).await?;
305 self.warm_storage
306 .delete(namespace, std::slice::from_ref(id))
307 .await?;
308
309 self.update_tier(namespace, id, StorageTier::Cold);
310 self.stats.demotions_to_cold.fetch_add(1, Ordering::Relaxed);
311 self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
312 self.stats.cold_count.fetch_add(1, Ordering::Relaxed);
313
314 return Ok(true);
315 }
316 }
317 _ => {}
318 }
319
320 Ok(false)
321 }
322
323 fn update_tier(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
325 let mut access_map = self.access_info.write();
326 let key = (namespace.clone(), id.clone());
327 let info = access_map.entry(key).or_default();
328 info.tier = tier;
329 }
330
331 pub async fn run_auto_tiering(&self) -> Result<TieringResult> {
333 if !self.config.auto_tier_enabled {
334 return Ok(TieringResult::default());
335 }
336
337 let now = Instant::now();
338 let mut to_demote_to_warm = Vec::new();
339 let mut to_demote_to_cold = Vec::new();
340
341 {
343 let access_map = self.access_info.read();
344 for ((namespace, id), info) in access_map.iter() {
345 let elapsed = now.duration_since(info.last_access);
346
347 match info.tier {
348 StorageTier::Hot if elapsed > self.config.hot_to_warm_threshold => {
349 to_demote_to_warm.push((namespace.clone(), id.clone()));
350 }
351 StorageTier::Warm if elapsed > self.config.warm_to_cold_threshold => {
352 to_demote_to_cold.push((namespace.clone(), id.clone()));
353 }
354 _ => {}
355 }
356 }
357 }
358
359 let mut demoted_to_warm = 0;
361 let mut demoted_to_cold = 0;
362
363 for (namespace, id) in to_demote_to_warm {
364 if self.demote(&namespace, &id).await? {
365 demoted_to_warm += 1;
366 }
367 }
368
369 for (namespace, id) in to_demote_to_cold {
370 if self.demote(&namespace, &id).await? {
371 demoted_to_cold += 1;
372 }
373 }
374
375 Ok(TieringResult {
376 demoted_to_warm,
377 demoted_to_cold,
378 promoted_to_hot: 0,
379 promoted_to_warm: 0,
380 })
381 }
382
383 pub fn stats(&self) -> TieredStorageStats {
385 TieredStorageStats {
386 hot_count: self.stats.hot_count.load(Ordering::Relaxed),
387 warm_count: self.stats.warm_count.load(Ordering::Relaxed),
388 cold_count: self.stats.cold_count.load(Ordering::Relaxed),
389 hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
390 warm_hits: self.stats.warm_hits.load(Ordering::Relaxed),
391 cold_hits: self.stats.cold_hits.load(Ordering::Relaxed),
392 promotions_to_hot: self.stats.promotions_to_hot.load(Ordering::Relaxed),
393 demotions_to_warm: self.stats.demotions_to_warm.load(Ordering::Relaxed),
394 demotions_to_cold: self.stats.demotions_to_cold.load(Ordering::Relaxed),
395 }
396 }
397
398 pub fn tier_distribution(&self, namespace: &NamespaceId) -> TierDistribution {
400 let access_map = self.access_info.read();
401 let mut hot = 0u64;
402 let mut warm = 0u64;
403 let mut cold = 0u64;
404
405 for ((ns, _), info) in access_map.iter() {
406 if ns == namespace {
407 match info.tier {
408 StorageTier::Hot => hot += 1,
409 StorageTier::Warm => warm += 1,
410 StorageTier::Cold => cold += 1,
411 }
412 }
413 }
414
415 TierDistribution { hot, warm, cold }
416 }
417}
418
419#[derive(Debug, Clone, Default)]
421pub struct TieringResult {
422 pub demoted_to_warm: u64,
424 pub demoted_to_cold: u64,
426 pub promoted_to_hot: u64,
428 pub promoted_to_warm: u64,
430}
431
432#[derive(Debug, Clone)]
434pub struct TierDistribution {
435 pub hot: u64,
436 pub warm: u64,
437 pub cold: u64,
438}
439
440#[async_trait]
441impl<H, W, C> VectorStorage for TieredStorage<H, W, C>
442where
443 H: VectorStorage,
444 W: VectorStorage,
445 C: VectorStorage,
446{
447 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
448 let ids: Vec<VectorId> = vectors.iter().map(|v| v.id.clone()).collect();
450
451 let cold_vectors = vectors.clone();
454 self.cold_storage.ensure_namespace(namespace).await?;
455 self.cold_storage.upsert(namespace, cold_vectors).await?;
456
457 let count = self.hot_storage.upsert(namespace, vectors).await?;
459
460 for id in &ids {
461 self.update_tier(namespace, id, StorageTier::Hot);
462 self.record_access(namespace, id, StorageTier::Hot);
463 }
464
465 self.stats
466 .hot_count
467 .fetch_add(count as u64, Ordering::Relaxed);
468 Ok(count)
469 }
470
471 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
472 let mut results = Vec::with_capacity(ids.len());
473 let mut remaining_ids: Vec<VectorId> = ids.to_vec();
474
475 let hot_results = match self.hot_storage.get(namespace, &remaining_ids).await {
477 Ok(v) => v,
478 Err(DakeraError::NamespaceNotFound(_)) => vec![],
479 Err(e) => return Err(e),
480 };
481 for v in &hot_results {
482 self.record_access(namespace, &v.id, StorageTier::Hot);
483 }
484
485 let found_ids: std::collections::HashSet<_> = hot_results.iter().map(|v| &v.id).collect();
487 remaining_ids.retain(|id| !found_ids.contains(id));
488 results.extend(hot_results);
489
490 if remaining_ids.is_empty() {
491 return Ok(results);
492 }
493
494 let warm_results = match self.warm_storage.get(namespace, &remaining_ids).await {
496 Ok(v) => v,
497 Err(common::DakeraError::NamespaceNotFound(_)) => vec![],
498 Err(e) => return Err(e),
499 };
500 for v in &warm_results {
501 self.record_access(namespace, &v.id, StorageTier::Warm);
502 }
503
504 let found_ids: std::collections::HashSet<_> = warm_results.iter().map(|v| &v.id).collect();
505 remaining_ids.retain(|id| !found_ids.contains(id));
506 results.extend(warm_results);
507
508 if remaining_ids.is_empty() {
509 return Ok(results);
510 }
511
512 let cold_results = match self.cold_storage.get(namespace, &remaining_ids).await {
514 Ok(v) => v,
515 Err(DakeraError::NamespaceNotFound(_)) => vec![],
516 Err(e) => return Err(e),
517 };
518 for v in &cold_results {
519 self.record_access(namespace, &v.id, StorageTier::Cold);
520 }
521 results.extend(cold_results);
522
523 Ok(results)
524 }
525
526 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
527 let mut seen = std::collections::HashSet::new();
528 let mut results = Vec::new();
529
530 let tier_get_all = |res: common::Result<Vec<Vector>>| -> common::Result<Vec<Vector>> {
532 match res {
533 Ok(v) => Ok(v),
534 Err(common::DakeraError::NamespaceNotFound(_)) => Ok(vec![]),
535 Err(e) => Err(e),
536 }
537 };
538
539 for v in tier_get_all(self.hot_storage.get_all(namespace).await)? {
543 if seen.insert(v.id.clone()) {
544 results.push(v);
545 }
546 }
547 for v in tier_get_all(self.warm_storage.get_all(namespace).await)? {
548 if seen.insert(v.id.clone()) {
549 results.push(v);
550 }
551 }
552 for v in tier_get_all(self.cold_storage.get_all(namespace).await)? {
553 if seen.insert(v.id.clone()) {
554 results.push(v);
555 }
556 }
557
558 Ok(results)
559 }
560
561 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
562 let mut deleted = 0;
563
564 match self.hot_storage.delete(namespace, ids).await {
567 Ok(n) => deleted += n,
568 Err(DakeraError::NamespaceNotFound(_)) => {}
569 Err(e) => return Err(e),
570 }
571 match self.warm_storage.delete(namespace, ids).await {
572 Ok(n) => deleted += n,
573 Err(DakeraError::NamespaceNotFound(_)) => {}
574 Err(e) => return Err(e),
575 }
576 match self.cold_storage.delete(namespace, ids).await {
577 Ok(n) => deleted += n,
578 Err(DakeraError::NamespaceNotFound(_)) => {}
579 Err(e) => return Err(e),
580 }
581
582 {
584 let mut access_map = self.access_info.write();
585 for id in ids {
586 access_map.remove(&(namespace.clone(), id.clone()));
587 }
588 }
589
590 Ok(deleted)
591 }
592
593 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
594 Ok(self.hot_storage.namespace_exists(namespace).await?
596 || self.warm_storage.namespace_exists(namespace).await?
597 || self.cold_storage.namespace_exists(namespace).await?)
598 }
599
600 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
601 self.hot_storage.ensure_namespace(namespace).await?;
603 self.warm_storage.ensure_namespace(namespace).await?;
604 self.cold_storage.ensure_namespace(namespace).await?;
605 Ok(())
606 }
607
608 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
609 let cold = self.cold_storage.count(namespace).await?;
614 if cold > 0 {
615 return Ok(cold);
616 }
617 let hot = self.hot_storage.count(namespace).await?;
619 let warm = self.warm_storage.count(namespace).await?;
620 Ok(hot + warm)
621 }
622
623 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
624 if let Some(dim) = self.hot_storage.dimension(namespace).await? {
626 return Ok(Some(dim));
627 }
628 if let Some(dim) = self.warm_storage.dimension(namespace).await? {
629 return Ok(Some(dim));
630 }
631 self.cold_storage.dimension(namespace).await
632 }
633
634 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
635 let mut namespaces = std::collections::HashSet::new();
636
637 namespaces.extend(self.hot_storage.list_namespaces().await?);
638 namespaces.extend(self.warm_storage.list_namespaces().await?);
639 namespaces.extend(self.cold_storage.list_namespaces().await?);
640
641 Ok(namespaces.into_iter().collect())
642 }
643
644 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
645 let hot_deleted = self.hot_storage.delete_namespace(namespace).await?;
647 let warm_deleted = self.warm_storage.delete_namespace(namespace).await?;
648 let cold_deleted = self.cold_storage.delete_namespace(namespace).await?;
649
650 {
652 let mut access_map = self.access_info.write();
653 access_map.retain(|(ns, _), _| ns != namespace);
654 }
655
656 Ok(hot_deleted || warm_deleted || cold_deleted)
657 }
658
659 async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
660 let mut total = 0;
662 total += self.hot_storage.cleanup_expired(namespace).await?;
663 total += self.warm_storage.cleanup_expired(namespace).await?;
664 total += self.cold_storage.cleanup_expired(namespace).await?;
665 Ok(total)
666 }
667
668 async fn cleanup_all_expired(&self) -> Result<usize> {
669 let mut total = 0;
671 total += self.hot_storage.cleanup_all_expired().await?;
672 total += self.warm_storage.cleanup_all_expired().await?;
673 total += self.cold_storage.cleanup_all_expired().await?;
674 Ok(total)
675 }
676}
677
678#[cfg(test)]
679mod tests {
680 use super::*;
681 use crate::memory::InMemoryStorage;
682
683 fn create_test_vector(id: &str, dim: usize) -> Vector {
684 Vector {
685 id: id.to_string(),
686 values: vec![1.0; dim],
687 metadata: None,
688 ttl_seconds: None,
689 expires_at: None,
690 }
691 }
692
693 #[tokio::test]
694 async fn test_tiered_storage_basic() {
695 let config = TieredStorageConfig::default();
696 let storage = TieredStorage::new(
697 config,
698 InMemoryStorage::new(),
699 InMemoryStorage::new(),
700 InMemoryStorage::new(),
701 );
702
703 let namespace = "test".to_string();
704 storage.ensure_namespace(&namespace).await.unwrap();
705
706 let vectors = vec![create_test_vector("v1", 4)];
708 let count = storage.upsert(&namespace, vectors).await.unwrap();
709 assert_eq!(count, 1);
710
711 let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
713 assert_eq!(results.len(), 1);
714
715 let stats = storage.stats();
716 assert_eq!(stats.hot_hits, 2); }
718
719 #[tokio::test]
720 async fn test_tiered_storage_promotion_demotion() {
721 let config = TieredStorageConfig::default();
722 let storage = TieredStorage::new(
723 config,
724 InMemoryStorage::new(),
725 InMemoryStorage::new(),
726 InMemoryStorage::new(),
727 );
728
729 let namespace = "test".to_string();
730 storage.ensure_namespace(&namespace).await.unwrap();
731
732 storage
734 .upsert(&namespace, vec![create_test_vector("v1", 4)])
735 .await
736 .unwrap();
737
738 assert_eq!(
740 storage.get_tier(&namespace, &"v1".to_string()),
741 Some(StorageTier::Hot)
742 );
743
744 let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
746 assert!(demoted);
747 assert_eq!(
748 storage.get_tier(&namespace, &"v1".to_string()),
749 Some(StorageTier::Warm)
750 );
751
752 let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
754 assert!(demoted);
755 assert_eq!(
756 storage.get_tier(&namespace, &"v1".to_string()),
757 Some(StorageTier::Cold)
758 );
759
760 let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
762 assert_eq!(results.len(), 1);
763
764 let stats = storage.stats();
765 assert_eq!(stats.demotions_to_warm, 1);
766 assert_eq!(stats.demotions_to_cold, 1);
767 }
768
769 #[tokio::test]
770 async fn test_tiered_storage_multi_tier_get() {
771 let config = TieredStorageConfig::default();
772 let storage = TieredStorage::new(
773 config,
774 InMemoryStorage::new(),
775 InMemoryStorage::new(),
776 InMemoryStorage::new(),
777 );
778
779 let namespace = "test".to_string();
780 storage.ensure_namespace(&namespace).await.unwrap();
781
782 for i in 0..3 {
784 storage
785 .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
786 .await
787 .unwrap();
788 }
789
790 storage.demote(&namespace, &"v1".to_string()).await.unwrap();
792 storage.demote(&namespace, &"v2".to_string()).await.unwrap();
793 storage.demote(&namespace, &"v2".to_string()).await.unwrap();
794
795 let ids: Vec<_> = (0..3).map(|i| format!("v{}", i)).collect();
797 let results = storage.get(&namespace, &ids).await.unwrap();
798 assert_eq!(results.len(), 3);
799 }
800
801 #[tokio::test]
802 async fn test_tier_distribution() {
803 let config = TieredStorageConfig::default();
804 let storage = TieredStorage::new(
805 config,
806 InMemoryStorage::new(),
807 InMemoryStorage::new(),
808 InMemoryStorage::new(),
809 );
810
811 let namespace = "test".to_string();
812 storage.ensure_namespace(&namespace).await.unwrap();
813
814 for i in 0..5 {
816 storage
817 .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
818 .await
819 .unwrap();
820 }
821
822 storage.demote(&namespace, &"v3".to_string()).await.unwrap();
824 storage.demote(&namespace, &"v4".to_string()).await.unwrap();
825 storage.demote(&namespace, &"v4".to_string()).await.unwrap();
826
827 let dist = storage.tier_distribution(&namespace);
828 assert_eq!(dist.hot, 3);
829 assert_eq!(dist.warm, 1);
830 assert_eq!(dist.cold, 1);
831 }
832}