use async_trait::async_trait;
use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use crate::traits::VectorStorage;
/// Tuning knobs for [`TieredStorage`].
#[derive(Clone, Debug)]
pub struct TieredStorageConfig {
    /// Presumably the number of vectors the hot tier should hold.
    /// NOTE(review): not read by `TieredStorage` in this file — confirm where it is enforced.
    pub hot_tier_capacity: usize,
    /// Hot vectors idle for longer than this are demoted to warm by `run_auto_tiering`.
    pub hot_to_warm_threshold: Duration,
    /// Warm vectors idle for longer than this are demoted to cold by `run_auto_tiering`.
    pub warm_to_cold_threshold: Duration,
    /// When `false`, `run_auto_tiering` returns immediately without doing anything.
    pub auto_tier_enabled: bool,
    /// Intended interval between tiering passes.
    /// NOTE(review): not read in this file; presumably consumed by an external scheduler.
    pub tier_check_interval: Duration,
}
impl Default for TieredStorageConfig {
fn default() -> Self {
Self {
hot_tier_capacity: 100_000,
hot_to_warm_threshold: Duration::from_secs(3600), warm_to_cold_threshold: Duration::from_secs(86400), auto_tier_enabled: true,
tier_check_interval: Duration::from_secs(300), }
}
}
/// The storage tier a vector is recorded in.
///
/// `TieredStorage::demote` moves vectors `Hot` → `Warm` → `Cold`, and
/// `TieredStorage::promote` moves them back toward `Hot`. Variant order is kept
/// stable because it may matter to serialized forms.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageTier {
    /// Tier that receives every upsert.
    Hot,
    /// Tier that hot vectors are demoted into.
    Warm,
    /// Tier that warm vectors are demoted into.
    Cold,
}
impl StorageTier {
pub fn as_str(&self) -> &'static str {
match self {
StorageTier::Hot => "hot",
StorageTier::Warm => "warm",
StorageTier::Cold => "cold",
}
}
}
/// Per-vector access-tracking record kept in memory by `TieredStorage`.
#[derive(Clone, Debug)]
struct AccessInfo {
    /// Time of the most recent recorded access, or of record creation.
    last_access: Instant,
    /// Number of accesses recorded for this vector.
    access_count: u64,
    /// Tier the vector is currently recorded as living in.
    tier: StorageTier,
}
impl Default for AccessInfo {
fn default() -> Self {
Self {
last_access: Instant::now(),
access_count: 0,
tier: StorageTier::Hot,
}
}
}
/// Point-in-time snapshot of the counters maintained by `TieredStorage`.
///
/// The `*_count` fields are best-effort tallies of vectors attributed to each tier.
/// They are maintained with relaxed atomics and are not reconciled against the backends.
/// The `*_hits` fields count accesses recorded against each tier.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly (e.g. in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TieredStorageStats {
    /// Vectors attributed to the hot tier.
    pub hot_count: u64,
    /// Vectors attributed to the warm tier.
    pub warm_count: u64,
    /// Vectors attributed to the cold tier.
    pub cold_count: u64,
    /// Accesses recorded against the hot tier.
    pub hot_hits: u64,
    /// Accesses recorded against the warm tier.
    pub warm_hits: u64,
    /// Accesses recorded against the cold tier.
    pub cold_hits: u64,
    /// Successful promotions into the hot tier.
    pub promotions_to_hot: u64,
    /// Successful demotions from hot to warm.
    pub demotions_to_warm: u64,
    /// Successful demotions from warm to cold.
    pub demotions_to_cold: u64,
}
/// Vector storage spread across hot, warm and cold backends.
///
/// Access recency and frequency are tracked in memory per `(namespace, id)` and
/// drive moves between tiers.
pub struct TieredStorage<H, W, C> {
    /// Tiering configuration.
    config: TieredStorageConfig,
    /// Backend that receives every upsert.
    hot_storage: H,
    /// Backend that hot vectors are demoted into.
    warm_storage: W,
    /// Backend that warm vectors are demoted into; upserts are also copied here in the background.
    cold_storage: C,
    /// Access tracking keyed by `(namespace, vector id)`, held only in memory.
    access_info: RwLock<HashMap<(NamespaceId, VectorId), AccessInfo>>,
    /// Running counters exposed through `stats()`.
    stats: TieredStorageStatsInner,
}
/// Lock-free counters backing `TieredStorageStats`; each field mirrors the
/// same-named field of the public snapshot.
struct TieredStorageStatsInner {
    // Vectors attributed to each tier.
    hot_count: AtomicU64,
    warm_count: AtomicU64,
    cold_count: AtomicU64,
    // Accesses recorded per tier.
    hot_hits: AtomicU64,
    warm_hits: AtomicU64,
    cold_hits: AtomicU64,
    // Tier movements.
    promotions_to_hot: AtomicU64,
    demotions_to_warm: AtomicU64,
    demotions_to_cold: AtomicU64,
}
impl Default for TieredStorageStatsInner {
fn default() -> Self {
Self {
hot_count: AtomicU64::new(0),
warm_count: AtomicU64::new(0),
cold_count: AtomicU64::new(0),
hot_hits: AtomicU64::new(0),
warm_hits: AtomicU64::new(0),
cold_hits: AtomicU64::new(0),
promotions_to_hot: AtomicU64::new(0),
demotions_to_warm: AtomicU64::new(0),
demotions_to_cold: AtomicU64::new(0),
}
}
}
impl<H, W, C> TieredStorage<H, W, C>
where
    H: VectorStorage,
    W: VectorStorage,
    C: VectorStorage,
{
    /// A cold vector whose recorded access count exceeds this value is promoted
    /// straight to the hot tier by [`Self::promote`]; otherwise it goes to warm.
    const COLD_TO_HOT_ACCESS_THRESHOLD: u64 = 10;

    /// Creates a tiered store over the given hot, warm and cold backends.
    ///
    /// Access tracking starts empty and every statistics counter starts at zero.
    pub fn new(
        config: TieredStorageConfig,
        hot_storage: H,
        warm_storage: W,
        cold_storage: C,
    ) -> Self {
        Self {
            config,
            hot_storage,
            warm_storage,
            cold_storage,
            access_info: RwLock::new(HashMap::new()),
            stats: TieredStorageStatsInner::default(),
        }
    }

    /// Returns the configuration this store was built with.
    pub fn config(&self) -> &TieredStorageConfig {
        &self.config
    }

    /// Decrements a tier counter by one without wrapping below zero.
    ///
    /// A tier can be recorded for a vector that was never counted (e.g. `get`
    /// first observing a vector that already lives in warm or cold storage). In that
    /// case a plain `fetch_sub` would wrap the counter to `u64::MAX`.
    fn decrement_counter(counter: &AtomicU64) {
        // `checked_sub` yields `None` at zero, which leaves the counter unchanged.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));
    }

    /// Records one access to `(namespace, id)` found in `tier`.
    ///
    /// Refreshes `last_access`, increments `access_count`, sets the recorded tier
    /// (creating the tracking entry if needed) and bumps the matching `*_hits` counter.
    fn record_access(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
        let key = (namespace.clone(), id.clone());
        let mut access_map = self.access_info.write();
        let info = access_map.entry(key).or_default();
        info.last_access = Instant::now();
        info.access_count += 1;
        info.tier = tier;
        match tier {
            StorageTier::Hot => self.stats.hot_hits.fetch_add(1, Ordering::Relaxed),
            StorageTier::Warm => self.stats.warm_hits.fetch_add(1, Ordering::Relaxed),
            StorageTier::Cold => self.stats.cold_hits.fetch_add(1, Ordering::Relaxed),
        };
    }

    /// Returns the tier recorded for the vector, or `None` if it is not tracked.
    fn get_tier(&self, namespace: &NamespaceId, id: &VectorId) -> Option<StorageTier> {
        let access_map = self.access_info.read();
        access_map
            .get(&(namespace.clone(), id.clone()))
            .map(|info| info.tier)
    }

    /// Moves a vector one step toward the hot tier.
    ///
    /// - Warm → hot: the warm copy is deleted.
    /// - Cold → hot when its access count exceeds
    ///   [`Self::COLD_TO_HOT_ACCESS_THRESHOLD`], otherwise cold → warm. The cold copy
    ///   is not deleted, but the vector stops being counted in `cold_count`.
    ///
    /// Returns `Ok(true)` if the vector moved. Returns `Ok(false)` if it is already
    /// hot, untracked, or missing from its recorded tier.
    ///
    /// # Errors
    /// Propagates backend errors. A failure after the destination write (e.g. while
    /// deleting the warm source) leaves the vector in both tiers, and tracking and
    /// counters are not updated.
    pub async fn promote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let ids = std::slice::from_ref(id);
        match self.get_tier(namespace, id) {
            Some(StorageTier::Warm) => {
                let vectors = self.warm_storage.get(namespace, ids).await?;
                if vectors.is_empty() {
                    return Ok(false);
                }
                self.hot_storage.upsert(namespace, vectors).await?;
                self.warm_storage.delete(namespace, ids).await?;
                self.update_tier(namespace, id, StorageTier::Hot);
                self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                Self::decrement_counter(&self.stats.warm_count);
                Ok(true)
            }
            Some(StorageTier::Cold) => {
                let vectors = self.cold_storage.get(namespace, ids).await?;
                if vectors.is_empty() {
                    return Ok(false);
                }
                // Scoped so the read guard is released before the next await.
                let frequently_accessed = {
                    let access_map = self.access_info.read();
                    access_map
                        .get(&(namespace.clone(), id.clone()))
                        .map(|info| info.access_count > Self::COLD_TO_HOT_ACCESS_THRESHOLD)
                        .unwrap_or(false)
                };
                if frequently_accessed {
                    self.hot_storage.upsert(namespace, vectors).await?;
                    self.update_tier(namespace, id, StorageTier::Hot);
                    self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                    self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                } else {
                    self.warm_storage.upsert(namespace, vectors).await?;
                    self.update_tier(namespace, id, StorageTier::Warm);
                    self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
                }
                // The vector now logically belongs to the destination tier, so stop
                // counting it as cold (mirrors the warm branch above).
                Self::decrement_counter(&self.stats.cold_count);
                Ok(true)
            }
            Some(StorageTier::Hot) | None => Ok(false),
        }
    }

    /// Moves a vector one step toward the cold tier (hot → warm, or warm → cold).
    ///
    /// The source copy is deleted after the destination write.
    ///
    /// Returns `Ok(true)` if the vector moved. Returns `Ok(false)` if it is already
    /// cold, untracked, or missing from its recorded tier.
    ///
    /// # Errors
    /// Propagates backend errors. A failure while deleting the source leaves the
    /// vector in both tiers, and tracking and counters are not updated.
    pub async fn demote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let ids = std::slice::from_ref(id);
        match self.get_tier(namespace, id) {
            Some(StorageTier::Hot) => {
                let vectors = self.hot_storage.get(namespace, ids).await?;
                if vectors.is_empty() {
                    return Ok(false);
                }
                self.warm_storage.upsert(namespace, vectors).await?;
                self.hot_storage.delete(namespace, ids).await?;
                self.update_tier(namespace, id, StorageTier::Warm);
                self.stats.demotions_to_warm.fetch_add(1, Ordering::Relaxed);
                Self::decrement_counter(&self.stats.hot_count);
                self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
                Ok(true)
            }
            Some(StorageTier::Warm) => {
                let vectors = self.warm_storage.get(namespace, ids).await?;
                if vectors.is_empty() {
                    return Ok(false);
                }
                self.cold_storage.upsert(namespace, vectors).await?;
                self.warm_storage.delete(namespace, ids).await?;
                self.update_tier(namespace, id, StorageTier::Cold);
                self.stats.demotions_to_cold.fetch_add(1, Ordering::Relaxed);
                Self::decrement_counter(&self.stats.warm_count);
                self.stats.cold_count.fetch_add(1, Ordering::Relaxed);
                Ok(true)
            }
            Some(StorageTier::Cold) | None => Ok(false),
        }
    }

    /// Sets the recorded tier without counting an access.
    ///
    /// A missing entry is created with default values (`last_access` = now, zero accesses).
    fn update_tier(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
        let mut access_map = self.access_info.write();
        let key = (namespace.clone(), id.clone());
        let info = access_map.entry(key).or_default();
        info.tier = tier;
    }

    /// Runs one auto-tiering pass. Does nothing when `auto_tier_enabled` is false.
    ///
    /// Tracked hot vectors idle longer than `hot_to_warm_threshold` are demoted to
    /// warm. Tracked warm vectors idle longer than `warm_to_cold_threshold` are
    /// demoted to cold. Candidates are snapshotted first, so each vector moves at
    /// most one step per pass. The `promoted_*` fields of the result are always 0.
    ///
    /// # Errors
    /// Stops at, and returns, the first backend error. Demotions already performed
    /// in this pass are kept.
    pub async fn run_auto_tiering(&self) -> Result<TieringResult> {
        if !self.config.auto_tier_enabled {
            return Ok(TieringResult::default());
        }
        let (to_demote_to_warm, to_demote_to_cold) = {
            let access_map = self.access_info.read();
            // Captured under the read lock so no `last_access` is updated after `now`.
            // The saturating difference additionally guards against clock quirks.
            let now = Instant::now();
            let mut to_warm = Vec::new();
            let mut to_cold = Vec::new();
            for ((namespace, id), info) in access_map.iter() {
                let idle = now.saturating_duration_since(info.last_access);
                match info.tier {
                    StorageTier::Hot if idle > self.config.hot_to_warm_threshold => {
                        to_warm.push((namespace.clone(), id.clone()));
                    }
                    StorageTier::Warm if idle > self.config.warm_to_cold_threshold => {
                        to_cold.push((namespace.clone(), id.clone()));
                    }
                    _ => {}
                }
            }
            (to_warm, to_cold)
        };
        let mut demoted_to_warm = 0u64;
        let mut demoted_to_cold = 0u64;
        for (namespace, id) in to_demote_to_warm {
            if self.demote(&namespace, &id).await? {
                demoted_to_warm += 1;
            }
        }
        for (namespace, id) in to_demote_to_cold {
            if self.demote(&namespace, &id).await? {
                demoted_to_cold += 1;
            }
        }
        Ok(TieringResult {
            demoted_to_warm,
            demoted_to_cold,
            promoted_to_hot: 0,
            promoted_to_warm: 0,
        })
    }

    /// Returns a snapshot of the counters.
    ///
    /// Each counter is loaded independently with relaxed ordering, so the snapshot
    /// as a whole is not atomic.
    pub fn stats(&self) -> TieredStorageStats {
        TieredStorageStats {
            hot_count: self.stats.hot_count.load(Ordering::Relaxed),
            warm_count: self.stats.warm_count.load(Ordering::Relaxed),
            cold_count: self.stats.cold_count.load(Ordering::Relaxed),
            hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
            warm_hits: self.stats.warm_hits.load(Ordering::Relaxed),
            cold_hits: self.stats.cold_hits.load(Ordering::Relaxed),
            promotions_to_hot: self.stats.promotions_to_hot.load(Ordering::Relaxed),
            demotions_to_warm: self.stats.demotions_to_warm.load(Ordering::Relaxed),
            demotions_to_cold: self.stats.demotions_to_cold.load(Ordering::Relaxed),
        }
    }

    /// Counts the tracked vectors of `namespace` by their recorded tier.
    ///
    /// Only vectors present in access tracking are counted. Vectors that exist in a
    /// backend but were never tracked are not included.
    pub fn tier_distribution(&self, namespace: &NamespaceId) -> TierDistribution {
        let access_map = self.access_info.read();
        let mut hot = 0u64;
        let mut warm = 0u64;
        let mut cold = 0u64;
        for ((ns, _), info) in access_map.iter() {
            if ns == namespace {
                match info.tier {
                    StorageTier::Hot => hot += 1,
                    StorageTier::Warm => warm += 1,
                    StorageTier::Cold => cold += 1,
                }
            }
        }
        TierDistribution { hot, warm, cold }
    }
}
/// Outcome of one `TieredStorage::run_auto_tiering` pass.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TieringResult {
    /// Vectors moved from hot to warm.
    pub demoted_to_warm: u64,
    /// Vectors moved from warm to cold.
    pub demoted_to_cold: u64,
    /// Vectors promoted into hot (currently always 0 from `run_auto_tiering`).
    pub promoted_to_hot: u64,
    /// Vectors promoted into warm (currently always 0 from `run_auto_tiering`).
    pub promoted_to_warm: u64,
}
/// Per-namespace count of tracked vectors in each tier, as recorded by access tracking.
///
/// Derives `Default` (all zero) and `PartialEq`/`Eq` so distributions can be built
/// and compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TierDistribution {
    /// Tracked vectors recorded in the hot tier.
    pub hot: u64,
    /// Tracked vectors recorded in the warm tier.
    pub warm: u64,
    /// Tracked vectors recorded in the cold tier.
    pub cold: u64,
}
#[async_trait]
impl<H, W, C> VectorStorage for TieredStorage<H, W, C>
where
H: VectorStorage,
W: VectorStorage,
C: VectorStorage + Clone + Send + Sync + 'static,
{
async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
let ids: Vec<VectorId> = vectors.iter().map(|v| v.id.clone()).collect();
let cold_vectors = vectors.clone();
let count = self.hot_storage.upsert(namespace, vectors).await?;
let cold = self.cold_storage.clone();
let cold_ns = namespace.clone();
tokio::spawn(async move {
if let Err(e) = cold.ensure_namespace(&cold_ns).await {
tracing::error!(
error = %e,
namespace = %cold_ns,
"Cold tier namespace ensure failed (S3 flush aborted)"
);
return;
}
if let Err(e) = cold.upsert(&cold_ns, cold_vectors).await {
tracing::error!(
error = %e,
namespace = %cold_ns,
"Cold tier S3 flush failed — data is durable in hot tier"
);
}
});
for id in &ids {
self.update_tier(namespace, id, StorageTier::Hot);
self.record_access(namespace, id, StorageTier::Hot);
}
self.stats
.hot_count
.fetch_add(count as u64, Ordering::Relaxed);
Ok(count)
}
async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
let mut results = Vec::with_capacity(ids.len());
let mut remaining_ids: Vec<VectorId> = ids.to_vec();
let hot_results = match self.hot_storage.get(namespace, &remaining_ids).await {
Ok(v) => v,
Err(DakeraError::NamespaceNotFound(_)) => vec![],
Err(e) => return Err(e),
};
for v in &hot_results {
self.record_access(namespace, &v.id, StorageTier::Hot);
}
let found_ids: std::collections::HashSet<_> = hot_results.iter().map(|v| &v.id).collect();
remaining_ids.retain(|id| !found_ids.contains(id));
results.extend(hot_results);
if remaining_ids.is_empty() {
return Ok(results);
}
let warm_results = match self.warm_storage.get(namespace, &remaining_ids).await {
Ok(v) => v,
Err(common::DakeraError::NamespaceNotFound(_)) => vec![],
Err(e) => return Err(e),
};
for v in &warm_results {
self.record_access(namespace, &v.id, StorageTier::Warm);
}
let found_ids: std::collections::HashSet<_> = warm_results.iter().map(|v| &v.id).collect();
remaining_ids.retain(|id| !found_ids.contains(id));
results.extend(warm_results);
if remaining_ids.is_empty() {
return Ok(results);
}
let cold_results = match self.cold_storage.get(namespace, &remaining_ids).await {
Ok(v) => v,
Err(DakeraError::NamespaceNotFound(_)) => vec![],
Err(e) => return Err(e),
};
for v in &cold_results {
self.record_access(namespace, &v.id, StorageTier::Cold);
}
results.extend(cold_results);
Ok(results)
}
async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
let mut seen = std::collections::HashSet::new();
let mut results = Vec::new();
let tier_get_all = |res: common::Result<Vec<Vector>>| -> common::Result<Vec<Vector>> {
match res {
Ok(v) => Ok(v),
Err(common::DakeraError::NamespaceNotFound(_)) => Ok(vec![]),
Err(e) => Err(e),
}
};
for v in tier_get_all(self.hot_storage.get_all(namespace).await)? {
if seen.insert(v.id.clone()) {
results.push(v);
}
}
for v in tier_get_all(self.warm_storage.get_all(namespace).await)? {
if seen.insert(v.id.clone()) {
results.push(v);
}
}
for v in tier_get_all(self.cold_storage.get_all(namespace).await)? {
if seen.insert(v.id.clone()) {
results.push(v);
}
}
Ok(results)
}
async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
let mut deleted = 0;
match self.hot_storage.delete(namespace, ids).await {
Ok(n) => deleted += n,
Err(DakeraError::NamespaceNotFound(_)) => {}
Err(e) => return Err(e),
}
match self.warm_storage.delete(namespace, ids).await {
Ok(n) => deleted += n,
Err(DakeraError::NamespaceNotFound(_)) => {}
Err(e) => return Err(e),
}
match self.cold_storage.delete(namespace, ids).await {
Ok(n) => deleted += n,
Err(DakeraError::NamespaceNotFound(_)) => {}
Err(e) => return Err(e),
}
{
let mut access_map = self.access_info.write();
for id in ids {
access_map.remove(&(namespace.clone(), id.clone()));
}
}
Ok(deleted)
}
async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
Ok(self.hot_storage.namespace_exists(namespace).await?
|| self.warm_storage.namespace_exists(namespace).await?
|| self.cold_storage.namespace_exists(namespace).await?)
}
async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
self.hot_storage.ensure_namespace(namespace).await?;
self.warm_storage.ensure_namespace(namespace).await?;
self.cold_storage.ensure_namespace(namespace).await?;
Ok(())
}
async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
let cold = self.cold_storage.count(namespace).await?;
if cold > 0 {
return Ok(cold);
}
let hot = self.hot_storage.count(namespace).await?;
let warm = self.warm_storage.count(namespace).await?;
Ok(hot + warm)
}
async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
if let Some(dim) = self.hot_storage.dimension(namespace).await? {
return Ok(Some(dim));
}
if let Some(dim) = self.warm_storage.dimension(namespace).await? {
return Ok(Some(dim));
}
self.cold_storage.dimension(namespace).await
}
async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
let mut namespaces = std::collections::HashSet::new();
namespaces.extend(self.hot_storage.list_namespaces().await?);
namespaces.extend(self.warm_storage.list_namespaces().await?);
namespaces.extend(self.cold_storage.list_namespaces().await?);
Ok(namespaces.into_iter().collect())
}
async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
let hot_deleted = self.hot_storage.delete_namespace(namespace).await?;
let warm_deleted = self.warm_storage.delete_namespace(namespace).await?;
let cold_deleted = self.cold_storage.delete_namespace(namespace).await?;
{
let mut access_map = self.access_info.write();
access_map.retain(|(ns, _), _| ns != namespace);
}
Ok(hot_deleted || warm_deleted || cold_deleted)
}
async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
let mut total = 0;
total += self.hot_storage.cleanup_expired(namespace).await?;
total += self.warm_storage.cleanup_expired(namespace).await?;
total += self.cold_storage.cleanup_expired(namespace).await?;
Ok(total)
}
async fn cleanup_all_expired(&self) -> Result<usize> {
let mut total = 0;
total += self.hot_storage.cleanup_all_expired().await?;
total += self.warm_storage.cleanup_all_expired().await?;
total += self.cold_storage.cleanup_all_expired().await?;
Ok(total)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;

    /// Builds a vector of `dim` ones with no metadata or TTL.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// Builds a tiered store over three independent in-memory backends.
    fn new_storage(
        config: TieredStorageConfig,
    ) -> TieredStorage<InMemoryStorage, InMemoryStorage, InMemoryStorage> {
        TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        )
    }

    #[tokio::test]
    async fn test_tiered_storage_basic() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        let vectors = vec![create_test_vector("v1", 4)];
        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 1);
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        let stats = storage.stats();
        // One hit from the upsert and one from the get.
        assert_eq!(stats.hot_hits, 2);
    }

    #[tokio::test]
    async fn test_tiered_storage_promotion_demotion() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        let id = "v1".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        assert_eq!(storage.get_tier(&namespace, &id), Some(StorageTier::Hot));
        assert!(storage.demote(&namespace, &id).await.unwrap());
        assert_eq!(storage.get_tier(&namespace, &id), Some(StorageTier::Warm));
        assert!(storage.demote(&namespace, &id).await.unwrap());
        assert_eq!(storage.get_tier(&namespace, &id), Some(StorageTier::Cold));
        let results = storage.get(&namespace, &[id.clone()]).await.unwrap();
        assert_eq!(results.len(), 1);
        let stats = storage.stats();
        assert_eq!(stats.demotions_to_warm, 1);
        assert_eq!(stats.demotions_to_cold, 1);
    }

    #[tokio::test]
    async fn test_tiered_storage_multi_tier_get() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        for i in 0..3 {
            storage
                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
                .await
                .unwrap();
        }
        storage.demote(&namespace, &"v1".to_string()).await.unwrap();
        storage.demote(&namespace, &"v2".to_string()).await.unwrap();
        storage.demote(&namespace, &"v2".to_string()).await.unwrap();
        let ids: Vec<_> = (0..3).map(|i| format!("v{}", i)).collect();
        let results = storage.get(&namespace, &ids).await.unwrap();
        assert_eq!(results.len(), 3);
    }

    #[tokio::test]
    async fn test_tier_distribution() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        for i in 0..5 {
            storage
                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
                .await
                .unwrap();
        }
        storage.demote(&namespace, &"v3".to_string()).await.unwrap();
        storage.demote(&namespace, &"v4".to_string()).await.unwrap();
        storage.demote(&namespace, &"v4".to_string()).await.unwrap();
        let dist = storage.tier_distribution(&namespace);
        assert_eq!(dist.hot, 3);
        assert_eq!(dist.warm, 1);
        assert_eq!(dist.cold, 1);
    }

    #[tokio::test]
    async fn test_promote_from_warm_returns_vector_to_hot() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        let id = "v1".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        assert!(storage.demote(&namespace, &id).await.unwrap());
        assert!(storage.promote(&namespace, &id).await.unwrap());
        assert_eq!(storage.get_tier(&namespace, &id), Some(StorageTier::Hot));
        // Already hot: nothing further to promote.
        assert!(!storage.promote(&namespace, &id).await.unwrap());
        let stats = storage.stats();
        assert_eq!(stats.promotions_to_hot, 1);
        assert_eq!(stats.hot_count, 1);
        assert_eq!(stats.warm_count, 0);
    }

    #[tokio::test]
    async fn test_promote_from_cold_goes_to_warm_when_rarely_accessed() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        let id = "v1".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        assert!(storage.demote(&namespace, &id).await.unwrap());
        assert!(storage.demote(&namespace, &id).await.unwrap());
        assert!(storage.promote(&namespace, &id).await.unwrap());
        assert_eq!(storage.get_tier(&namespace, &id), Some(StorageTier::Warm));
    }

    #[tokio::test]
    async fn test_delete_forgets_tier_tracking() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        storage
            .upsert(
                &namespace,
                vec![create_test_vector("v1", 4), create_test_vector("v2", 4)],
            )
            .await
            .unwrap();
        storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(storage.get_tier(&namespace, &"v1".to_string()), None);
        assert_eq!(
            storage.get_tier(&namespace, &"v2".to_string()),
            Some(StorageTier::Hot)
        );
        assert_eq!(storage.tier_distribution(&namespace).hot, 1);
    }

    #[tokio::test]
    async fn test_auto_tiering_disabled_is_noop() {
        let config = TieredStorageConfig {
            auto_tier_enabled: false,
            ..TieredStorageConfig::default()
        };
        let storage = new_storage(config);
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        let result = storage.run_auto_tiering().await.unwrap();
        assert_eq!(result.demoted_to_warm, 0);
        assert_eq!(result.demoted_to_cold, 0);
        assert_eq!(
            storage.get_tier(&namespace, &"v1".to_string()),
            Some(StorageTier::Hot)
        );
    }

    #[tokio::test]
    async fn test_auto_tiering_keeps_recently_accessed_vectors() {
        let storage = new_storage(TieredStorageConfig::default());
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        // Default thresholds are hours long, so a just-written vector stays hot.
        let result = storage.run_auto_tiering().await.unwrap();
        assert_eq!(result.demoted_to_warm, 0);
        assert_eq!(result.demoted_to_cold, 0);
        assert_eq!(storage.tier_distribution(&namespace).hot, 1);
    }

    #[tokio::test]
    async fn test_tier_distribution_is_per_namespace() {
        let storage = new_storage(TieredStorageConfig::default());
        let ns_a = "a".to_string();
        let ns_b = "b".to_string();
        storage.ensure_namespace(&ns_a).await.unwrap();
        storage.ensure_namespace(&ns_b).await.unwrap();
        storage
            .upsert(
                &ns_a,
                vec![create_test_vector("v1", 4), create_test_vector("v2", 4)],
            )
            .await
            .unwrap();
        storage
            .upsert(&ns_b, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        assert_eq!(storage.tier_distribution(&ns_a).hot, 2);
        assert_eq!(storage.tier_distribution(&ns_b).hot, 1);
    }
}