1use async_trait::async_trait;
9use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use crate::traits::VectorStorage;
17
18#[derive(Debug, Clone)]
20pub struct TieredStorageConfig {
21 pub hot_tier_capacity: usize,
23 pub hot_to_warm_threshold: Duration,
25 pub warm_to_cold_threshold: Duration,
27 pub auto_tier_enabled: bool,
29 pub tier_check_interval: Duration,
31}
32
33impl Default for TieredStorageConfig {
34 fn default() -> Self {
35 Self {
36 hot_tier_capacity: 100_000,
37 hot_to_warm_threshold: Duration::from_secs(3600), warm_to_cold_threshold: Duration::from_secs(86400), auto_tier_enabled: true,
40 tier_check_interval: Duration::from_secs(300), }
42 }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47pub enum StorageTier {
48 Hot,
50 Warm,
52 Cold,
54}
55
56impl StorageTier {
57 pub fn as_str(&self) -> &'static str {
58 match self {
59 StorageTier::Hot => "hot",
60 StorageTier::Warm => "warm",
61 StorageTier::Cold => "cold",
62 }
63 }
64}
65
66#[derive(Debug, Clone)]
68struct AccessInfo {
69 last_access: Instant,
71 access_count: u64,
73 tier: StorageTier,
75}
76
77impl Default for AccessInfo {
78 fn default() -> Self {
79 Self {
80 last_access: Instant::now(),
81 access_count: 0,
82 tier: StorageTier::Hot,
83 }
84 }
85}
86
87#[derive(Debug, Clone, Default)]
89pub struct TieredStorageStats {
90 pub hot_count: u64,
92 pub warm_count: u64,
94 pub cold_count: u64,
96 pub hot_hits: u64,
98 pub warm_hits: u64,
100 pub cold_hits: u64,
102 pub promotions_to_hot: u64,
104 pub demotions_to_warm: u64,
106 pub demotions_to_cold: u64,
108}
109
110pub struct TieredStorage<H, W, C> {
112 config: TieredStorageConfig,
114 hot_storage: H,
116 warm_storage: W,
118 cold_storage: C,
120 access_info: RwLock<HashMap<(NamespaceId, VectorId), AccessInfo>>,
122 stats: TieredStorageStatsInner,
124}
125
126struct TieredStorageStatsInner {
127 hot_count: AtomicU64,
128 warm_count: AtomicU64,
129 cold_count: AtomicU64,
130 hot_hits: AtomicU64,
131 warm_hits: AtomicU64,
132 cold_hits: AtomicU64,
133 promotions_to_hot: AtomicU64,
134 demotions_to_warm: AtomicU64,
135 demotions_to_cold: AtomicU64,
136}
137
138impl Default for TieredStorageStatsInner {
139 fn default() -> Self {
140 Self {
141 hot_count: AtomicU64::new(0),
142 warm_count: AtomicU64::new(0),
143 cold_count: AtomicU64::new(0),
144 hot_hits: AtomicU64::new(0),
145 warm_hits: AtomicU64::new(0),
146 cold_hits: AtomicU64::new(0),
147 promotions_to_hot: AtomicU64::new(0),
148 demotions_to_warm: AtomicU64::new(0),
149 demotions_to_cold: AtomicU64::new(0),
150 }
151 }
152}
153
154impl<H, W, C> TieredStorage<H, W, C>
155where
156 H: VectorStorage,
157 W: VectorStorage,
158 C: VectorStorage,
159{
160 pub fn new(
162 config: TieredStorageConfig,
163 hot_storage: H,
164 warm_storage: W,
165 cold_storage: C,
166 ) -> Self {
167 Self {
168 config,
169 hot_storage,
170 warm_storage,
171 cold_storage,
172 access_info: RwLock::new(HashMap::new()),
173 stats: TieredStorageStatsInner::default(),
174 }
175 }
176
177 pub fn config(&self) -> &TieredStorageConfig {
179 &self.config
180 }
181
182 fn record_access(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
184 let key = (namespace.clone(), id.clone());
185 let mut access_map = self.access_info.write();
186
187 let info = access_map.entry(key).or_default();
188 info.last_access = Instant::now();
189 info.access_count += 1;
190 info.tier = tier;
191
192 match tier {
194 StorageTier::Hot => self.stats.hot_hits.fetch_add(1, Ordering::Relaxed),
195 StorageTier::Warm => self.stats.warm_hits.fetch_add(1, Ordering::Relaxed),
196 StorageTier::Cold => self.stats.cold_hits.fetch_add(1, Ordering::Relaxed),
197 };
198 }
199
200 fn get_tier(&self, namespace: &NamespaceId, id: &VectorId) -> Option<StorageTier> {
202 let access_map = self.access_info.read();
203 access_map
204 .get(&(namespace.clone(), id.clone()))
205 .map(|info| info.tier)
206 }
207
208 pub async fn promote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
210 let current_tier = self.get_tier(namespace, id);
211
212 match current_tier {
213 Some(StorageTier::Warm) => {
214 let vectors = self
216 .warm_storage
217 .get(namespace, std::slice::from_ref(id))
218 .await?;
219 if !vectors.is_empty() {
220 self.hot_storage.upsert(namespace, vectors).await?;
221 self.warm_storage
222 .delete(namespace, std::slice::from_ref(id))
223 .await?;
224
225 self.update_tier(namespace, id, StorageTier::Hot);
226 self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
227 self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
228 self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
229
230 return Ok(true);
231 }
232 }
233 Some(StorageTier::Cold) => {
234 let vectors = self
236 .cold_storage
237 .get(namespace, std::slice::from_ref(id))
238 .await?;
239 if !vectors.is_empty() {
240 let should_be_hot = {
242 let access_map = self.access_info.read();
243 access_map
244 .get(&(namespace.clone(), id.clone()))
245 .map(|info| info.access_count > 10)
246 .unwrap_or(false)
247 };
248
249 if should_be_hot {
250 self.hot_storage.upsert(namespace, vectors).await?;
251 self.update_tier(namespace, id, StorageTier::Hot);
252 self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
253 self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
254 } else {
255 self.warm_storage.upsert(namespace, vectors).await?;
256 self.update_tier(namespace, id, StorageTier::Warm);
257 self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
258 }
259 return Ok(true);
264 }
265 }
266 _ => {}
267 }
268
269 Ok(false)
270 }
271
272 pub async fn demote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
274 let current_tier = self.get_tier(namespace, id);
275
276 match current_tier {
277 Some(StorageTier::Hot) => {
278 let vectors = self
280 .hot_storage
281 .get(namespace, std::slice::from_ref(id))
282 .await?;
283 if !vectors.is_empty() {
284 self.warm_storage.upsert(namespace, vectors).await?;
285 self.hot_storage
286 .delete(namespace, std::slice::from_ref(id))
287 .await?;
288
289 self.update_tier(namespace, id, StorageTier::Warm);
290 self.stats.demotions_to_warm.fetch_add(1, Ordering::Relaxed);
291 self.stats.hot_count.fetch_sub(1, Ordering::Relaxed);
292 self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
293
294 return Ok(true);
295 }
296 }
297 Some(StorageTier::Warm) => {
298 let vectors = self
300 .warm_storage
301 .get(namespace, std::slice::from_ref(id))
302 .await?;
303 if !vectors.is_empty() {
304 self.cold_storage.upsert(namespace, vectors).await?;
305 self.warm_storage
306 .delete(namespace, std::slice::from_ref(id))
307 .await?;
308
309 self.update_tier(namespace, id, StorageTier::Cold);
310 self.stats.demotions_to_cold.fetch_add(1, Ordering::Relaxed);
311 self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
312 self.stats.cold_count.fetch_add(1, Ordering::Relaxed);
313
314 return Ok(true);
315 }
316 }
317 _ => {}
318 }
319
320 Ok(false)
321 }
322
323 fn update_tier(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
325 let mut access_map = self.access_info.write();
326 let key = (namespace.clone(), id.clone());
327 let info = access_map.entry(key).or_default();
328 info.tier = tier;
329 }
330
331 pub async fn run_auto_tiering(&self) -> Result<TieringResult> {
333 if !self.config.auto_tier_enabled {
334 return Ok(TieringResult::default());
335 }
336
337 let now = Instant::now();
338 let mut to_demote_to_warm = Vec::new();
339 let mut to_demote_to_cold = Vec::new();
340
341 {
343 let access_map = self.access_info.read();
344 for ((namespace, id), info) in access_map.iter() {
345 let elapsed = now.duration_since(info.last_access);
346
347 match info.tier {
348 StorageTier::Hot if elapsed > self.config.hot_to_warm_threshold => {
349 to_demote_to_warm.push((namespace.clone(), id.clone()));
350 }
351 StorageTier::Warm if elapsed > self.config.warm_to_cold_threshold => {
352 to_demote_to_cold.push((namespace.clone(), id.clone()));
353 }
354 _ => {}
355 }
356 }
357 }
358
359 let mut demoted_to_warm = 0;
361 let mut demoted_to_cold = 0;
362
363 for (namespace, id) in to_demote_to_warm {
364 if self.demote(&namespace, &id).await? {
365 demoted_to_warm += 1;
366 }
367 }
368
369 for (namespace, id) in to_demote_to_cold {
370 if self.demote(&namespace, &id).await? {
371 demoted_to_cold += 1;
372 }
373 }
374
375 Ok(TieringResult {
376 demoted_to_warm,
377 demoted_to_cold,
378 promoted_to_hot: 0,
379 promoted_to_warm: 0,
380 })
381 }
382
383 pub fn stats(&self) -> TieredStorageStats {
385 TieredStorageStats {
386 hot_count: self.stats.hot_count.load(Ordering::Relaxed),
387 warm_count: self.stats.warm_count.load(Ordering::Relaxed),
388 cold_count: self.stats.cold_count.load(Ordering::Relaxed),
389 hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
390 warm_hits: self.stats.warm_hits.load(Ordering::Relaxed),
391 cold_hits: self.stats.cold_hits.load(Ordering::Relaxed),
392 promotions_to_hot: self.stats.promotions_to_hot.load(Ordering::Relaxed),
393 demotions_to_warm: self.stats.demotions_to_warm.load(Ordering::Relaxed),
394 demotions_to_cold: self.stats.demotions_to_cold.load(Ordering::Relaxed),
395 }
396 }
397
398 pub fn tier_distribution(&self, namespace: &NamespaceId) -> TierDistribution {
400 let access_map = self.access_info.read();
401 let mut hot = 0u64;
402 let mut warm = 0u64;
403 let mut cold = 0u64;
404
405 for ((ns, _), info) in access_map.iter() {
406 if ns == namespace {
407 match info.tier {
408 StorageTier::Hot => hot += 1,
409 StorageTier::Warm => warm += 1,
410 StorageTier::Cold => cold += 1,
411 }
412 }
413 }
414
415 TierDistribution { hot, warm, cold }
416 }
417}
418
419#[derive(Debug, Clone, Default)]
421pub struct TieringResult {
422 pub demoted_to_warm: u64,
424 pub demoted_to_cold: u64,
426 pub promoted_to_hot: u64,
428 pub promoted_to_warm: u64,
430}
431
432#[derive(Debug, Clone)]
434pub struct TierDistribution {
435 pub hot: u64,
436 pub warm: u64,
437 pub cold: u64,
438}
439
440#[async_trait]
441impl<H, W, C> VectorStorage for TieredStorage<H, W, C>
442where
443 H: VectorStorage,
444 W: VectorStorage,
445 C: VectorStorage + Clone + Send + Sync + 'static,
446{
447 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
448 let ids: Vec<VectorId> = vectors.iter().map(|v| v.id.clone()).collect();
450
451 let cold_vectors = vectors.clone();
453
454 let count = self.hot_storage.upsert(namespace, vectors).await?;
456
457 let cold = self.cold_storage.clone();
460 let cold_ns = namespace.clone();
461 tokio::spawn(async move {
462 if let Err(e) = cold.ensure_namespace(&cold_ns).await {
463 tracing::error!(
464 error = %e,
465 namespace = %cold_ns,
466 "Cold tier namespace ensure failed (S3 flush aborted)"
467 );
468 return;
469 }
470 if let Err(e) = cold.upsert(&cold_ns, cold_vectors).await {
471 tracing::error!(
472 error = %e,
473 namespace = %cold_ns,
474 "Cold tier S3 flush failed — data is durable in hot tier"
475 );
476 }
477 });
478
479 for id in &ids {
480 self.update_tier(namespace, id, StorageTier::Hot);
481 self.record_access(namespace, id, StorageTier::Hot);
482 }
483
484 self.stats
485 .hot_count
486 .fetch_add(count as u64, Ordering::Relaxed);
487 Ok(count)
488 }
489
490 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
491 let mut results = Vec::with_capacity(ids.len());
492 let mut remaining_ids: Vec<VectorId> = ids.to_vec();
493
494 let hot_results = match self.hot_storage.get(namespace, &remaining_ids).await {
496 Ok(v) => v,
497 Err(DakeraError::NamespaceNotFound(_)) => vec![],
498 Err(e) => return Err(e),
499 };
500 for v in &hot_results {
501 self.record_access(namespace, &v.id, StorageTier::Hot);
502 }
503
504 let found_ids: std::collections::HashSet<_> = hot_results.iter().map(|v| &v.id).collect();
506 remaining_ids.retain(|id| !found_ids.contains(id));
507 results.extend(hot_results);
508
509 if remaining_ids.is_empty() {
510 return Ok(results);
511 }
512
513 let warm_results = match self.warm_storage.get(namespace, &remaining_ids).await {
515 Ok(v) => v,
516 Err(common::DakeraError::NamespaceNotFound(_)) => vec![],
517 Err(e) => return Err(e),
518 };
519 for v in &warm_results {
520 self.record_access(namespace, &v.id, StorageTier::Warm);
521 }
522
523 let found_ids: std::collections::HashSet<_> = warm_results.iter().map(|v| &v.id).collect();
524 remaining_ids.retain(|id| !found_ids.contains(id));
525 results.extend(warm_results);
526
527 if remaining_ids.is_empty() {
528 return Ok(results);
529 }
530
531 let cold_results = match self.cold_storage.get(namespace, &remaining_ids).await {
533 Ok(v) => v,
534 Err(DakeraError::NamespaceNotFound(_)) => vec![],
535 Err(e) => return Err(e),
536 };
537 for v in &cold_results {
538 self.record_access(namespace, &v.id, StorageTier::Cold);
539 }
540 results.extend(cold_results);
541
542 Ok(results)
543 }
544
545 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
546 let mut seen = std::collections::HashSet::new();
547 let mut results = Vec::new();
548
549 let tier_get_all = |res: common::Result<Vec<Vector>>| -> common::Result<Vec<Vector>> {
551 match res {
552 Ok(v) => Ok(v),
553 Err(common::DakeraError::NamespaceNotFound(_)) => Ok(vec![]),
554 Err(e) => Err(e),
555 }
556 };
557
558 for v in tier_get_all(self.hot_storage.get_all(namespace).await)? {
562 if seen.insert(v.id.clone()) {
563 results.push(v);
564 }
565 }
566 for v in tier_get_all(self.warm_storage.get_all(namespace).await)? {
567 if seen.insert(v.id.clone()) {
568 results.push(v);
569 }
570 }
571 for v in tier_get_all(self.cold_storage.get_all(namespace).await)? {
572 if seen.insert(v.id.clone()) {
573 results.push(v);
574 }
575 }
576
577 Ok(results)
578 }
579
580 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
581 let mut deleted = 0;
582
583 match self.hot_storage.delete(namespace, ids).await {
586 Ok(n) => deleted += n,
587 Err(DakeraError::NamespaceNotFound(_)) => {}
588 Err(e) => return Err(e),
589 }
590 match self.warm_storage.delete(namespace, ids).await {
591 Ok(n) => deleted += n,
592 Err(DakeraError::NamespaceNotFound(_)) => {}
593 Err(e) => return Err(e),
594 }
595 match self.cold_storage.delete(namespace, ids).await {
596 Ok(n) => deleted += n,
597 Err(DakeraError::NamespaceNotFound(_)) => {}
598 Err(e) => return Err(e),
599 }
600
601 {
603 let mut access_map = self.access_info.write();
604 for id in ids {
605 access_map.remove(&(namespace.clone(), id.clone()));
606 }
607 }
608
609 Ok(deleted)
610 }
611
612 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
613 Ok(self.hot_storage.namespace_exists(namespace).await?
615 || self.warm_storage.namespace_exists(namespace).await?
616 || self.cold_storage.namespace_exists(namespace).await?)
617 }
618
619 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
620 self.hot_storage.ensure_namespace(namespace).await?;
622 self.warm_storage.ensure_namespace(namespace).await?;
623 self.cold_storage.ensure_namespace(namespace).await?;
624 Ok(())
625 }
626
627 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
628 let cold = self.cold_storage.count(namespace).await?;
633 if cold > 0 {
634 return Ok(cold);
635 }
636 let hot = self.hot_storage.count(namespace).await?;
638 let warm = self.warm_storage.count(namespace).await?;
639 Ok(hot + warm)
640 }
641
642 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
643 if let Some(dim) = self.hot_storage.dimension(namespace).await? {
645 return Ok(Some(dim));
646 }
647 if let Some(dim) = self.warm_storage.dimension(namespace).await? {
648 return Ok(Some(dim));
649 }
650 self.cold_storage.dimension(namespace).await
651 }
652
653 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
654 let mut namespaces = std::collections::HashSet::new();
655
656 namespaces.extend(self.hot_storage.list_namespaces().await?);
657 namespaces.extend(self.warm_storage.list_namespaces().await?);
658 namespaces.extend(self.cold_storage.list_namespaces().await?);
659
660 Ok(namespaces.into_iter().collect())
661 }
662
663 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
664 let hot_deleted = self.hot_storage.delete_namespace(namespace).await?;
666 let warm_deleted = self.warm_storage.delete_namespace(namespace).await?;
667 let cold_deleted = self.cold_storage.delete_namespace(namespace).await?;
668
669 {
671 let mut access_map = self.access_info.write();
672 access_map.retain(|(ns, _), _| ns != namespace);
673 }
674
675 Ok(hot_deleted || warm_deleted || cold_deleted)
676 }
677
678 async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
679 let mut total = 0;
681 total += self.hot_storage.cleanup_expired(namespace).await?;
682 total += self.warm_storage.cleanup_expired(namespace).await?;
683 total += self.cold_storage.cleanup_expired(namespace).await?;
684 Ok(total)
685 }
686
687 async fn cleanup_all_expired(&self) -> Result<usize> {
688 let mut total = 0;
690 total += self.hot_storage.cleanup_all_expired().await?;
691 total += self.warm_storage.cleanup_all_expired().await?;
692 total += self.cold_storage.cleanup_all_expired().await?;
693 Ok(total)
694 }
695}
696
697#[cfg(test)]
698mod tests {
699 use super::*;
700 use crate::memory::InMemoryStorage;
701
702 fn create_test_vector(id: &str, dim: usize) -> Vector {
703 Vector {
704 id: id.to_string(),
705 values: vec![1.0; dim],
706 metadata: None,
707 ttl_seconds: None,
708 expires_at: None,
709 }
710 }
711
712 #[tokio::test]
713 async fn test_tiered_storage_basic() {
714 let config = TieredStorageConfig::default();
715 let storage = TieredStorage::new(
716 config,
717 InMemoryStorage::new(),
718 InMemoryStorage::new(),
719 InMemoryStorage::new(),
720 );
721
722 let namespace = "test".to_string();
723 storage.ensure_namespace(&namespace).await.unwrap();
724
725 let vectors = vec![create_test_vector("v1", 4)];
727 let count = storage.upsert(&namespace, vectors).await.unwrap();
728 assert_eq!(count, 1);
729
730 let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
732 assert_eq!(results.len(), 1);
733
734 let stats = storage.stats();
735 assert_eq!(stats.hot_hits, 2); }
737
738 #[tokio::test]
739 async fn test_tiered_storage_promotion_demotion() {
740 let config = TieredStorageConfig::default();
741 let storage = TieredStorage::new(
742 config,
743 InMemoryStorage::new(),
744 InMemoryStorage::new(),
745 InMemoryStorage::new(),
746 );
747
748 let namespace = "test".to_string();
749 storage.ensure_namespace(&namespace).await.unwrap();
750
751 storage
753 .upsert(&namespace, vec![create_test_vector("v1", 4)])
754 .await
755 .unwrap();
756
757 assert_eq!(
759 storage.get_tier(&namespace, &"v1".to_string()),
760 Some(StorageTier::Hot)
761 );
762
763 let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
765 assert!(demoted);
766 assert_eq!(
767 storage.get_tier(&namespace, &"v1".to_string()),
768 Some(StorageTier::Warm)
769 );
770
771 let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
773 assert!(demoted);
774 assert_eq!(
775 storage.get_tier(&namespace, &"v1".to_string()),
776 Some(StorageTier::Cold)
777 );
778
779 let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
781 assert_eq!(results.len(), 1);
782
783 let stats = storage.stats();
784 assert_eq!(stats.demotions_to_warm, 1);
785 assert_eq!(stats.demotions_to_cold, 1);
786 }
787
788 #[tokio::test]
789 async fn test_tiered_storage_multi_tier_get() {
790 let config = TieredStorageConfig::default();
791 let storage = TieredStorage::new(
792 config,
793 InMemoryStorage::new(),
794 InMemoryStorage::new(),
795 InMemoryStorage::new(),
796 );
797
798 let namespace = "test".to_string();
799 storage.ensure_namespace(&namespace).await.unwrap();
800
801 for i in 0..3 {
803 storage
804 .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
805 .await
806 .unwrap();
807 }
808
809 storage.demote(&namespace, &"v1".to_string()).await.unwrap();
811 storage.demote(&namespace, &"v2".to_string()).await.unwrap();
812 storage.demote(&namespace, &"v2".to_string()).await.unwrap();
813
814 let ids: Vec<_> = (0..3).map(|i| format!("v{}", i)).collect();
816 let results = storage.get(&namespace, &ids).await.unwrap();
817 assert_eq!(results.len(), 3);
818 }
819
820 #[tokio::test]
821 async fn test_tier_distribution() {
822 let config = TieredStorageConfig::default();
823 let storage = TieredStorage::new(
824 config,
825 InMemoryStorage::new(),
826 InMemoryStorage::new(),
827 InMemoryStorage::new(),
828 );
829
830 let namespace = "test".to_string();
831 storage.ensure_namespace(&namespace).await.unwrap();
832
833 for i in 0..5 {
835 storage
836 .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
837 .await
838 .unwrap();
839 }
840
841 storage.demote(&namespace, &"v3".to_string()).await.unwrap();
843 storage.demote(&namespace, &"v4".to_string()).await.unwrap();
844 storage.demote(&namespace, &"v4".to_string()).await.unwrap();
845
846 let dist = storage.tier_distribution(&namespace);
847 assert_eq!(dist.hot, 3);
848 assert_eq!(dist.warm, 1);
849 assert_eq!(dist.cold, 1);
850 }
851}