//! Tiered Storage for Buffer
//!
//! Automatic data tiering based on access patterns:
//! - Hot tier: In-memory for frequently accessed data
//! - Warm tier: Local disk cache for recent data
//! - Cold tier: Object storage for infrequently accessed data

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

use crate::traits::VectorStorage;
17
/// Tiered storage configuration.
///
/// Thresholds are idle times (since last recorded access) after which a vector
/// becomes eligible for demotion by [`TieredStorage::run_auto_tiering`].
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Hot tier capacity (number of vectors).
    /// NOTE(review): not enforced anywhere in this file — presumably checked
    /// by the component that schedules tiering; confirm.
    pub hot_tier_capacity: usize,
    /// Idle time before demoting a vector from hot to warm
    pub hot_to_warm_threshold: Duration,
    /// Idle time before demoting a vector from warm to cold
    pub warm_to_cold_threshold: Duration,
    /// Enable automatic tiering (when false, `run_auto_tiering` is a no-op)
    pub auto_tier_enabled: bool,
    /// How often the tiering sweep is expected to run
    pub tier_check_interval: Duration,
}

33impl Default for TieredStorageConfig {
34    fn default() -> Self {
35        Self {
36            hot_tier_capacity: 100_000,
37            hot_to_warm_threshold: Duration::from_secs(3600), // 1 hour
38            warm_to_cold_threshold: Duration::from_secs(86400), // 24 hours
39            auto_tier_enabled: true,
40            tier_check_interval: Duration::from_secs(300), // 5 minutes
41        }
42    }
43}
44
/// Storage tier for a piece of data.
///
/// Ordered hottest-to-coldest; a vector occupies exactly one *active* tier in
/// the access-tracking map, though copies may physically exist in lower tiers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StorageTier {
    /// In-memory, fastest access
    Hot,
    /// Local disk, fast access
    Warm,
    /// Object storage, slowest access
    Cold,
}

56impl StorageTier {
57    pub fn as_str(&self) -> &'static str {
58        match self {
59            StorageTier::Hot => "hot",
60            StorageTier::Warm => "warm",
61            StorageTier::Cold => "cold",
62        }
63    }
64}
65
/// Access tracking for tiering decisions.
///
/// One entry per (namespace, vector id); both reads and writes update it via
/// `record_access`, and the tiering sweep reads `last_access`/`tier`.
#[derive(Debug, Clone)]
struct AccessInfo {
    /// Last access timestamp (monotonic; compared against demotion thresholds)
    last_access: Instant,
    /// Total access count (drives the cold→hot promotion shortcut)
    access_count: u64,
    /// Tier currently considered "active" for this vector
    tier: StorageTier,
}

77impl Default for AccessInfo {
78    fn default() -> Self {
79        Self {
80            last_access: Instant::now(),
81            access_count: 0,
82            tier: StorageTier::Hot,
83        }
84    }
85}
86
/// Point-in-time statistics snapshot for tiered storage.
///
/// Produced by [`TieredStorage::stats`] from relaxed atomic loads, so values
/// may be mildly inconsistent with each other under concurrent activity.
#[derive(Debug, Clone, Default)]
pub struct TieredStorageStats {
    /// Vectors in hot tier
    pub hot_count: u64,
    /// Vectors in warm tier
    pub warm_count: u64,
    /// Vectors in cold tier
    pub cold_count: u64,
    /// Hot tier hits (includes accesses recorded on upsert)
    pub hot_hits: u64,
    /// Warm tier hits
    pub warm_hits: u64,
    /// Cold tier hits
    pub cold_hits: u64,
    /// Promotions to hot tier
    pub promotions_to_hot: u64,
    /// Demotions to warm tier
    pub demotions_to_warm: u64,
    /// Demotions to cold tier
    pub demotions_to_cold: u64,
}

/// Tiered storage manager.
///
/// Routes vectors between three [`VectorStorage`] backends based on access
/// recency and frequency. Type parameters: `H` = hot (in-memory),
/// `W` = warm (local disk cache), `C` = cold (object storage).
pub struct TieredStorage<H, W, C> {
    /// Configuration
    config: TieredStorageConfig,
    /// Hot tier storage (in-memory)
    hot_storage: H,
    /// Warm tier storage (disk cache)
    warm_storage: W,
    /// Cold tier storage (object storage)
    cold_storage: C,
    /// Access tracking per vector, keyed by (namespace, id);
    /// drives promotion/demotion decisions.
    access_info: RwLock<HashMap<(NamespaceId, VectorId), AccessInfo>>,
    /// Internal atomic counters behind the public `stats()` snapshot.
    stats: TieredStorageStatsInner,
}

