Skip to main content

storage/
tiered.rs

1//! Tiered Storage for Buffer
2//!
3//! Automatic data tiering based on access patterns:
4//! - Hot tier: In-memory for frequently accessed data
5//! - Warm tier: Local disk cache for recent data
6//! - Cold tier: Object storage for infrequently accessed data
7
8use async_trait::async_trait;
9use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use crate::traits::VectorStorage;
17
/// Tunable parameters for the tiered storage layer.
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Capacity of the hot tier, expressed as a number of vectors.
    ///
    /// NOTE(review): nothing visible in this file enforces this limit — confirm
    /// whether a caller is expected to.
    pub hot_tier_capacity: usize,
    /// Idle time after which a hot vector is eligible for demotion to warm.
    pub hot_to_warm_threshold: Duration,
    /// Idle time after which a warm vector is eligible for demotion to cold.
    pub warm_to_cold_threshold: Duration,
    /// When `false`, automatic tiering passes are a no-op.
    pub auto_tier_enabled: bool,
    /// Interval between tier checks.
    pub tier_check_interval: Duration,
}

impl Default for TieredStorageConfig {
    /// 100k hot vectors, 1 h hot→warm, 24 h warm→cold, auto-tiering on, 5 min check interval.
    fn default() -> Self {
        const MINUTE_SECS: u64 = 60;
        const HOUR_SECS: u64 = 60 * MINUTE_SECS;
        const DAY_SECS: u64 = 24 * HOUR_SECS;

        Self {
            hot_tier_capacity: 100_000,
            hot_to_warm_threshold: Duration::from_secs(HOUR_SECS),
            warm_to_cold_threshold: Duration::from_secs(DAY_SECS),
            auto_tier_enabled: true,
            tier_check_interval: Duration::from_secs(5 * MINUTE_SECS),
        }
    }
}
44
45/// Storage tier for a piece of data
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47pub enum StorageTier {
48    /// In-memory, fastest access
49    Hot,
50    /// Local disk, fast access
51    Warm,
52    /// Object storage, slowest access
53    Cold,
54}
55
56impl StorageTier {
57    pub fn as_str(&self) -> &'static str {
58        match self {
59            StorageTier::Hot => "hot",
60            StorageTier::Warm => "warm",
61            StorageTier::Cold => "cold",
62        }
63    }
64}
65
66/// Access tracking for tiering decisions
67#[derive(Debug, Clone)]
68struct AccessInfo {
69    /// Last access timestamp
70    last_access: Instant,
71    /// Total access count
72    access_count: u64,
73    /// Current tier
74    tier: StorageTier,
75}
76
77impl Default for AccessInfo {
78    fn default() -> Self {
79        Self {
80            last_access: Instant::now(),
81            access_count: 0,
82            tier: StorageTier::Hot,
83        }
84    }
85}
86
/// Point-in-time snapshot of the tiered-storage counters.
///
/// Produced by `TieredStorage::stats`, which copies each atomic counter into
/// this plain struct.
#[derive(Debug, Clone, Default)]
pub struct TieredStorageStats {
    /// Vectors in hot tier
    pub hot_count: u64,
    /// Vectors in warm tier
    pub warm_count: u64,
    /// Vectors in cold tier
    pub cold_count: u64,
    /// Hot tier hits (accesses recorded against the hot tier)
    pub hot_hits: u64,
    /// Warm tier hits (accesses recorded against the warm tier)
    pub warm_hits: u64,
    /// Cold tier hits (accesses recorded against the cold tier)
    pub cold_hits: u64,
    /// Promotions to hot tier
    pub promotions_to_hot: u64,
    /// Demotions to warm tier
    pub demotions_to_warm: u64,
    /// Demotions to cold tier
    pub demotions_to_cold: u64,
}
109
/// Tiered storage manager.
///
/// Generic over three backends: `H` (hot), `W` (warm) and `C` (cold). Tier
/// membership and access recency are tracked in `access_info`, keyed by
/// `(namespace, vector id)`.
pub struct TieredStorage<H, W, C> {
    /// Configuration
    config: TieredStorageConfig,
    /// Hot tier storage (in-memory)
    hot_storage: H,
    /// Warm tier storage (disk cache)
    warm_storage: W,
    /// Cold tier storage (object storage)
    cold_storage: C,
    /// Access tracking per vector, guarded by a read/write lock
    access_info: RwLock<HashMap<(NamespaceId, VectorId), AccessInfo>>,
    /// Statistics (atomic counters, updated without taking the lock above)
    stats: TieredStorageStatsInner,
}
125
/// Atomic counters backing [`TieredStorageStats`].
///
/// Every counter starts at zero; `AtomicU64::default()` is `0`, so the derived
/// `Default` is equivalent to initialising each field with `AtomicU64::new(0)`.
#[derive(Default)]
struct TieredStorageStatsInner {
    /// Vectors currently tracked in the hot tier.
    hot_count: AtomicU64,
    /// Vectors currently tracked in the warm tier.
    warm_count: AtomicU64,
    /// Vectors currently tracked in the cold tier.
    cold_count: AtomicU64,
    /// Accesses recorded against the hot tier.
    hot_hits: AtomicU64,
    /// Accesses recorded against the warm tier.
    warm_hits: AtomicU64,
    /// Accesses recorded against the cold tier.
    cold_hits: AtomicU64,
    /// Completed promotions into the hot tier.
    promotions_to_hot: AtomicU64,
    /// Completed demotions into the warm tier.
    demotions_to_warm: AtomicU64,
    /// Completed demotions into the cold tier.
    demotions_to_cold: AtomicU64,
}
153
154impl<H, W, C> TieredStorage<H, W, C>
155where
156    H: VectorStorage,
157    W: VectorStorage,
158    C: VectorStorage,
159{
160    /// Create a new tiered storage
161    pub fn new(
162        config: TieredStorageConfig,
163        hot_storage: H,
164        warm_storage: W,
165        cold_storage: C,
166    ) -> Self {
167        Self {
168            config,
169            hot_storage,
170            warm_storage,
171            cold_storage,
172            access_info: RwLock::new(HashMap::new()),
173            stats: TieredStorageStatsInner::default(),
174        }
175    }
176
177    /// Get the tiered storage configuration
178    pub fn config(&self) -> &TieredStorageConfig {
179        &self.config
180    }
181
182    /// Record access to a vector
183    fn record_access(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
184        let key = (namespace.clone(), id.clone());
185        let mut access_map = self.access_info.write();
186
187        let info = access_map.entry(key).or_default();
188        info.last_access = Instant::now();
189        info.access_count += 1;
190        info.tier = tier;
191
192        // Update hit counters
193        match tier {
194            StorageTier::Hot => self.stats.hot_hits.fetch_add(1, Ordering::Relaxed),
195            StorageTier::Warm => self.stats.warm_hits.fetch_add(1, Ordering::Relaxed),
196            StorageTier::Cold => self.stats.cold_hits.fetch_add(1, Ordering::Relaxed),
197        };
198    }
199
200    /// Get the current tier for a vector
201    fn get_tier(&self, namespace: &NamespaceId, id: &VectorId) -> Option<StorageTier> {
202        let access_map = self.access_info.read();
203        access_map
204            .get(&(namespace.clone(), id.clone()))
205            .map(|info| info.tier)
206    }
207
208    /// Promote a vector to a higher tier
209    pub async fn promote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
210        let current_tier = self.get_tier(namespace, id);
211
212        match current_tier {
213            Some(StorageTier::Warm) => {
214                // Promote warm -> hot
215                let vectors = self
216                    .warm_storage
217                    .get(namespace, std::slice::from_ref(id))
218                    .await?;
219                if !vectors.is_empty() {
220                    self.hot_storage.upsert(namespace, vectors).await?;
221                    self.warm_storage
222                        .delete(namespace, std::slice::from_ref(id))
223                        .await?;
224
225                    self.update_tier(namespace, id, StorageTier::Hot);
226                    self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
227                    self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
228                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
229
230                    return Ok(true);
231                }
232            }
233            Some(StorageTier::Cold) => {
234                // Promote cold -> warm (or directly to hot if frequently accessed)
235                let vectors = self
236                    .cold_storage
237                    .get(namespace, std::slice::from_ref(id))
238                    .await?;
239                if !vectors.is_empty() {
240                    // Check if should go directly to hot based on access frequency
241                    let should_be_hot = {
242                        let access_map = self.access_info.read();
243                        access_map
244                            .get(&(namespace.clone(), id.clone()))
245                            .map(|info| info.access_count > 10)
246                            .unwrap_or(false)
247                    };
248
249                    if should_be_hot {
250                        self.hot_storage.upsert(namespace, vectors).await?;
251                        self.update_tier(namespace, id, StorageTier::Hot);
252                        self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
253                        self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
254                    } else {
255                        self.warm_storage.upsert(namespace, vectors).await?;
256                        self.update_tier(namespace, id, StorageTier::Warm);
257                        self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
258                    }
259                    // Cold tier is the durable source of truth — never delete on promotion.
260                    // The tier map tracks which tier is "active" for reads; cold remains
261                    // as the persistent backup in case warm/hot are lost on restart.
262
263                    return Ok(true);
264                }
265            }
266            _ => {}
267        }
268
269        Ok(false)
270    }
271
272    /// Demote a vector to a lower tier
273    pub async fn demote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
274        let current_tier = self.get_tier(namespace, id);
275
276        match current_tier {
277            Some(StorageTier::Hot) => {
278                // Demote hot -> warm
279                let vectors = self
280                    .hot_storage
281                    .get(namespace, std::slice::from_ref(id))
282                    .await?;
283                if !vectors.is_empty() {
284                    self.warm_storage.upsert(namespace, vectors).await?;
285                    self.hot_storage
286                        .delete(namespace, std::slice::from_ref(id))
287                        .await?;
288
289                    self.update_tier(namespace, id, StorageTier::Warm);
290                    self.stats.demotions_to_warm.fetch_add(1, Ordering::Relaxed);
291                    self.stats.hot_count.fetch_sub(1, Ordering::Relaxed);
292                    self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
293
294                    return Ok(true);
295                }
296            }
297            Some(StorageTier::Warm) => {
298                // Demote warm -> cold
299                let vectors = self
300                    .warm_storage
301                    .get(namespace, std::slice::from_ref(id))
302                    .await?;
303                if !vectors.is_empty() {
304                    self.cold_storage.upsert(namespace, vectors).await?;
305                    self.warm_storage
306                        .delete(namespace, std::slice::from_ref(id))
307                        .await?;
308
309                    self.update_tier(namespace, id, StorageTier::Cold);
310                    self.stats.demotions_to_cold.fetch_add(1, Ordering::Relaxed);
311                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
312                    self.stats.cold_count.fetch_add(1, Ordering::Relaxed);
313
314                    return Ok(true);
315                }
316            }
317            _ => {}
318        }
319
320        Ok(false)
321    }
322
323    /// Update tier tracking
324    fn update_tier(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
325        let mut access_map = self.access_info.write();
326        let key = (namespace.clone(), id.clone());
327        let info = access_map.entry(key).or_default();
328        info.tier = tier;
329    }
330
331    /// Run automatic tiering based on access patterns
332    pub async fn run_auto_tiering(&self) -> Result<TieringResult> {
333        if !self.config.auto_tier_enabled {
334            return Ok(TieringResult::default());
335        }
336
337        let now = Instant::now();
338        let mut to_demote_to_warm = Vec::new();
339        let mut to_demote_to_cold = Vec::new();
340
341        // Collect vectors to demote
342        {
343            let access_map = self.access_info.read();
344            for ((namespace, id), info) in access_map.iter() {
345                let elapsed = now.duration_since(info.last_access);
346
347                match info.tier {
348                    StorageTier::Hot if elapsed > self.config.hot_to_warm_threshold => {
349                        to_demote_to_warm.push((namespace.clone(), id.clone()));
350                    }
351                    StorageTier::Warm if elapsed > self.config.warm_to_cold_threshold => {
352                        to_demote_to_cold.push((namespace.clone(), id.clone()));
353                    }
354                    _ => {}
355                }
356            }
357        }
358
359        // Execute demotions
360        let mut demoted_to_warm = 0;
361        let mut demoted_to_cold = 0;
362
363        for (namespace, id) in to_demote_to_warm {
364            if self.demote(&namespace, &id).await? {
365                demoted_to_warm += 1;
366            }
367        }
368
369        for (namespace, id) in to_demote_to_cold {
370            if self.demote(&namespace, &id).await? {
371                demoted_to_cold += 1;
372            }
373        }
374
375        Ok(TieringResult {
376            demoted_to_warm,
377            demoted_to_cold,
378            promoted_to_hot: 0,
379            promoted_to_warm: 0,
380        })
381    }
382
383    /// Get storage statistics
384    pub fn stats(&self) -> TieredStorageStats {
385        TieredStorageStats {
386            hot_count: self.stats.hot_count.load(Ordering::Relaxed),
387            warm_count: self.stats.warm_count.load(Ordering::Relaxed),
388            cold_count: self.stats.cold_count.load(Ordering::Relaxed),
389            hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
390            warm_hits: self.stats.warm_hits.load(Ordering::Relaxed),
391            cold_hits: self.stats.cold_hits.load(Ordering::Relaxed),
392            promotions_to_hot: self.stats.promotions_to_hot.load(Ordering::Relaxed),
393            demotions_to_warm: self.stats.demotions_to_warm.load(Ordering::Relaxed),
394            demotions_to_cold: self.stats.demotions_to_cold.load(Ordering::Relaxed),
395        }
396    }
397
398    /// Get tier distribution by namespace
399    pub fn tier_distribution(&self, namespace: &NamespaceId) -> TierDistribution {
400        let access_map = self.access_info.read();
401        let mut hot = 0u64;
402        let mut warm = 0u64;
403        let mut cold = 0u64;
404
405        for ((ns, _), info) in access_map.iter() {
406            if ns == namespace {
407                match info.tier {
408                    StorageTier::Hot => hot += 1,
409                    StorageTier::Warm => warm += 1,
410                    StorageTier::Cold => cold += 1,
411                }
412            }
413        }
414
415        TierDistribution { hot, warm, cold }
416    }
417}
418
/// Result of automatic tiering
///
/// Returned by `TieredStorage::run_auto_tiering`; each field counts the moves
/// that completed during that pass.
#[derive(Debug, Clone, Default)]
pub struct TieringResult {
    /// Vectors demoted to warm tier
    pub demoted_to_warm: u64,
    /// Vectors demoted to cold tier
    pub demoted_to_cold: u64,
    /// Vectors promoted to hot tier (the auto-tiering pass always reports 0)
    pub promoted_to_hot: u64,
    /// Vectors promoted to warm tier (the auto-tiering pass always reports 0)
    pub promoted_to_warm: u64,
}
431
/// Tier distribution for a namespace
///
/// Returned by `TieredStorage::tier_distribution`.
#[derive(Debug, Clone)]
pub struct TierDistribution {
    /// Tracked vectors whose current tier is hot
    pub hot: u64,
    /// Tracked vectors whose current tier is warm
    pub warm: u64,
    /// Tracked vectors whose current tier is cold
    pub cold: u64,
}
439
440#[async_trait]
441impl<H, W, C> VectorStorage for TieredStorage<H, W, C>
442where
443    H: VectorStorage,
444    W: VectorStorage,
445    C: VectorStorage,
446{
447    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
448        // Track access info before moving vectors into storage
449        let ids: Vec<VectorId> = vectors.iter().map(|v| v.id.clone()).collect();
450
451        // Always persist to cold tier (S3) for durability — this is the
452        // source of truth that survives restarts.
453        let cold_vectors = vectors.clone();
454        self.cold_storage.ensure_namespace(namespace).await?;
455        self.cold_storage.upsert(namespace, cold_vectors).await?;
456
457        // Hot tier serves reads with lowest latency
458        let count = self.hot_storage.upsert(namespace, vectors).await?;
459
460        for id in &ids {
461            self.update_tier(namespace, id, StorageTier::Hot);
462            self.record_access(namespace, id, StorageTier::Hot);
463        }
464
465        self.stats
466            .hot_count
467            .fetch_add(count as u64, Ordering::Relaxed);
468        Ok(count)
469    }
470
471    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
472        let mut results = Vec::with_capacity(ids.len());
473        let mut remaining_ids: Vec<VectorId> = ids.to_vec();
474
475        // Try hot tier first (NamespaceNotFound is normal after restart when hot cache is empty)
476        let hot_results = match self.hot_storage.get(namespace, &remaining_ids).await {
477            Ok(v) => v,
478            Err(DakeraError::NamespaceNotFound(_)) => vec![],
479            Err(e) => return Err(e),
480        };
481        for v in &hot_results {
482            self.record_access(namespace, &v.id, StorageTier::Hot);
483        }
484
485        // Remove found IDs
486        let found_ids: std::collections::HashSet<_> = hot_results.iter().map(|v| &v.id).collect();
487        remaining_ids.retain(|id| !found_ids.contains(id));
488        results.extend(hot_results);
489
490        if remaining_ids.is_empty() {
491            return Ok(results);
492        }
493
494        // Try warm tier (NamespaceNotFound is normal after restart when warm cache is empty)
495        let warm_results = match self.warm_storage.get(namespace, &remaining_ids).await {
496            Ok(v) => v,
497            Err(common::DakeraError::NamespaceNotFound(_)) => vec![],
498            Err(e) => return Err(e),
499        };
500        for v in &warm_results {
501            self.record_access(namespace, &v.id, StorageTier::Warm);
502        }
503
504        let found_ids: std::collections::HashSet<_> = warm_results.iter().map(|v| &v.id).collect();
505        remaining_ids.retain(|id| !found_ids.contains(id));
506        results.extend(warm_results);
507
508        if remaining_ids.is_empty() {
509            return Ok(results);
510        }
511
512        // Try cold tier
513        let cold_results = match self.cold_storage.get(namespace, &remaining_ids).await {
514            Ok(v) => v,
515            Err(DakeraError::NamespaceNotFound(_)) => vec![],
516            Err(e) => return Err(e),
517        };
518        for v in &cold_results {
519            self.record_access(namespace, &v.id, StorageTier::Cold);
520        }
521        results.extend(cold_results);
522
523        Ok(results)
524    }
525
526    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
527        let mut seen = std::collections::HashSet::new();
528        let mut results = Vec::new();
529
530        // Helper: treat NamespaceNotFound as empty — normal for hot/warm after restart.
531        let tier_get_all = |res: common::Result<Vec<Vector>>| -> common::Result<Vec<Vector>> {
532            match res {
533                Ok(v) => Ok(v),
534                Err(common::DakeraError::NamespaceNotFound(_)) => Ok(vec![]),
535                Err(e) => Err(e),
536            }
537        };
538
539        // Gather from all tiers, preferring hot over warm over cold.
540        // Deduplicate by vector ID since write-through means a vector
541        // can exist in both hot and cold simultaneously.
542        for v in tier_get_all(self.hot_storage.get_all(namespace).await)? {
543            if seen.insert(v.id.clone()) {
544                results.push(v);
545            }
546        }
547        for v in tier_get_all(self.warm_storage.get_all(namespace).await)? {
548            if seen.insert(v.id.clone()) {
549                results.push(v);
550            }
551        }
552        for v in tier_get_all(self.cold_storage.get_all(namespace).await)? {
553            if seen.insert(v.id.clone()) {
554                results.push(v);
555            }
556        }
557
558        Ok(results)
559    }
560
561    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
562        let mut deleted = 0;
563
564        // Delete from all tiers. Tolerate NamespaceNotFound from individual tiers
565        // since data may only reside in a subset of tiers (e.g. cold but not hot).
566        match self.hot_storage.delete(namespace, ids).await {
567            Ok(n) => deleted += n,
568            Err(DakeraError::NamespaceNotFound(_)) => {}
569            Err(e) => return Err(e),
570        }
571        match self.warm_storage.delete(namespace, ids).await {
572            Ok(n) => deleted += n,
573            Err(DakeraError::NamespaceNotFound(_)) => {}
574            Err(e) => return Err(e),
575        }
576        match self.cold_storage.delete(namespace, ids).await {
577            Ok(n) => deleted += n,
578            Err(DakeraError::NamespaceNotFound(_)) => {}
579            Err(e) => return Err(e),
580        }
581
582        // Remove from tracking
583        {
584            let mut access_map = self.access_info.write();
585            for id in ids {
586                access_map.remove(&(namespace.clone(), id.clone()));
587            }
588        }
589
590        Ok(deleted)
591    }
592
593    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
594        // Check any tier
595        Ok(self.hot_storage.namespace_exists(namespace).await?
596            || self.warm_storage.namespace_exists(namespace).await?
597            || self.cold_storage.namespace_exists(namespace).await?)
598    }
599
600    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
601        // Ensure in all tiers
602        self.hot_storage.ensure_namespace(namespace).await?;
603        self.warm_storage.ensure_namespace(namespace).await?;
604        self.cold_storage.ensure_namespace(namespace).await?;
605        Ok(())
606    }
607
608    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
609        // With write-through, cold tier is the source of truth for total count.
610        // Hot/warm are caches that hold subsets of the same data.
611        // Use cold count as the baseline, then add any vectors that are ONLY
612        // in hot or warm (shouldn't happen with write-through, but safe).
613        let cold = self.cold_storage.count(namespace).await?;
614        if cold > 0 {
615            return Ok(cold);
616        }
617        // Fallback: if cold is empty, count from hot + warm (non-tiered data)
618        let hot = self.hot_storage.count(namespace).await?;
619        let warm = self.warm_storage.count(namespace).await?;
620        Ok(hot + warm)
621    }
622
623    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
624        // Check hot first, then warm, then cold
625        if let Some(dim) = self.hot_storage.dimension(namespace).await? {
626            return Ok(Some(dim));
627        }
628        if let Some(dim) = self.warm_storage.dimension(namespace).await? {
629            return Ok(Some(dim));
630        }
631        self.cold_storage.dimension(namespace).await
632    }
633
634    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
635        let mut namespaces = std::collections::HashSet::new();
636
637        namespaces.extend(self.hot_storage.list_namespaces().await?);
638        namespaces.extend(self.warm_storage.list_namespaces().await?);
639        namespaces.extend(self.cold_storage.list_namespaces().await?);
640
641        Ok(namespaces.into_iter().collect())
642    }
643
644    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
645        // Delete from all tiers
646        let hot_deleted = self.hot_storage.delete_namespace(namespace).await?;
647        let warm_deleted = self.warm_storage.delete_namespace(namespace).await?;
648        let cold_deleted = self.cold_storage.delete_namespace(namespace).await?;
649
650        // Remove from access tracking
651        {
652            let mut access_map = self.access_info.write();
653            access_map.retain(|(ns, _), _| ns != namespace);
654        }
655
656        Ok(hot_deleted || warm_deleted || cold_deleted)
657    }
658
659    async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
660        // Cleanup from all tiers
661        let mut total = 0;
662        total += self.hot_storage.cleanup_expired(namespace).await?;
663        total += self.warm_storage.cleanup_expired(namespace).await?;
664        total += self.cold_storage.cleanup_expired(namespace).await?;
665        Ok(total)
666    }
667
668    async fn cleanup_all_expired(&self) -> Result<usize> {
669        // Cleanup from all tiers
670        let mut total = 0;
671        total += self.hot_storage.cleanup_all_expired().await?;
672        total += self.warm_storage.cleanup_all_expired().await?;
673        total += self.cold_storage.cleanup_all_expired().await?;
674        Ok(total)
675    }
676}
677
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;

    /// Tiered storage backed by three independent in-memory stores.
    type MemTiered = TieredStorage<InMemoryStorage, InMemoryStorage, InMemoryStorage>;

    /// Build a vector of `dim` ones with no metadata and no expiry.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// Default-configured storage with the `"test"` namespace already ensured.
    async fn fresh_storage() -> (MemTiered, String) {
        let storage = TieredStorage::new(
            TieredStorageConfig::default(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        (storage, namespace)
    }

    #[tokio::test]
    async fn test_tiered_storage_basic() {
        let (storage, namespace) = fresh_storage().await;

        // A fresh upsert lands in the hot tier.
        let written = storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        assert_eq!(written, 1);

        // The read is served from the hot tier.
        let fetched = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);

        // One hit recorded by the upsert, one by the get.
        assert_eq!(storage.stats().hot_hits, 2);
    }

    #[tokio::test]
    async fn test_tiered_storage_promotion_demotion() {
        let (storage, namespace) = fresh_storage().await;
        let v1 = "v1".to_string();

        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        assert_eq!(storage.get_tier(&namespace, &v1), Some(StorageTier::Hot));

        // Hot -> warm
        assert!(storage.demote(&namespace, &v1).await.unwrap());
        assert_eq!(storage.get_tier(&namespace, &v1), Some(StorageTier::Warm));

        // Warm -> cold
        assert!(storage.demote(&namespace, &v1).await.unwrap());
        assert_eq!(storage.get_tier(&namespace, &v1), Some(StorageTier::Cold));

        // The vector is still readable once it is cold.
        let fetched = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);

        let stats = storage.stats();
        assert_eq!(stats.demotions_to_warm, 1);
        assert_eq!(stats.demotions_to_cold, 1);
    }

    #[tokio::test]
    async fn test_tiered_storage_multi_tier_get() {
        let (storage, namespace) = fresh_storage().await;

        for i in 0..3 {
            storage
                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
                .await
                .unwrap();
        }

        // v0 stays hot, v1 goes warm, v2 goes warm and then cold.
        storage.demote(&namespace, &"v1".to_string()).await.unwrap();
        storage.demote(&namespace, &"v2".to_string()).await.unwrap();
        storage.demote(&namespace, &"v2".to_string()).await.unwrap();

        // A single get must gather results from all three tiers.
        let ids: Vec<_> = (0..3).map(|i| format!("v{}", i)).collect();
        let fetched = storage.get(&namespace, &ids).await.unwrap();
        assert_eq!(fetched.len(), 3);
    }

    #[tokio::test]
    async fn test_tier_distribution() {
        let (storage, namespace) = fresh_storage().await;

        for i in 0..5 {
            storage
                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
                .await
                .unwrap();
        }

        // v3 ends up warm; v4 is demoted twice and ends up cold.
        storage.demote(&namespace, &"v3".to_string()).await.unwrap();
        storage.demote(&namespace, &"v4".to_string()).await.unwrap();
        storage.demote(&namespace, &"v4".to_string()).await.unwrap();

        let dist = storage.tier_distribution(&namespace);
        assert_eq!(dist.hot, 3);
        assert_eq!(dist.warm, 1);
        assert_eq!(dist.cold, 1);
    }
}
832}