// storage/tiered.rs
1//! Tiered Storage for Buffer
2//!
3//! Automatic data tiering based on access patterns:
4//! - Hot tier: In-memory for frequently accessed data
5//! - Warm tier: Local disk cache for recent data
6//! - Cold tier: Object storage for infrequently accessed data
7
8use async_trait::async_trait;
9use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use crate::traits::VectorStorage;
17
/// Tiered storage configuration
///
/// Thresholds are measured from a vector's last recorded access; see
/// `TieredStorage::run_auto_tiering` for how they drive demotion.
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Hot tier capacity (number of vectors)
    // NOTE(review): not enforced anywhere in this file — confirm capacity
    // eviction is implemented by the caller.
    pub hot_tier_capacity: usize,
    /// Time before demoting from hot to warm (since last access)
    pub hot_to_warm_threshold: Duration,
    /// Time before demoting from warm to cold (since last access)
    pub warm_to_cold_threshold: Duration,
    /// Enable automatic tiering (`run_auto_tiering` is a no-op when false)
    pub auto_tier_enabled: bool,
    /// Tier check interval
    // NOTE(review): not read in this file — presumably consumed by the
    // background scheduler that invokes `run_auto_tiering`; confirm.
    pub tier_check_interval: Duration,
}
32
33impl Default for TieredStorageConfig {
34    fn default() -> Self {
35        Self {
36            hot_tier_capacity: 100_000,
37            hot_to_warm_threshold: Duration::from_secs(3600), // 1 hour
38            warm_to_cold_threshold: Duration::from_secs(86400), // 24 hours
39            auto_tier_enabled: true,
40            tier_check_interval: Duration::from_secs(300), // 5 minutes
41        }
42    }
43}
44
/// Storage tier for a piece of data
///
/// Access latency increases (and cost per byte decreases) from `Hot` to
/// `Cold`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StorageTier {
    /// In-memory, fastest access
    Hot,
    /// Local disk, fast access
    Warm,
    /// Object storage, slowest access
    Cold,
}
55
56impl StorageTier {
57    pub fn as_str(&self) -> &'static str {
58        match self {
59            StorageTier::Hot => "hot",
60            StorageTier::Warm => "warm",
61            StorageTier::Cold => "cold",
62        }
63    }
64}
65
/// Access tracking for tiering decisions
///
/// One entry exists per `(namespace, vector id)` key in
/// `TieredStorage::access_info`.
#[derive(Debug, Clone)]
struct AccessInfo {
    /// Last access timestamp (refreshed on every recorded access)
    last_access: Instant,
    /// Total access count (used by `promote` to decide cold -> hot shortcuts)
    access_count: u64,
    /// Current tier the vector is tracked as living in
    tier: StorageTier,
}
76
77impl Default for AccessInfo {
78    fn default() -> Self {
79        Self {
80            last_access: Instant::now(),
81            access_count: 0,
82            tier: StorageTier::Hot,
83        }
84    }
85}
86
/// Statistics for tiered storage
///
/// A point-in-time snapshot of the internal atomic counters, produced by
/// `TieredStorage::stats`.
#[derive(Debug, Clone, Default)]
pub struct TieredStorageStats {
    /// Vectors in hot tier
    pub hot_count: u64,
    /// Vectors in warm tier
    pub warm_count: u64,
    /// Vectors in cold tier
    pub cold_count: u64,
    /// Hot tier hits
    pub hot_hits: u64,
    /// Warm tier hits
    pub warm_hits: u64,
    /// Cold tier hits
    pub cold_hits: u64,
    /// Promotions to hot tier
    pub promotions_to_hot: u64,
    /// Demotions to warm tier
    pub demotions_to_warm: u64,
    /// Demotions to cold tier
    pub demotions_to_cold: u64,
}
109
/// Tiered storage manager
///
/// Wraps three `VectorStorage` backends and routes reads/writes between
/// them based on per-vector access tracking. `H`, `W` and `C` are the hot,
/// warm and cold backends respectively.
pub struct TieredStorage<H, W, C> {
    /// Configuration
    config: TieredStorageConfig,
    /// Hot tier storage (in-memory)
    hot_storage: H,
    /// Warm tier storage (disk cache)
    warm_storage: W,
    /// Cold tier storage (object storage)
    cold_storage: C,
    /// Access tracking per vector, keyed by (namespace, vector id)
    access_info: RwLock<HashMap<(NamespaceId, VectorId), AccessInfo>>,
    /// Statistics (atomic counters; snapshot via `stats()`)
    stats: TieredStorageStatsInner,
}
125
/// Internal, thread-safe counters backing `TieredStorageStats`.
///
/// All counters start at zero. `AtomicU64::default()` is defined as
/// `AtomicU64::new(0)`, so the previous hand-written `Default` impl was pure
/// boilerplate — a `#[derive(Default)]` is equivalent.
#[derive(Default)]
struct TieredStorageStatsInner {
    hot_count: AtomicU64,
    warm_count: AtomicU64,
    cold_count: AtomicU64,
    hot_hits: AtomicU64,
    warm_hits: AtomicU64,
    cold_hits: AtomicU64,
    promotions_to_hot: AtomicU64,
    demotions_to_warm: AtomicU64,
    demotions_to_cold: AtomicU64,
}
153
impl<H, W, C> TieredStorage<H, W, C>
where
    H: VectorStorage,
    W: VectorStorage,
    C: VectorStorage,
{
    /// Create a new tiered storage wrapping the three backing stores.
    ///
    /// Access tracking starts empty and all statistics counters start at zero.
    pub fn new(
        config: TieredStorageConfig,
        hot_storage: H,
        warm_storage: W,
        cold_storage: C,
    ) -> Self {
        Self {
            config,
            hot_storage,
            warm_storage,
            cold_storage,
            access_info: RwLock::new(HashMap::new()),
            stats: TieredStorageStatsInner::default(),
        }
    }

    /// Get the tiered storage configuration
    pub fn config(&self) -> &TieredStorageConfig {
        &self.config
    }

    /// Record access to a vector: refreshes the last-access timestamp, bumps
    /// the access count, pins the tracked tier, and increments the per-tier
    /// hit counter. Takes the access-map write lock for the duration.
    fn record_access(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
        let key = (namespace.clone(), id.clone());
        let mut access_map = self.access_info.write();

        // Unknown ids get a fresh AccessInfo (defaults to Hot) before the update.
        let info = access_map.entry(key).or_default();
        info.last_access = Instant::now();
        info.access_count += 1;
        info.tier = tier;

        // Update hit counters
        match tier {
            StorageTier::Hot => self.stats.hot_hits.fetch_add(1, Ordering::Relaxed),
            StorageTier::Warm => self.stats.warm_hits.fetch_add(1, Ordering::Relaxed),
            StorageTier::Cold => self.stats.cold_hits.fetch_add(1, Ordering::Relaxed),
        };
    }

    /// Get the currently tracked tier for a vector, or `None` if the vector
    /// has never been recorded in the access map.
    fn get_tier(&self, namespace: &NamespaceId, id: &VectorId) -> Option<StorageTier> {
        let access_map = self.access_info.read();
        access_map
            .get(&(namespace.clone(), id.clone()))
            .map(|info| info.tier)
    }

    /// Promote a vector to a higher tier.
    ///
    /// Returns `Ok(true)` when data was actually moved. Vectors already in
    /// the hot tier, untracked ids, and ids absent from their recorded tier's
    /// store all return `Ok(false)`.
    pub async fn promote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let current_tier = self.get_tier(namespace, id);

        match current_tier {
            Some(StorageTier::Warm) => {
                // Promote warm -> hot: copy into hot first, then remove from warm,
                // so the vector is never unreachable in between.
                let vectors = self
                    .warm_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.hot_storage.upsert(namespace, vectors).await?;
                    self.warm_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Hot);
                    self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                    self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            Some(StorageTier::Cold) => {
                // Promote cold -> warm (or directly to hot if frequently accessed)
                let vectors = self
                    .cold_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    // Check if should go directly to hot based on access frequency
                    // (strictly more than 10 recorded accesses skips the warm tier).
                    let should_be_hot = {
                        let access_map = self.access_info.read();
                        access_map
                            .get(&(namespace.clone(), id.clone()))
                            .map(|info| info.access_count > 10)
                            .unwrap_or(false)
                    };

                    if should_be_hot {
                        self.hot_storage.upsert(namespace, vectors).await?;
                        self.update_tier(namespace, id, StorageTier::Hot);
                        self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                        self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                    } else {
                        self.warm_storage.upsert(namespace, vectors).await?;
                        self.update_tier(namespace, id, StorageTier::Warm);
                        self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
                    }
                    // Cold tier is the durable source of truth — never delete on promotion.
                    // The tier map tracks which tier is "active" for reads; cold remains
                    // as the persistent backup in case warm/hot are lost on restart.
                    // NOTE(review): cold_count is not decremented here, yet a later
                    // warm->cold demotion of the same vector increments it again in
                    // demote(), so cold_count can drift upward over promote/demote
                    // cycles — confirm whether that is acceptable for these metrics.

                    return Ok(true);
                }
            }
            _ => {}
        }

        Ok(false)
    }

    /// Demote a vector to a lower tier (hot -> warm, or warm -> cold).
    ///
    /// Returns `Ok(true)` when data was moved; cold/untracked vectors and ids
    /// absent from their recorded tier's store return `Ok(false)`.
    pub async fn demote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let current_tier = self.get_tier(namespace, id);

        match current_tier {
            Some(StorageTier::Hot) => {
                // Demote hot -> warm: copy into warm first, then delete from hot.
                let vectors = self
                    .hot_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.warm_storage.upsert(namespace, vectors).await?;
                    self.hot_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Warm);
                    self.stats.demotions_to_warm.fetch_add(1, Ordering::Relaxed);
                    self.stats.hot_count.fetch_sub(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_add(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            Some(StorageTier::Warm) => {
                // Demote warm -> cold: copy into cold first, then delete from warm.
                let vectors = self
                    .warm_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.cold_storage.upsert(namespace, vectors).await?;
                    self.warm_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Cold);
                    self.stats.demotions_to_cold.fetch_add(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
                    self.stats.cold_count.fetch_add(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            _ => {}
        }

        Ok(false)
    }

    /// Update tier tracking only (no data movement, no hit counting).
    /// Creates a default entry for previously unknown ids.
    fn update_tier(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
        let mut access_map = self.access_info.write();
        let key = (namespace.clone(), id.clone());
        let info = access_map.entry(key).or_default();
        info.tier = tier;
    }

    /// Run automatic tiering based on access patterns.
    ///
    /// Only performs demotions; promotions happen on demand via `promote`,
    /// which is why the returned `promoted_*` counts are always zero.
    /// Candidates are collected in a short read-lock pass, then demoted
    /// without holding the lock; `demote` re-checks the current tier, so a
    /// concurrent tier change simply makes that demotion a no-op.
    pub async fn run_auto_tiering(&self) -> Result<TieringResult> {
        if !self.config.auto_tier_enabled {
            return Ok(TieringResult::default());
        }

        let now = Instant::now();
        let mut to_demote_to_warm = Vec::new();
        let mut to_demote_to_cold = Vec::new();

        // Collect vectors to demote
        {
            let access_map = self.access_info.read();
            for ((namespace, id), info) in access_map.iter() {
                let elapsed = now.duration_since(info.last_access);

                match info.tier {
                    StorageTier::Hot if elapsed > self.config.hot_to_warm_threshold => {
                        to_demote_to_warm.push((namespace.clone(), id.clone()));
                    }
                    StorageTier::Warm if elapsed > self.config.warm_to_cold_threshold => {
                        to_demote_to_cold.push((namespace.clone(), id.clone()));
                    }
                    _ => {}
                }
            }
        }

        // Execute demotions
        let mut demoted_to_warm = 0;
        let mut demoted_to_cold = 0;

        for (namespace, id) in to_demote_to_warm {
            if self.demote(&namespace, &id).await? {
                demoted_to_warm += 1;
            }
        }

        for (namespace, id) in to_demote_to_cold {
            if self.demote(&namespace, &id).await? {
                demoted_to_cold += 1;
            }
        }

        Ok(TieringResult {
            demoted_to_warm,
            demoted_to_cold,
            promoted_to_hot: 0,
            promoted_to_warm: 0,
        })
    }

    /// Get a point-in-time snapshot of the storage statistics.
    /// Each counter is loaded independently, so the snapshot is not atomic
    /// across fields.
    pub fn stats(&self) -> TieredStorageStats {
        TieredStorageStats {
            hot_count: self.stats.hot_count.load(Ordering::Relaxed),
            warm_count: self.stats.warm_count.load(Ordering::Relaxed),
            cold_count: self.stats.cold_count.load(Ordering::Relaxed),
            hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
            warm_hits: self.stats.warm_hits.load(Ordering::Relaxed),
            cold_hits: self.stats.cold_hits.load(Ordering::Relaxed),
            promotions_to_hot: self.stats.promotions_to_hot.load(Ordering::Relaxed),
            demotions_to_warm: self.stats.demotions_to_warm.load(Ordering::Relaxed),
            demotions_to_cold: self.stats.demotions_to_cold.load(Ordering::Relaxed),
        }
    }

    /// Get tier distribution by namespace.
    ///
    /// Counts come from the in-memory access map (a linear scan over all
    /// tracked vectors), not from the backing stores.
    pub fn tier_distribution(&self, namespace: &NamespaceId) -> TierDistribution {
        let access_map = self.access_info.read();
        let mut hot = 0u64;
        let mut warm = 0u64;
        let mut cold = 0u64;

        for ((ns, _), info) in access_map.iter() {
            if ns == namespace {
                match info.tier {
                    StorageTier::Hot => hot += 1,
                    StorageTier::Warm => warm += 1,
                    StorageTier::Cold => cold += 1,
                }
            }
        }

        TierDistribution { hot, warm, cold }
    }
}
418
/// Result of automatic tiering
#[derive(Debug, Clone, Default)]
pub struct TieringResult {
    /// Vectors demoted to warm tier
    pub demoted_to_warm: u64,
    /// Vectors demoted to cold tier
    pub demoted_to_cold: u64,
    /// Vectors promoted to hot tier
    /// (always 0 from `run_auto_tiering`, which only demotes; promotion is
    /// driven on demand via `TieredStorage::promote`)
    pub promoted_to_hot: u64,
    /// Vectors promoted to warm tier (always 0 — see `promoted_to_hot`)
    pub promoted_to_warm: u64,
}
431
/// Tier distribution for a namespace
///
/// Counts are derived from the in-memory access map, not from querying the
/// backing stores themselves.
#[derive(Debug, Clone)]
pub struct TierDistribution {
    pub hot: u64,
    pub warm: u64,
    pub cold: u64,
}
439
440#[async_trait]
441impl<H, W, C> VectorStorage for TieredStorage<H, W, C>
442where
443    H: VectorStorage,
444    W: VectorStorage,
445    C: VectorStorage + Clone + Send + Sync + 'static,
446{
447    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
448        // Track access info before moving vectors into storage
449        let ids: Vec<VectorId> = vectors.iter().map(|v| v.id.clone()).collect();
450
451        // Clone vectors for the background cold-tier flush before consuming them.
452        let cold_vectors = vectors.clone();
453
454        // Write hot tier first — lowest latency path, immediately available for reads.
455        let count = self.hot_storage.upsert(namespace, vectors).await?;
456
457        // Flush to cold tier (S3) in the background — failure is logged but not fatal.
458        // The hot tier (RocksDB/in-memory) is the primary durable read path.
459        let cold = self.cold_storage.clone();
460        let cold_ns = namespace.clone();
461        tokio::spawn(async move {
462            if let Err(e) = cold.ensure_namespace(&cold_ns).await {
463                tracing::error!(
464                    error = %e,
465                    namespace = %cold_ns,
466                    "Cold tier namespace ensure failed (S3 flush aborted)"
467                );
468                return;
469            }
470            if let Err(e) = cold.upsert(&cold_ns, cold_vectors).await {
471                tracing::error!(
472                    error = %e,
473                    namespace = %cold_ns,
474                    "Cold tier S3 flush failed — data is durable in hot tier"
475                );
476            }
477        });
478
479        for id in &ids {
480            self.update_tier(namespace, id, StorageTier::Hot);
481            self.record_access(namespace, id, StorageTier::Hot);
482        }
483
484        self.stats
485            .hot_count
486            .fetch_add(count as u64, Ordering::Relaxed);
487        Ok(count)
488    }
489
490    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
491        let mut results = Vec::with_capacity(ids.len());
492        let mut remaining_ids: Vec<VectorId> = ids.to_vec();
493
494        // Try hot tier first (NamespaceNotFound is normal after restart when hot cache is empty)
495        let hot_results = match self.hot_storage.get(namespace, &remaining_ids).await {
496            Ok(v) => v,
497            Err(DakeraError::NamespaceNotFound(_)) => vec![],
498            Err(e) => return Err(e),
499        };
500        for v in &hot_results {
501            self.record_access(namespace, &v.id, StorageTier::Hot);
502        }
503
504        // Remove found IDs
505        let found_ids: std::collections::HashSet<_> = hot_results.iter().map(|v| &v.id).collect();
506        remaining_ids.retain(|id| !found_ids.contains(id));
507        results.extend(hot_results);
508
509        if remaining_ids.is_empty() {
510            return Ok(results);
511        }
512
513        // Try warm tier (NamespaceNotFound is normal after restart when warm cache is empty)
514        let warm_results = match self.warm_storage.get(namespace, &remaining_ids).await {
515            Ok(v) => v,
516            Err(common::DakeraError::NamespaceNotFound(_)) => vec![],
517            Err(e) => return Err(e),
518        };
519        for v in &warm_results {
520            self.record_access(namespace, &v.id, StorageTier::Warm);
521        }
522
523        let found_ids: std::collections::HashSet<_> = warm_results.iter().map(|v| &v.id).collect();
524        remaining_ids.retain(|id| !found_ids.contains(id));
525        results.extend(warm_results);
526
527        if remaining_ids.is_empty() {
528            return Ok(results);
529        }
530
531        // Try cold tier
532        let cold_results = match self.cold_storage.get(namespace, &remaining_ids).await {
533            Ok(v) => v,
534            Err(DakeraError::NamespaceNotFound(_)) => vec![],
535            Err(e) => return Err(e),
536        };
537        for v in &cold_results {
538            self.record_access(namespace, &v.id, StorageTier::Cold);
539        }
540        results.extend(cold_results);
541
542        Ok(results)
543    }
544
545    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
546        let mut seen = std::collections::HashSet::new();
547        let mut results = Vec::new();
548
549        // Helper: treat NamespaceNotFound as empty — normal for hot/warm after restart.
550        let tier_get_all = |res: common::Result<Vec<Vector>>| -> common::Result<Vec<Vector>> {
551            match res {
552                Ok(v) => Ok(v),
553                Err(common::DakeraError::NamespaceNotFound(_)) => Ok(vec![]),
554                Err(e) => Err(e),
555            }
556        };
557
558        // Gather from all tiers, preferring hot over warm over cold.
559        // Deduplicate by vector ID since write-through means a vector
560        // can exist in both hot and cold simultaneously.
561        for v in tier_get_all(self.hot_storage.get_all(namespace).await)? {
562            if seen.insert(v.id.clone()) {
563                results.push(v);
564            }
565        }
566        for v in tier_get_all(self.warm_storage.get_all(namespace).await)? {
567            if seen.insert(v.id.clone()) {
568                results.push(v);
569            }
570        }
571        for v in tier_get_all(self.cold_storage.get_all(namespace).await)? {
572            if seen.insert(v.id.clone()) {
573                results.push(v);
574            }
575        }
576
577        Ok(results)
578    }
579
580    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
581        let mut deleted = 0;
582
583        // Delete from all tiers. Tolerate NamespaceNotFound from individual tiers
584        // since data may only reside in a subset of tiers (e.g. cold but not hot).
585        match self.hot_storage.delete(namespace, ids).await {
586            Ok(n) => deleted += n,
587            Err(DakeraError::NamespaceNotFound(_)) => {}
588            Err(e) => return Err(e),
589        }
590        match self.warm_storage.delete(namespace, ids).await {
591            Ok(n) => deleted += n,
592            Err(DakeraError::NamespaceNotFound(_)) => {}
593            Err(e) => return Err(e),
594        }
595        match self.cold_storage.delete(namespace, ids).await {
596            Ok(n) => deleted += n,
597            Err(DakeraError::NamespaceNotFound(_)) => {}
598            Err(e) => return Err(e),
599        }
600
601        // Remove from tracking
602        {
603            let mut access_map = self.access_info.write();
604            for id in ids {
605                access_map.remove(&(namespace.clone(), id.clone()));
606            }
607        }
608
609        Ok(deleted)
610    }
611
612    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
613        // Check any tier
614        Ok(self.hot_storage.namespace_exists(namespace).await?
615            || self.warm_storage.namespace_exists(namespace).await?
616            || self.cold_storage.namespace_exists(namespace).await?)
617    }
618
619    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
620        // Ensure in all tiers
621        self.hot_storage.ensure_namespace(namespace).await?;
622        self.warm_storage.ensure_namespace(namespace).await?;
623        self.cold_storage.ensure_namespace(namespace).await?;
624        Ok(())
625    }
626
627    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
628        // With write-through, cold tier is the source of truth for total count.
629        // Hot/warm are caches that hold subsets of the same data.
630        // Use cold count as the baseline, then add any vectors that are ONLY
631        // in hot or warm (shouldn't happen with write-through, but safe).
632        let cold = self.cold_storage.count(namespace).await?;
633        if cold > 0 {
634            return Ok(cold);
635        }
636        // Fallback: if cold is empty, count from hot + warm (non-tiered data)
637        let hot = self.hot_storage.count(namespace).await?;
638        let warm = self.warm_storage.count(namespace).await?;
639        Ok(hot + warm)
640    }
641
642    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
643        // Check hot first, then warm, then cold
644        if let Some(dim) = self.hot_storage.dimension(namespace).await? {
645            return Ok(Some(dim));
646        }
647        if let Some(dim) = self.warm_storage.dimension(namespace).await? {
648            return Ok(Some(dim));
649        }
650        self.cold_storage.dimension(namespace).await
651    }
652
653    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
654        let mut namespaces = std::collections::HashSet::new();
655
656        namespaces.extend(self.hot_storage.list_namespaces().await?);
657        namespaces.extend(self.warm_storage.list_namespaces().await?);
658        namespaces.extend(self.cold_storage.list_namespaces().await?);
659
660        Ok(namespaces.into_iter().collect())
661    }
662
663    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
664        // Delete from all tiers
665        let hot_deleted = self.hot_storage.delete_namespace(namespace).await?;
666        let warm_deleted = self.warm_storage.delete_namespace(namespace).await?;
667        let cold_deleted = self.cold_storage.delete_namespace(namespace).await?;
668
669        // Remove from access tracking
670        {
671            let mut access_map = self.access_info.write();
672            access_map.retain(|(ns, _), _| ns != namespace);
673        }
674
675        Ok(hot_deleted || warm_deleted || cold_deleted)
676    }
677
678    async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
679        // Cleanup from all tiers
680        let mut total = 0;
681        total += self.hot_storage.cleanup_expired(namespace).await?;
682        total += self.warm_storage.cleanup_expired(namespace).await?;
683        total += self.cold_storage.cleanup_expired(namespace).await?;
684        Ok(total)
685    }
686
687    async fn cleanup_all_expired(&self) -> Result<usize> {
688        // Cleanup from all tiers
689        let mut total = 0;
690        total += self.hot_storage.cleanup_all_expired().await?;
691        total += self.warm_storage.cleanup_all_expired().await?;
692        total += self.cold_storage.cleanup_all_expired().await?;
693        Ok(total)
694    }
695}
696
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;

    /// Build a vector with the given id, `dim` components of 1.0, and no
    /// metadata or TTL.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// Tiered store backed by three in-memory tiers with the default config.
    fn make_storage() -> TieredStorage<InMemoryStorage, InMemoryStorage, InMemoryStorage> {
        TieredStorage::new(
            TieredStorageConfig::default(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        )
    }

    #[tokio::test]
    async fn test_tiered_storage_basic() {
        let store = make_storage();
        let ns = "test".to_string();
        store.ensure_namespace(&ns).await.unwrap();

        // A fresh upsert always lands in the hot tier.
        let inserted = store
            .upsert(&ns, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        assert_eq!(inserted, 1);

        // And is served back from the hot tier.
        let fetched = store.get(&ns, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);

        // One hit recorded by upsert's access tracking, one by the get.
        assert_eq!(store.stats().hot_hits, 2);
    }

    #[tokio::test]
    async fn test_tiered_storage_promotion_demotion() {
        let store = make_storage();
        let ns = "test".to_string();
        store.ensure_namespace(&ns).await.unwrap();

        let v1 = "v1".to_string();
        store
            .upsert(&ns, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        // Starts hot.
        assert_eq!(store.get_tier(&ns, &v1), Some(StorageTier::Hot));

        // First demotion: hot -> warm.
        assert!(store.demote(&ns, &v1).await.unwrap());
        assert_eq!(store.get_tier(&ns, &v1), Some(StorageTier::Warm));

        // Second demotion: warm -> cold.
        assert!(store.demote(&ns, &v1).await.unwrap());
        assert_eq!(store.get_tier(&ns, &v1), Some(StorageTier::Cold));

        // The vector remains readable from the cold tier.
        assert_eq!(store.get(&ns, &[v1.clone()]).await.unwrap().len(), 1);

        let stats = store.stats();
        assert_eq!(stats.demotions_to_warm, 1);
        assert_eq!(stats.demotions_to_cold, 1);
    }

    #[tokio::test]
    async fn test_tiered_storage_multi_tier_get() {
        let store = make_storage();
        let ns = "test".to_string();
        store.ensure_namespace(&ns).await.unwrap();

        for i in 0..3 {
            let v = create_test_vector(&format!("v{}", i), 4);
            store.upsert(&ns, vec![v]).await.unwrap();
        }

        // Leave v0 hot, push v1 to warm, and v2 all the way to cold.
        store.demote(&ns, &"v1".to_string()).await.unwrap();
        store.demote(&ns, &"v2".to_string()).await.unwrap();
        store.demote(&ns, &"v2".to_string()).await.unwrap();

        // A single get spanning all three tiers finds every vector.
        let ids: Vec<_> = (0..3).map(|i| format!("v{}", i)).collect();
        assert_eq!(store.get(&ns, &ids).await.unwrap().len(), 3);
    }

    #[tokio::test]
    async fn test_tier_distribution() {
        let store = make_storage();
        let ns = "test".to_string();
        store.ensure_namespace(&ns).await.unwrap();

        for i in 0..5 {
            let v = create_test_vector(&format!("v{}", i), 4);
            store.upsert(&ns, vec![v]).await.unwrap();
        }

        // v3 -> warm; v4 -> warm -> cold; v0..v2 stay hot.
        store.demote(&ns, &"v3".to_string()).await.unwrap();
        store.demote(&ns, &"v4".to_string()).await.unwrap();
        store.demote(&ns, &"v4".to_string()).await.unwrap();

        let dist = store.tier_distribution(&ns);
        assert_eq!(dist.hot, 3);
        assert_eq!(dist.warm, 1);
        assert_eq!(dist.cold, 1);
    }
}