/// Lock-free counter storage backing [`TieredStorageStats`] snapshots.
/// All counters are updated with `Ordering::Relaxed`; exact cross-counter
/// consistency is not guaranteed.
struct TieredStorageStatsInner {
    hot_count: AtomicU64,
    warm_count: AtomicU64,
    cold_count: AtomicU64,
    hot_hits: AtomicU64,
    warm_hits: AtomicU64,
    cold_hits: AtomicU64,
    promotions_to_hot: AtomicU64,
    demotions_to_warm: AtomicU64,
    demotions_to_cold: AtomicU64,
}

138impl Default for TieredStorageStatsInner {
139    fn default() -> Self {
140        Self {
141            hot_count: AtomicU64::new(0),
142            warm_count: AtomicU64::new(0),
143            cold_count: AtomicU64::new(0),
144            hot_hits: AtomicU64::new(0),
145            warm_hits: AtomicU64::new(0),
146            cold_hits: AtomicU64::new(0),
147            promotions_to_hot: AtomicU64::new(0),
148            demotions_to_warm: AtomicU64::new(0),
149            demotions_to_cold: AtomicU64::new(0),
150        }
151    }
152}
153
154impl<H, W, C> TieredStorage<H, W, C>
155where
156    H: VectorStorage,
157    W: VectorStorage,
158    C: VectorStorage,
159{
160    /// Create a new tiered storage
161    pub fn new(
162        config: TieredStorageConfig,
163        hot_storage: H,
164        warm_storage: W,
165        cold_storage: C,
166    ) -> Self {
167        Self {
168            config,
169            hot_storage,
170            warm_storage,
171            cold_storage,
172            access_info: RwLock::new(HashMap::new()),
173            stats: TieredStorageStatsInner::default(),
174        }
175    }
176
177    /// Get the tiered storage configuration
178    pub fn config(&self) -> &TieredStorageConfig {
179        &self.config
180    }
181
182    /// Record access to a vector
183    fn record_access(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
184        let key = (namespace.clone(), id.clone());
185        let mut access_map = self.access_info.write();
186
187        let info = access_map.entry(key).or_default();
188        info.last_access = Instant::now();
189        info.access_count += 1;
190        info.tier = tier;
191
192        // Update hit counters
193        match tier {
194            StorageTier::Hot => self.stats.hot_hits.fetch_add(1, Ordering::Relaxed),
195            StorageTier::Warm => self.stats.warm_hits.fetch_add(1, Ordering::Relaxed),
196            StorageTier::Cold => self.stats.cold_hits.fetch_add(1, Ordering::Relaxed),
197        };
198    }
199
200    /// Get the current tier for a vector
201    fn get_tier(&self, namespace: &NamespaceId, id: &VectorId) -> Option<StorageTier> {
202        let access_map = self.access_info.read();
203        access_map
204            .get(&(namespace.clone(), id.clone()))
205            .map(|info| info.tier)
206    }
207
    /// Promote a vector one step toward the hot tier (warm→hot, or cold→warm/hot).
    ///
    /// Returns `Ok(true)` when data was actually moved; `Ok(false)` when the
    /// vector is untracked, already hot, or missing from its recorded tier.
    ///
    /// Cold→hot shortcut: vectors with more than 10 recorded accesses skip
    /// the warm tier entirely.
    pub async fn promote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let current_tier = self.get_tier(namespace, id);

        match current_tier {
            Some(StorageTier::Warm) => {
                // Promote warm -> hot: copy up first, delete the warm copy only
                // after the hot upsert succeeded.
                let vectors = self
                    .warm_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.hot_storage.upsert(namespace, vectors).await?;
                    self.warm_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Hot);
                    self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                    self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            Some(StorageTier::Cold) => {
                // Promote cold -> warm (or directly to hot if frequently accessed)
                let vectors = self
                    .cold_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    // Check if should go directly to hot based on access frequency
                    let should_be_hot = {
                        let access_map = self.access_info.read();
                        access_map
                            .get(&(namespace.clone(), id.clone()))
                            .map(|info| info.access_count > 10)
                            .unwrap_or(false)
                    };

                    if should_be_hot {
                        self.hot_storage.upsert(namespace, vectors).await?;
                        self.update_tier(namespace, id, StorageTier::Hot);
                        self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                        self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                    } else {
                        self.warm_storage.upsert(namespace, vectors).await?;
                        self.update_tier(namespace, id, StorageTier::Warm);
                        self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
                    }
                    // Cold tier is the durable source of truth — never delete on promotion.
                    // The tier map tracks which tier is "active" for reads; cold remains
                    // as the persistent backup in case warm/hot are lost on restart.
                    // (cold_count is intentionally NOT decremented for the same reason.)

                    return Ok(true);
                }
            }
            // None (untracked) or already Hot: nothing to promote.
            _ => {}
        }

        Ok(false)
    }

    /// Demote a vector one step toward the cold tier (hot→warm, warm→cold).
    ///
    /// Returns `Ok(true)` when data was moved; `Ok(false)` when the vector is
    /// untracked, already cold, or missing from its recorded tier. The copy-up
    /// then delete order means a crash between the two calls leaves a duplicate
    /// (resolved on read by tier priority), never a loss.
    pub async fn demote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let current_tier = self.get_tier(namespace, id);

        match current_tier {
            Some(StorageTier::Hot) => {
                // Demote hot -> warm: write the lower copy before deleting.
                let vectors = self
                    .hot_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.warm_storage.upsert(namespace, vectors).await?;
                    self.hot_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Warm);
                    self.stats.demotions_to_warm.fetch_add(1, Ordering::Relaxed);
                    self.stats.hot_count.fetch_sub(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_add(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            Some(StorageTier::Warm) => {
                // Demote warm -> cold: same copy-then-delete ordering.
                let vectors = self
                    .warm_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.cold_storage.upsert(namespace, vectors).await?;
                    self.warm_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Cold);
                    self.stats.demotions_to_cold.fetch_add(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
                    self.stats.cold_count.fetch_add(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            // None (untracked) or already Cold: nothing to demote.
            _ => {}
        }

        Ok(false)
    }

323    /// Update tier tracking
324    fn update_tier(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
325        let mut access_map = self.access_info.write();
326        let key = (namespace.clone(), id.clone());
327        let info = access_map.entry(key).or_default();
328        info.tier = tier;
329    }
330
331    fn is_permanent_storage_error(error: &DakeraError) -> bool {
332        match error {
333            DakeraError::NamespaceNotFound(_) => true,
334            DakeraError::Storage(msg) => {
335                msg.contains("PermissionDenied") || msg.contains("permission denied")
336            }
337            _ => false,
338        }
339    }
340
341    /// Run automatic tiering based on access patterns
342    pub async fn run_auto_tiering(&self) -> Result<TieringResult> {
343        if !self.config.auto_tier_enabled {
344            return Ok(TieringResult::default());
345        }
346
347        let now = Instant::now();
348        let mut to_demote_to_warm = Vec::new();
349        let mut to_demote_to_cold = Vec::new();
350
351        // Collect vectors to demote
352        {
353            let access_map = self.access_info.read();
354            for ((namespace, id), info) in access_map.iter() {
355                let elapsed = now.duration_since(info.last_access);
356
357                match info.tier {
358                    StorageTier::Hot if elapsed > self.config.hot_to_warm_threshold => {
359                        to_demote_to_warm.push((namespace.clone(), id.clone()));
360                    }
361                    StorageTier::Warm if elapsed > self.config.warm_to_cold_threshold => {
362                        to_demote_to_cold.push((namespace.clone(), id.clone()));
363                    }
364                    _ => {}
365                }
366            }
367        }
368
369        // Execute demotions — errors on individual vectors are handled gracefully
370        // to prevent a single deleted namespace from blocking all tiering.
371        let mut demoted_to_warm = 0;
372        let mut demoted_to_cold = 0;
373        let mut stale_entries = Vec::new();
374
375        for (namespace, id) in to_demote_to_warm {
376            match self.demote(&namespace, &id).await {
377                Ok(true) => demoted_to_warm += 1,
378                Ok(false) => {}
379                Err(e) => {
380                    if Self::is_permanent_storage_error(&e) {
381                        stale_entries.push((namespace, id));
382                    } else {
383                        tracing::debug!(error = %e, "Transient demotion error, will retry next cycle");
384                    }
385                }
386            }
387        }
388
389        for (namespace, id) in to_demote_to_cold {
390            match self.demote(&namespace, &id).await {
391                Ok(true) => demoted_to_cold += 1,
392                Ok(false) => {}
393                Err(e) => {
394                    if Self::is_permanent_storage_error(&e) {
395                        stale_entries.push((namespace, id));
396                    } else {
397                        tracing::debug!(error = %e, "Transient demotion error, will retry next cycle");
398                    }
399                }
400            }
401        }
402
403        if !stale_entries.is_empty() {
404            let removed = stale_entries.len();
405            let mut access_map = self.access_info.write();
406            for (ns, id) in &stale_entries {
407                access_map.remove(&(ns.clone(), id.clone()));
408            }
409            tracing::warn!(
410                removed = removed,
411                "Purged stale entries from tiering queue (deleted namespace or permanent error)"
412            );
413        }
414
415        Ok(TieringResult {
416            demoted_to_warm,
417            demoted_to_cold,
418            promoted_to_hot: 0,
419            promoted_to_warm: 0,
420        })
421    }
422
423    /// Get storage statistics
424    pub fn stats(&self) -> TieredStorageStats {
425        TieredStorageStats {
426            hot_count: self.stats.hot_count.load(Ordering::Relaxed),
427            warm_count: self.stats.warm_count.load(Ordering::Relaxed),
428            cold_count: self.stats.cold_count.load(Ordering::Relaxed),
429            hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
430            warm_hits: self.stats.warm_hits.load(Ordering::Relaxed),
431            cold_hits: self.stats.cold_hits.load(Ordering::Relaxed),
432            promotions_to_hot: self.stats.promotions_to_hot.load(Ordering::Relaxed),
433            demotions_to_warm: self.stats.demotions_to_warm.load(Ordering::Relaxed),
434            demotions_to_cold: self.stats.demotions_to_cold.load(Ordering::Relaxed),
435        }
436    }
437
438    /// Get tier distribution by namespace
439    pub fn tier_distribution(&self, namespace: &NamespaceId) -> TierDistribution {
440        let access_map = self.access_info.read();
441        let mut hot = 0u64;
442        let mut warm = 0u64;
443        let mut cold = 0u64;
444
445        for ((ns, _), info) in access_map.iter() {
446            if ns == namespace {
447                match info.tier {
448                    StorageTier::Hot => hot += 1,
449                    StorageTier::Warm => warm += 1,
450                    StorageTier::Cold => cold += 1,
451                }
452            }
453        }
454
455        TierDistribution { hot, warm, cold }
456    }
457}
458
/// Result of one automatic tiering sweep ([`TieredStorage::run_auto_tiering`]).
///
/// The promotion fields exist for API symmetry; the current sweep only
/// demotes, so they are always zero.
#[derive(Debug, Clone, Default)]
pub struct TieringResult {
    /// Vectors demoted to warm tier
    pub demoted_to_warm: u64,
    /// Vectors demoted to cold tier
    pub demoted_to_cold: u64,
    /// Vectors promoted to hot tier (always 0 from the auto sweep)
    pub promoted_to_hot: u64,
    /// Vectors promoted to warm tier (always 0 from the auto sweep)
    pub promoted_to_warm: u64,
}

/// Per-tier vector counts for a single namespace
/// (see [`TieredStorage::tier_distribution`]).
#[derive(Debug, Clone)]
pub struct TierDistribution {
    pub hot: u64,
    pub warm: u64,
    pub cold: u64,
}

480#[async_trait]
481impl<H, W, C> VectorStorage for TieredStorage<H, W, C>
482where
483    H: VectorStorage,
484    W: VectorStorage,
485    C: VectorStorage + Clone + Send + Sync + 'static,
486{
    /// Insert/overwrite vectors: synchronous write to the hot tier plus an
    /// asynchronous best-effort flush to the cold tier.
    ///
    /// Returns the hot tier's upsert count. Cold-tier failures are logged, not
    /// surfaced — the hot tier is the primary durable read path. A cold-tier
    /// dimension mismatch triggers a destructive reset of the stale cold
    /// namespace followed by one retry.
    ///
    /// NOTE(review): `hot_count` grows by `count` on every call, so re-upserting
    /// an existing id inflates the counter — confirm whether that is intended.
    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
        // Track access info before moving vectors into storage
        let ids: Vec<VectorId> = vectors.iter().map(|v| v.id.clone()).collect();

        // Clone vectors for the background cold-tier flush before consuming them.
        let cold_vectors = vectors.clone();

        // Write hot tier first — lowest latency path, immediately available for reads.
        let count = self.hot_storage.upsert(namespace, vectors).await?;

        // Flush to cold tier (S3) in the background — failure is logged but not fatal.
        // The hot tier (RocksDB/in-memory) is the primary durable read path.
        let cold = self.cold_storage.clone();
        let cold_ns = namespace.clone();
        tokio::spawn(async move {
            if let Err(e) = cold.ensure_namespace(&cold_ns).await {
                tracing::error!(
                    error = %e,
                    namespace = %cold_ns,
                    "Cold tier namespace ensure failed (S3 flush aborted)"
                );
                return;
            }
            // Keep a copy in case the first upsert fails with a dimension
            // mismatch and we need to retry after resetting the namespace.
            let retry_vectors = cold_vectors.clone();
            match cold.upsert(&cold_ns, cold_vectors).await {
                Ok(_) => {}
                Err(DakeraError::DimensionMismatch { expected, actual }) => {
                    tracing::warn!(
                        namespace = %cold_ns,
                        cold_dim = expected,
                        hot_dim = actual,
                        "Cold tier dimension mismatch — resetting stale cold namespace"
                    );
                    if let Err(e) = cold.delete_namespace(&cold_ns).await {
                        tracing::error!(error = %e, namespace = %cold_ns,
                            "Failed to delete stale cold namespace");
                        return;
                    }
                    if let Err(e) = cold.ensure_namespace(&cold_ns).await {
                        tracing::error!(error = %e, namespace = %cold_ns,
                            "Failed to recreate cold namespace after dimension fix");
                        return;
                    }
                    if let Err(e) = cold.upsert(&cold_ns, retry_vectors).await {
                        tracing::error!(
                            error = %e,
                            namespace = %cold_ns,
                            "Cold tier S3 flush failed after dimension fix — data is durable in hot tier"
                        );
                    }
                }
                Err(e) => {
                    tracing::error!(
                        error = %e,
                        namespace = %cold_ns,
                        "Cold tier S3 flush failed — data is durable in hot tier"
                    );
                }
            }
        });

        // Every upserted id becomes hot and counts as one hot access
        // (the basic test asserts this accounting).
        for id in &ids {
            self.update_tier(namespace, id, StorageTier::Hot);
            self.record_access(namespace, id, StorageTier::Hot);
        }

        self.stats
            .hot_count
            .fetch_add(count as u64, Ordering::Relaxed);
        Ok(count)
    }

559    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
560        let mut results = Vec::with_capacity(ids.len());
561        let mut remaining_ids: Vec<VectorId> = ids.to_vec();
562
563        // Try hot tier first (NamespaceNotFound is normal after restart when hot cache is empty)
564        let hot_results = match self.hot_storage.get(namespace, &remaining_ids).await {
565            Ok(v) => v,
566            Err(DakeraError::NamespaceNotFound(_)) => vec![],
567            Err(e) => return Err(e),
568        };
569        for v in &hot_results {
570            self.record_access(namespace, &v.id, StorageTier::Hot);
571        }
572
573        // Remove found IDs
574        let found_ids: std::collections::HashSet<_> = hot_results.iter().map(|v| &v.id).collect();
575        remaining_ids.retain(|id| !found_ids.contains(id));
576        results.extend(hot_results);
577
578        if remaining_ids.is_empty() {
579            return Ok(results);
580        }
581
582        // Try warm tier (NamespaceNotFound is normal after restart when warm cache is empty)
583        let warm_results = match self.warm_storage.get(namespace, &remaining_ids).await {
584            Ok(v) => v,
585            Err(common::DakeraError::NamespaceNotFound(_)) => vec![],
586            Err(e) => return Err(e),
587        };
588        for v in &warm_results {
589            self.record_access(namespace, &v.id, StorageTier::Warm);
590        }
591
592        let found_ids: std::collections::HashSet<_> = warm_results.iter().map(|v| &v.id).collect();
593        remaining_ids.retain(|id| !found_ids.contains(id));
594        results.extend(warm_results);
595
596        if remaining_ids.is_empty() {
597            return Ok(results);
598        }
599
600        // Try cold tier
601        let cold_results = match self.cold_storage.get(namespace, &remaining_ids).await {
602            Ok(v) => v,
603            Err(DakeraError::NamespaceNotFound(_)) => vec![],
604            Err(e) => return Err(e),
605        };
606        for v in &cold_results {
607            self.record_access(namespace, &v.id, StorageTier::Cold);
608        }
609        results.extend(cold_results);
610
611        Ok(results)
612    }
613
614    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
615        let mut seen = std::collections::HashSet::new();
616        let mut results = Vec::new();
617
618        // Helper: treat NamespaceNotFound as empty — normal for hot/warm after restart.
619        let tier_get_all = |res: common::Result<Vec<Vector>>| -> common::Result<Vec<Vector>> {
620            match res {
621                Ok(v) => Ok(v),
622                Err(common::DakeraError::NamespaceNotFound(_)) => Ok(vec![]),
623                Err(e) => Err(e),
624            }
625        };
626
627        // Gather from all tiers, preferring hot over warm over cold.
628        // Deduplicate by vector ID since write-through means a vector
629        // can exist in both hot and cold simultaneously.
630        for v in tier_get_all(self.hot_storage.get_all(namespace).await)? {
631            if seen.insert(v.id.clone()) {
632                results.push(v);
633            }
634        }
635        for v in tier_get_all(self.warm_storage.get_all(namespace).await)? {
636            if seen.insert(v.id.clone()) {
637                results.push(v);
638            }
639        }
640        for v in tier_get_all(self.cold_storage.get_all(namespace).await)? {
641            if seen.insert(v.id.clone()) {
642                results.push(v);
643            }
644        }
645
646        Ok(results)
647    }
648
649    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
650        let mut deleted = 0;
651
652        // Delete from all tiers. Tolerate NamespaceNotFound from individual tiers
653        // since data may only reside in a subset of tiers (e.g. cold but not hot).
654        match self.hot_storage.delete(namespace, ids).await {
655            Ok(n) => deleted += n,
656            Err(DakeraError::NamespaceNotFound(_)) => {}
657            Err(e) => return Err(e),
658        }
659        match self.warm_storage.delete(namespace, ids).await {
660            Ok(n) => deleted += n,
661            Err(DakeraError::NamespaceNotFound(_)) => {}
662            Err(e) => return Err(e),
663        }
664        match self.cold_storage.delete(namespace, ids).await {
665            Ok(n) => deleted += n,
666            Err(DakeraError::NamespaceNotFound(_)) => {}
667            Err(e) => return Err(e),
668        }
669
670        // Remove from tracking
671        {
672            let mut access_map = self.access_info.write();
673            for id in ids {
674                access_map.remove(&(namespace.clone(), id.clone()));
675            }
676        }
677
678        Ok(deleted)
679    }
680
681    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
682        // Check any tier
683        Ok(self.hot_storage.namespace_exists(namespace).await?
684            || self.warm_storage.namespace_exists(namespace).await?
685            || self.cold_storage.namespace_exists(namespace).await?)
686    }
687
688    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
689        // Ensure in all tiers
690        self.hot_storage.ensure_namespace(namespace).await?;
691        self.warm_storage.ensure_namespace(namespace).await?;
692        self.cold_storage.ensure_namespace(namespace).await?;
693        Ok(())
694    }
695
    /// Total vectors in the namespace.
    ///
    /// With write-through, the cold tier is the source of truth for the total:
    /// hot/warm hold subsets of the same data, so summing tiers would
    /// double-count. When cold is empty (e.g. the background flush has not
    /// landed yet), fall back to hot + warm.
    ///
    /// NOTE(review): in the fallback, an id present in both hot and warm would
    /// be counted twice — with write-through this is not expected to occur,
    /// but confirm.
    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
        let cold = self.cold_storage.count(namespace).await?;
        if cold > 0 {
            return Ok(cold);
        }
        // Fallback: if cold is empty, count from hot + warm (non-tiered data)
        let hot = self.hot_storage.count(namespace).await?;
        let warm = self.warm_storage.count(namespace).await?;
        Ok(hot + warm)
    }

711    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
712        // Check hot first, then warm, then cold
713        if let Some(dim) = self.hot_storage.dimension(namespace).await? {
714            return Ok(Some(dim));
715        }
716        if let Some(dim) = self.warm_storage.dimension(namespace).await? {
717            return Ok(Some(dim));
718        }
719        self.cold_storage.dimension(namespace).await
720    }
721
722    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
723        let mut namespaces = std::collections::HashSet::new();
724
725        namespaces.extend(self.hot_storage.list_namespaces().await?);
726        namespaces.extend(self.warm_storage.list_namespaces().await?);
727        namespaces.extend(self.cold_storage.list_namespaces().await?);
728
729        Ok(namespaces.into_iter().collect())
730    }
731
732    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
733        // Delete from all tiers
734        let hot_deleted = self.hot_storage.delete_namespace(namespace).await?;
735        let warm_deleted = self.warm_storage.delete_namespace(namespace).await?;
736        let cold_deleted = self.cold_storage.delete_namespace(namespace).await?;
737
738        // Remove from access tracking
739        {
740            let mut access_map = self.access_info.write();
741            access_map.retain(|(ns, _), _| ns != namespace);
742        }
743
744        Ok(hot_deleted || warm_deleted || cold_deleted)
745    }
746
747    async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
748        // Cleanup from all tiers
749        let mut total = 0;
750        total += self.hot_storage.cleanup_expired(namespace).await?;
751        total += self.warm_storage.cleanup_expired(namespace).await?;
752        total += self.cold_storage.cleanup_expired(namespace).await?;
753        Ok(total)
754    }
755
756    async fn cleanup_all_expired(&self) -> Result<usize> {
757        // Cleanup from all tiers
758        let mut total = 0;
759        total += self.hot_storage.cleanup_all_expired().await?;
760        total += self.warm_storage.cleanup_all_expired().await?;
761        total += self.cold_storage.cleanup_all_expired().await?;
762        Ok(total)
763    }
764}
765
766#[cfg(test)]
767mod tests {
768    use super::*;
769    use crate::memory::InMemoryStorage;
770
771    fn create_test_vector(id: &str, dim: usize) -> Vector {
772        Vector {
773            id: id.to_string(),
774            values: vec![1.0; dim],
775            metadata: None,
776            ttl_seconds: None,
777            expires_at: None,
778        }
779    }
780
    /// Smoke test: upserts land in the hot tier and are immediately readable;
    /// both the upsert and the read count as hot hits.
    #[tokio::test]
    async fn test_tiered_storage_basic() {
        let config = TieredStorageConfig::default();
        let storage = TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );

        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Upsert goes to hot tier
        let vectors = vec![create_test_vector("v1", 4)];
        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 1);

        // Get should find in hot tier
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);

        let stats = storage.stats();
        assert_eq!(stats.hot_hits, 2); // One from upsert record_access, one from get
    }

    /// Walks a vector down hot → warm → cold via explicit `demote` calls,
    /// checking the tracked tier, continued readability, and the demotion
    /// counters after each step.
    #[tokio::test]
    async fn test_tiered_storage_promotion_demotion() {
        let config = TieredStorageConfig::default();
        let storage = TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );

        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Add to hot tier
        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        // Verify in hot
        assert_eq!(
            storage.get_tier(&namespace, &"v1".to_string()),
            Some(StorageTier::Hot)
        );

        // Demote to warm
        let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
        assert!(demoted);
        assert_eq!(
            storage.get_tier(&namespace, &"v1".to_string()),
            Some(StorageTier::Warm)
        );

        // Demote to cold
        let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
        assert!(demoted);
        assert_eq!(
            storage.get_tier(&namespace, &"v1".to_string()),
            Some(StorageTier::Cold)
        );

        // Still accessible (get falls through to the cold tier)
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);

        let stats = storage.stats();
        assert_eq!(stats.demotions_to_warm, 1);
        assert_eq!(stats.demotions_to_cold, 1);
    }

857    #[tokio::test]
858    async fn test_tiered_storage_multi_tier_get() {
859        let config = TieredStorageConfig::default();
860        let storage = TieredStorage::new(
861            config,
862            InMemoryStorage::new(),
863            InMemoryStorage::new(),
864            InMemoryStorage::new(),
865        );
866
867        let namespace = "test".to_string();
868        storage.ensure_namespace(&namespace).await.unwrap();
869
870        // Add vectors and demote some
871        for i in 0..3 {
872            storage
873                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
874                .await
875                .unwrap();
876        }
877
878        // v0 stays hot, v1 goes warm, v2 goes cold
879        storage.demote(&namespace, &"v1".to_string()).await.unwrap();
880        storage.demote(&namespace, &"v2".to_string()).await.unwrap();
881        storage.demote(&namespace, &"v2".to_string()).await.unwrap();
882
883        // Get all at once
884        let ids: Vec<_> = (0..3).map(|i| format!("v{}", i)).collect();
885        let results = storage.get(&namespace, &ids).await.unwrap();
886        assert_eq!(results.len(), 3);
887    }
888
889    #[tokio::test]
890    async fn test_tier_distribution() {
891        let config = TieredStorageConfig::default();
892        let storage = TieredStorage::new(
893            config,
894            InMemoryStorage::new(),
895            InMemoryStorage::new(),
896            InMemoryStorage::new(),
897        );
898
899        let namespace = "test".to_string();
900        storage.ensure_namespace(&namespace).await.unwrap();
901
902        // Add 5 vectors
903        for i in 0..5 {
904            storage
905                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
906                .await
907                .unwrap();
908        }
909
910        // Demote 2 to warm, 1 to cold
911        storage.demote(&namespace, &"v3".to_string()).await.unwrap();
912        storage.demote(&namespace, &"v4".to_string()).await.unwrap();
913        storage.demote(&namespace, &"v4".to_string()).await.unwrap();
914
915        let dist = storage.tier_distribution(&namespace);
916        assert_eq!(dist.hot, 3);
917        assert_eq!(dist.warm, 1);
918        assert_eq!(dist.cold, 1);
919    }
920
921    #[tokio::test]
922    async fn test_auto_tiering_purges_stale_namespace_entries() {
923        let config = TieredStorageConfig {
924            auto_tier_enabled: true,
925            hot_to_warm_threshold: Duration::from_millis(0),
926            warm_to_cold_threshold: Duration::from_millis(0),
927            ..Default::default()
928        };
929        let storage = TieredStorage::new(
930            config,
931            InMemoryStorage::new(),
932            InMemoryStorage::new(),
933            InMemoryStorage::new(),
934        );
935
936        let namespace = "deleted_bench_ns".to_string();
937        storage.ensure_namespace(&namespace).await.unwrap();
938        storage
939            .upsert(&namespace, vec![create_test_vector("v1", 4)])
940            .await
941            .unwrap();
942
943        // Delete the namespace (simulates bench cleanup)
944        storage.delete_namespace(&namespace).await.unwrap();
945
946        // Re-insert stale entries into access_info (simulates race / leftover)
947        {
948            let mut access_map = storage.access_info.write();
949            access_map.insert(
950                (namespace.clone(), "v1".to_string()),
951                AccessInfo {
952                    last_access: Instant::now() - Duration::from_secs(7200),
953                    access_count: 1,
954                    tier: StorageTier::Hot,
955                },
956            );
957        }
958
959        // Auto-tiering should succeed (not bail) and purge the stale entry
960        let result = storage.run_auto_tiering().await.unwrap();
961        assert_eq!(result.demoted_to_warm, 0);
962
963        let access_map = storage.access_info.read();
964        assert!(
965            access_map.is_empty(),
966            "Stale entries should have been purged from tiering queue"
967        );
968    }
969
970    #[test]
971    fn test_is_permanent_storage_error() {
972        assert!(TieredStorage::<
973            InMemoryStorage,
974            InMemoryStorage,
975            InMemoryStorage,
976        >::is_permanent_storage_error(
977            &DakeraError::NamespaceNotFound("test".into())
978        ));
979        assert!(TieredStorage::<
980            InMemoryStorage,
981            InMemoryStorage,
982            InMemoryStorage,
983        >::is_permanent_storage_error(&DakeraError::Storage(
984            "PermissionDenied (permanent) at write".into()
985        )));
986        assert!(!TieredStorage::<
987            InMemoryStorage,
988            InMemoryStorage,
989            InMemoryStorage,
990        >::is_permanent_storage_error(
991            &DakeraError::Storage("connection timeout".into())
992        ));
993        assert!(!TieredStorage::<
994            InMemoryStorage,
995            InMemoryStorage,
996            InMemoryStorage,
997        >::is_permanent_storage_error(
998            &DakeraError::DimensionMismatch {
999                expected: 384,
1000                actual: 1024,
1001            }
1002        ));
1003    }
1004}