1use async_trait::async_trait;
9use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use crate::traits::VectorStorage;
17
/// Policy configuration for [`TieredStorage`]: capacity hint, idle-time
/// demotion thresholds, and the auto-tiering switch.
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Intended maximum number of vectors in the hot tier.
    // NOTE(review): not read anywhere in this file — presumably enforced by
    // the hot-tier backend or a caller; confirm.
    pub hot_tier_capacity: usize,
    /// Idle time after which a hot entry becomes eligible for demotion to warm.
    pub hot_to_warm_threshold: Duration,
    /// Idle time after which a warm entry becomes eligible for demotion to cold.
    pub warm_to_cold_threshold: Duration,
    /// When `false`, `run_auto_tiering` returns immediately without demoting.
    pub auto_tier_enabled: bool,
    /// Intended interval between auto-tiering passes.
    // NOTE(review): not read in this file — presumably used by an external
    // scheduler that calls `run_auto_tiering`; confirm.
    pub tier_check_interval: Duration,
}
32
33impl Default for TieredStorageConfig {
34 fn default() -> Self {
35 Self {
36 hot_tier_capacity: 100_000,
37 hot_to_warm_threshold: Duration::from_secs(3600), warm_to_cold_threshold: Duration::from_secs(86400), auto_tier_enabled: true,
40 tier_check_interval: Duration::from_secs(300), }
42 }
43}
44
/// The tier a vector currently resides in, ordered hottest to coldest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StorageTier {
    /// Fastest tier; all new upserts land here first.
    Hot,
    /// Intermediate tier for vectors demoted out of hot.
    Warm,
    /// Slowest, durable tier (log messages in this file refer to it as
    /// S3-backed).
    Cold,
}
55
56impl StorageTier {
57 pub fn as_str(&self) -> &'static str {
58 match self {
59 StorageTier::Hot => "hot",
60 StorageTier::Warm => "warm",
61 StorageTier::Cold => "cold",
62 }
63 }
64}
65
/// Per-vector access bookkeeping used to drive promotion/demotion decisions.
#[derive(Debug, Clone)]
struct AccessInfo {
    /// When this vector was last read or written.
    last_access: Instant,
    /// Total number of recorded accesses.
    access_count: u64,
    /// Tier the vector is currently believed to reside in.
    tier: StorageTier,
}
76
77impl Default for AccessInfo {
78 fn default() -> Self {
79 Self {
80 last_access: Instant::now(),
81 access_count: 0,
82 tier: StorageTier::Hot,
83 }
84 }
85}
86
/// Point-in-time snapshot of the tiering counters, produced by
/// [`TieredStorage::stats`].
#[derive(Debug, Clone, Default)]
pub struct TieredStorageStats {
    /// Vectors counted into the hot tier.
    pub hot_count: u64,
    /// Vectors counted into the warm tier.
    pub warm_count: u64,
    /// Vectors counted into the cold tier.
    pub cold_count: u64,
    /// Accesses served from the hot tier.
    pub hot_hits: u64,
    /// Accesses served from the warm tier.
    pub warm_hits: u64,
    /// Accesses served from the cold tier.
    pub cold_hits: u64,
    /// Promotions that landed a vector in the hot tier.
    pub promotions_to_hot: u64,
    /// Demotions from hot to warm.
    pub demotions_to_warm: u64,
    /// Demotions from warm to cold.
    pub demotions_to_cold: u64,
}
109
/// Three-tier vector store that fronts hot (`H`), warm (`W`) and cold (`C`)
/// backends and migrates vectors between them based on access recency.
pub struct TieredStorage<H, W, C> {
    /// Tiering policy and thresholds.
    config: TieredStorageConfig,
    /// Fast tier; all writes land here first.
    hot_storage: H,
    /// Intermediate tier for demoted vectors.
    warm_storage: W,
    /// Durable tier; `upsert` also flushes an async copy here.
    cold_storage: C,
    /// Last-access time, hit count and current tier per (namespace, vector).
    access_info: RwLock<HashMap<(NamespaceId, VectorId), AccessInfo>>,
    /// Lock-free counters backing [`TieredStorage::stats`].
    stats: TieredStorageStatsInner,
}
125
/// Atomic backing store for [`TieredStorageStats`]; updated with relaxed
/// ordering since the counters are independent and purely informational.
struct TieredStorageStatsInner {
    hot_count: AtomicU64,
    warm_count: AtomicU64,
    cold_count: AtomicU64,
    hot_hits: AtomicU64,
    warm_hits: AtomicU64,
    cold_hits: AtomicU64,
    promotions_to_hot: AtomicU64,
    demotions_to_warm: AtomicU64,
    demotions_to_cold: AtomicU64,
}
137
138impl Default for TieredStorageStatsInner {
139 fn default() -> Self {
140 Self {
141 hot_count: AtomicU64::new(0),
142 warm_count: AtomicU64::new(0),
143 cold_count: AtomicU64::new(0),
144 hot_hits: AtomicU64::new(0),
145 warm_hits: AtomicU64::new(0),
146 cold_hits: AtomicU64::new(0),
147 promotions_to_hot: AtomicU64::new(0),
148 demotions_to_warm: AtomicU64::new(0),
149 demotions_to_cold: AtomicU64::new(0),
150 }
151 }
152}
153
impl<H, W, C> TieredStorage<H, W, C>
where
    H: VectorStorage,
    W: VectorStorage,
    C: VectorStorage,
{
    /// Builds a tiered store from a policy config and three backend stores.
    pub fn new(
        config: TieredStorageConfig,
        hot_storage: H,
        warm_storage: W,
        cold_storage: C,
    ) -> Self {
        Self {
            config,
            hot_storage,
            warm_storage,
            cold_storage,
            access_info: RwLock::new(HashMap::new()),
            stats: TieredStorageStatsInner::default(),
        }
    }

    /// Read-only access to the tiering configuration.
    pub fn config(&self) -> &TieredStorageConfig {
        &self.config
    }

    /// Records an access to `(namespace, id)`: refreshes the last-access
    /// timestamp, bumps the access count, records the tier the vector was
    /// found in, and increments that tier's hit counter.
    fn record_access(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
        let key = (namespace.clone(), id.clone());
        let mut access_map = self.access_info.write();

        // Previously unseen vectors get a fresh AccessInfo (Default: Hot, 0 hits).
        let info = access_map.entry(key).or_default();
        info.last_access = Instant::now();
        info.access_count += 1;
        info.tier = tier;

        // fetch_add returns the previous value; the trailing `;` discards it
        // so all three arms unify to ().
        match tier {
            StorageTier::Hot => self.stats.hot_hits.fetch_add(1, Ordering::Relaxed),
            StorageTier::Warm => self.stats.warm_hits.fetch_add(1, Ordering::Relaxed),
            StorageTier::Cold => self.stats.cold_hits.fetch_add(1, Ordering::Relaxed),
        };
    }

    /// Tier the vector was last seen in, or `None` if it has no tracking entry.
    fn get_tier(&self, namespace: &NamespaceId, id: &VectorId) -> Option<StorageTier> {
        let access_map = self.access_info.read();
        access_map
            .get(&(namespace.clone(), id.clone()))
            .map(|info| info.tier)
    }

    /// Moves a vector toward the hot tier based on its recorded tier.
    ///
    /// * Warm -> Hot: the vector is copied to hot and deleted from warm.
    /// * Cold -> Hot (if it has more than 10 recorded accesses) or
    ///   Cold -> Warm otherwise. The cold copy is NOT deleted and
    ///   `cold_count` is left unchanged — consistent with `upsert`, which
    ///   treats the cold tier as a durable backing copy. (TODO confirm this
    ///   retention is intentional.)
    ///
    /// Returns `Ok(true)` if a move happened, `Ok(false)` when the vector is
    /// already hot, untracked, or absent from its recorded tier.
    pub async fn promote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let current_tier = self.get_tier(namespace, id);

        match current_tier {
            Some(StorageTier::Warm) => {
                let vectors = self
                    .warm_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.hot_storage.upsert(namespace, vectors).await?;
                    self.warm_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Hot);
                    self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                    self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            Some(StorageTier::Cold) => {
                let vectors = self
                    .cold_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    // Frequently accessed cold vectors (>10 hits) skip warm
                    // and go straight to hot.
                    let should_be_hot = {
                        let access_map = self.access_info.read();
                        access_map
                            .get(&(namespace.clone(), id.clone()))
                            .map(|info| info.access_count > 10)
                            .unwrap_or(false)
                    };

                    if should_be_hot {
                        self.hot_storage.upsert(namespace, vectors).await?;
                        self.update_tier(namespace, id, StorageTier::Hot);
                        self.stats.promotions_to_hot.fetch_add(1, Ordering::Relaxed);
                        self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
                    } else {
                        self.warm_storage.upsert(namespace, vectors).await?;
                        self.update_tier(namespace, id, StorageTier::Warm);
                        self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
                    }
                    return Ok(true);
                }
            }
            // Already hot, or no tracking entry: nothing to do.
            _ => {}
        }

        Ok(false)
    }

    /// Moves a vector one tier colder (Hot -> Warm or Warm -> Cold),
    /// copying it into the colder tier and deleting it from the hotter one.
    ///
    /// Returns `Ok(true)` on a move, `Ok(false)` when the vector is already
    /// cold, untracked, or absent from its recorded tier.
    pub async fn demote(&self, namespace: &NamespaceId, id: &VectorId) -> Result<bool> {
        let current_tier = self.get_tier(namespace, id);

        match current_tier {
            Some(StorageTier::Hot) => {
                let vectors = self
                    .hot_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.warm_storage.upsert(namespace, vectors).await?;
                    self.hot_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Warm);
                    self.stats.demotions_to_warm.fetch_add(1, Ordering::Relaxed);
                    self.stats.hot_count.fetch_sub(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_add(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            Some(StorageTier::Warm) => {
                let vectors = self
                    .warm_storage
                    .get(namespace, std::slice::from_ref(id))
                    .await?;
                if !vectors.is_empty() {
                    self.cold_storage.upsert(namespace, vectors).await?;
                    self.warm_storage
                        .delete(namespace, std::slice::from_ref(id))
                        .await?;

                    self.update_tier(namespace, id, StorageTier::Cold);
                    self.stats.demotions_to_cold.fetch_add(1, Ordering::Relaxed);
                    self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
                    self.stats.cold_count.fetch_add(1, Ordering::Relaxed);

                    return Ok(true);
                }
            }
            // Already cold, or no tracking entry: nothing to do.
            _ => {}
        }

        Ok(false)
    }

    /// Overwrites the recorded tier for a vector without touching its
    /// last-access time or hit count (unlike `record_access`).
    fn update_tier(&self, namespace: &NamespaceId, id: &VectorId, tier: StorageTier) {
        let mut access_map = self.access_info.write();
        let key = (namespace.clone(), id.clone());
        let info = access_map.entry(key).or_default();
        info.tier = tier;
    }

    /// Classifies errors that will not resolve on retry (namespace deleted,
    /// permission denied), so auto-tiering can purge the tracking entry
    /// instead of retrying it forever.
    fn is_permanent_storage_error(error: &DakeraError) -> bool {
        match error {
            DakeraError::NamespaceNotFound(_) => true,
            DakeraError::Storage(msg) => {
                msg.contains("PermissionDenied") || msg.contains("permission denied")
            }
            _ => false,
        }
    }

    /// Runs one auto-tiering pass: scans access records under a short-lived
    /// read lock, collects demotion candidates whose idle time exceeds the
    /// configured thresholds, then performs the demotions with the lock
    /// released. Candidates that fail with a permanent error are purged from
    /// the tracking map; transient failures are logged and retried on the
    /// next cycle.
    ///
    /// No-op (all-zero result) when `auto_tier_enabled` is false. The
    /// `promoted_*` fields of the result are always zero here — promotion
    /// only happens on demand via [`Self::promote`].
    pub async fn run_auto_tiering(&self) -> Result<TieringResult> {
        if !self.config.auto_tier_enabled {
            return Ok(TieringResult::default());
        }

        let now = Instant::now();
        let mut to_demote_to_warm = Vec::new();
        let mut to_demote_to_cold = Vec::new();

        // Collect candidates inside a scoped read lock; the async demotions
        // below must not run while the lock is held.
        {
            let access_map = self.access_info.read();
            for ((namespace, id), info) in access_map.iter() {
                let elapsed = now.duration_since(info.last_access);

                match info.tier {
                    StorageTier::Hot if elapsed > self.config.hot_to_warm_threshold => {
                        to_demote_to_warm.push((namespace.clone(), id.clone()));
                    }
                    StorageTier::Warm if elapsed > self.config.warm_to_cold_threshold => {
                        to_demote_to_cold.push((namespace.clone(), id.clone()));
                    }
                    _ => {}
                }
            }
        }

        let mut demoted_to_warm = 0;
        let mut demoted_to_cold = 0;
        let mut stale_entries = Vec::new();

        for (namespace, id) in to_demote_to_warm {
            match self.demote(&namespace, &id).await {
                Ok(true) => demoted_to_warm += 1,
                Ok(false) => {}
                Err(e) => {
                    if Self::is_permanent_storage_error(&e) {
                        stale_entries.push((namespace, id));
                    } else {
                        tracing::debug!(error = %e, "Transient demotion error, will retry next cycle");
                    }
                }
            }
        }

        for (namespace, id) in to_demote_to_cold {
            match self.demote(&namespace, &id).await {
                Ok(true) => demoted_to_cold += 1,
                Ok(false) => {}
                Err(e) => {
                    if Self::is_permanent_storage_error(&e) {
                        stale_entries.push((namespace, id));
                    } else {
                        tracing::debug!(error = %e, "Transient demotion error, will retry next cycle");
                    }
                }
            }
        }

        // Drop tracking entries that can never be demoted (e.g. their
        // namespace was deleted) so they stop churning every cycle.
        if !stale_entries.is_empty() {
            let removed = stale_entries.len();
            let mut access_map = self.access_info.write();
            for (ns, id) in &stale_entries {
                access_map.remove(&(ns.clone(), id.clone()));
            }
            tracing::warn!(
                removed = removed,
                "Purged stale entries from tiering queue (deleted namespace or permanent error)"
            );
        }

        Ok(TieringResult {
            demoted_to_warm,
            demoted_to_cold,
            promoted_to_hot: 0,
            promoted_to_warm: 0,
        })
    }

    /// Snapshot of all tiering counters. Relaxed loads: individual counters
    /// are each consistent, but the snapshot as a whole is not atomic.
    pub fn stats(&self) -> TieredStorageStats {
        TieredStorageStats {
            hot_count: self.stats.hot_count.load(Ordering::Relaxed),
            warm_count: self.stats.warm_count.load(Ordering::Relaxed),
            cold_count: self.stats.cold_count.load(Ordering::Relaxed),
            hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
            warm_hits: self.stats.warm_hits.load(Ordering::Relaxed),
            cold_hits: self.stats.cold_hits.load(Ordering::Relaxed),
            promotions_to_hot: self.stats.promotions_to_hot.load(Ordering::Relaxed),
            demotions_to_warm: self.stats.demotions_to_warm.load(Ordering::Relaxed),
            demotions_to_cold: self.stats.demotions_to_cold.load(Ordering::Relaxed),
        }
    }

    /// Counts tracked vectors per tier for one namespace, based on the
    /// access map (not the backends themselves).
    pub fn tier_distribution(&self, namespace: &NamespaceId) -> TierDistribution {
        let access_map = self.access_info.read();
        let mut hot = 0u64;
        let mut warm = 0u64;
        let mut cold = 0u64;

        for ((ns, _), info) in access_map.iter() {
            if ns == namespace {
                match info.tier {
                    StorageTier::Hot => hot += 1,
                    StorageTier::Warm => warm += 1,
                    StorageTier::Cold => cold += 1,
                }
            }
        }

        TierDistribution { hot, warm, cold }
    }
}
458
/// Outcome of one [`TieredStorage::run_auto_tiering`] pass.
#[derive(Debug, Clone, Default)]
pub struct TieringResult {
    /// Vectors demoted from hot to warm this pass.
    pub demoted_to_warm: u64,
    /// Vectors demoted from warm to cold this pass.
    pub demoted_to_cold: u64,
    /// Always 0 from `run_auto_tiering`; promotions happen on demand.
    pub promoted_to_hot: u64,
    /// Always 0 from `run_auto_tiering`; promotions happen on demand.
    pub promoted_to_warm: u64,
}
471
/// Per-namespace tier breakdown returned by
/// [`TieredStorage::tier_distribution`].
#[derive(Debug, Clone)]
pub struct TierDistribution {
    /// Tracked vectors currently in the hot tier.
    pub hot: u64,
    /// Tracked vectors currently in the warm tier.
    pub warm: u64,
    /// Tracked vectors currently in the cold tier.
    pub cold: u64,
}
479
#[async_trait]
// `C: Clone + Send + Sync + 'static` because `upsert` hands a clone of the
// cold backend to a spawned background task.
impl<H, W, C> VectorStorage for TieredStorage<H, W, C>
where
    H: VectorStorage,
    W: VectorStorage,
    C: VectorStorage + Clone + Send + Sync + 'static,
{
    /// Upserts into the hot tier synchronously, then spawns a detached task
    /// that flushes the same vectors to the cold tier as a durable copy.
    /// Cold-tier failures are logged, never surfaced to the caller.
    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
        let ids: Vec<VectorId> = vectors.iter().map(|v| v.id.clone()).collect();

        // Full copy kept for the background cold-tier flush.
        let cold_vectors = vectors.clone();

        let count = self.hot_storage.upsert(namespace, vectors).await?;

        // Detached fire-and-forget flush; its JoinHandle is dropped, so the
        // caller never observes cold-tier errors.
        let cold = self.cold_storage.clone();
        let cold_ns = namespace.clone();
        tokio::spawn(async move {
            if let Err(e) = cold.ensure_namespace(&cold_ns).await {
                tracing::error!(
                    error = %e,
                    namespace = %cold_ns,
                    "Cold tier namespace ensure failed (S3 flush aborted)"
                );
                return;
            }
            // Second copy held back so the dimension-mismatch path below can
            // retry after resetting the namespace.
            let retry_vectors = cold_vectors.clone();
            match cold.upsert(&cold_ns, cold_vectors).await {
                Ok(_) => {}
                // A stale cold namespace with the wrong dimension is dropped,
                // recreated, and the flush retried once.
                Err(DakeraError::DimensionMismatch { expected, actual }) => {
                    tracing::warn!(
                        namespace = %cold_ns,
                        cold_dim = expected,
                        hot_dim = actual,
                        "Cold tier dimension mismatch — resetting stale cold namespace"
                    );
                    if let Err(e) = cold.delete_namespace(&cold_ns).await {
                        tracing::error!(error = %e, namespace = %cold_ns,
                            "Failed to delete stale cold namespace");
                        return;
                    }
                    if let Err(e) = cold.ensure_namespace(&cold_ns).await {
                        tracing::error!(error = %e, namespace = %cold_ns,
                            "Failed to recreate cold namespace after dimension fix");
                        return;
                    }
                    if let Err(e) = cold.upsert(&cold_ns, retry_vectors).await {
                        tracing::error!(
                            error = %e,
                            namespace = %cold_ns,
                            "Cold tier S3 flush failed after dimension fix — data is durable in hot tier"
                        );
                    }
                }
                Err(e) => {
                    tracing::error!(
                        error = %e,
                        namespace = %cold_ns,
                        "Cold tier S3 flush failed — data is durable in hot tier"
                    );
                }
            }
        });

        // NOTE(review): record_access also sets the tier, so the preceding
        // update_tier call is redundant — confirm and consider removing.
        for id in &ids {
            self.update_tier(namespace, id, StorageTier::Hot);
            self.record_access(namespace, id, StorageTier::Hot);
        }

        // NOTE(review): increments by `count` even when ids already existed
        // in hot, so hot_count can drift above the number of distinct
        // vectors — confirm whether upsert-overwrites should count.
        self.stats
            .hot_count
            .fetch_add(count as u64, Ordering::Relaxed);
        Ok(count)
    }

    /// Waterfall lookup: hot first, then warm for the ids still missing,
    /// then cold. A `NamespaceNotFound` from any single tier is treated as
    /// "tier empty" rather than an error. Each found vector is recorded as a
    /// hit in the tier that served it.
    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
        let mut results = Vec::with_capacity(ids.len());
        let mut remaining_ids: Vec<VectorId> = ids.to_vec();

        let hot_results = match self.hot_storage.get(namespace, &remaining_ids).await {
            Ok(v) => v,
            Err(DakeraError::NamespaceNotFound(_)) => vec![],
            Err(e) => return Err(e),
        };
        for v in &hot_results {
            self.record_access(namespace, &v.id, StorageTier::Hot);
        }

        let found_ids: std::collections::HashSet<_> = hot_results.iter().map(|v| &v.id).collect();
        remaining_ids.retain(|id| !found_ids.contains(id));
        results.extend(hot_results);

        if remaining_ids.is_empty() {
            return Ok(results);
        }

        let warm_results = match self.warm_storage.get(namespace, &remaining_ids).await {
            Ok(v) => v,
            // NOTE(review): same type as `DakeraError::NamespaceNotFound`
            // above, just written with the full `common::` path — harmless
            // inconsistency.
            Err(common::DakeraError::NamespaceNotFound(_)) => vec![],
            Err(e) => return Err(e),
        };
        for v in &warm_results {
            self.record_access(namespace, &v.id, StorageTier::Warm);
        }

        let found_ids: std::collections::HashSet<_> = warm_results.iter().map(|v| &v.id).collect();
        remaining_ids.retain(|id| !found_ids.contains(id));
        results.extend(warm_results);

        if remaining_ids.is_empty() {
            return Ok(results);
        }

        let cold_results = match self.cold_storage.get(namespace, &remaining_ids).await {
            Ok(v) => v,
            Err(DakeraError::NamespaceNotFound(_)) => vec![],
            Err(e) => return Err(e),
        };
        for v in &cold_results {
            self.record_access(namespace, &v.id, StorageTier::Cold);
        }
        results.extend(cold_results);

        Ok(results)
    }

    /// Merges all three tiers, deduplicating by vector id with first-seen
    /// wins: hot shadows warm, warm shadows cold. This matters because the
    /// cold tier holds duplicate durable copies of hot/warm vectors.
    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
        let mut seen = std::collections::HashSet::new();
        let mut results = Vec::new();

        // Maps a per-tier NamespaceNotFound to an empty result, like `get`.
        let tier_get_all = |res: common::Result<Vec<Vector>>| -> common::Result<Vec<Vector>> {
            match res {
                Ok(v) => Ok(v),
                Err(common::DakeraError::NamespaceNotFound(_)) => Ok(vec![]),
                Err(e) => Err(e),
            }
        };

        for v in tier_get_all(self.hot_storage.get_all(namespace).await)? {
            if seen.insert(v.id.clone()) {
                results.push(v);
            }
        }
        for v in tier_get_all(self.warm_storage.get_all(namespace).await)? {
            if seen.insert(v.id.clone()) {
                results.push(v);
            }
        }
        for v in tier_get_all(self.cold_storage.get_all(namespace).await)? {
            if seen.insert(v.id.clone()) {
                results.push(v);
            }
        }

        Ok(results)
    }

    /// Deletes the ids from every tier and removes their access-tracking
    /// entries. Returns the sum of per-tier delete counts.
    // NOTE(review): because cold holds duplicate copies, the returned sum
    // can exceed the number of distinct ids; the stats tier counters are
    // also not decremented here — confirm both are intended.
    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
        let mut deleted = 0;

        match self.hot_storage.delete(namespace, ids).await {
            Ok(n) => deleted += n,
            Err(DakeraError::NamespaceNotFound(_)) => {}
            Err(e) => return Err(e),
        }
        match self.warm_storage.delete(namespace, ids).await {
            Ok(n) => deleted += n,
            Err(DakeraError::NamespaceNotFound(_)) => {}
            Err(e) => return Err(e),
        }
        match self.cold_storage.delete(namespace, ids).await {
            Ok(n) => deleted += n,
            Err(DakeraError::NamespaceNotFound(_)) => {}
            Err(e) => return Err(e),
        }

        // Drop tracking state so auto-tiering stops considering these ids.
        {
            let mut access_map = self.access_info.write();
            for id in ids {
                access_map.remove(&(namespace.clone(), id.clone()));
            }
        }

        Ok(deleted)
    }

    /// True if any tier knows the namespace.
    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
        Ok(self.hot_storage.namespace_exists(namespace).await?
            || self.warm_storage.namespace_exists(namespace).await?
            || self.cold_storage.namespace_exists(namespace).await?)
    }

    /// Creates the namespace in all three tiers.
    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
        self.hot_storage.ensure_namespace(namespace).await?;
        self.warm_storage.ensure_namespace(namespace).await?;
        self.cold_storage.ensure_namespace(namespace).await?;
        Ok(())
    }

    /// Vector count for a namespace. Prefers the cold tier when non-empty
    /// (every upsert eventually lands a durable copy there), otherwise falls
    /// back to hot + warm.
    // NOTE(review): assumes each live vector sits in exactly one of
    // hot/warm before its cold flush completes; counts taken mid-flush may
    // be inconsistent — confirm acceptable.
    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
        let cold = self.cold_storage.count(namespace).await?;
        if cold > 0 {
            return Ok(cold);
        }
        let hot = self.hot_storage.count(namespace).await?;
        let warm = self.warm_storage.count(namespace).await?;
        Ok(hot + warm)
    }

    /// Vector dimension for a namespace, taken from the first tier that
    /// reports one (hot, then warm, then cold).
    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
        if let Some(dim) = self.hot_storage.dimension(namespace).await? {
            return Ok(Some(dim));
        }
        if let Some(dim) = self.warm_storage.dimension(namespace).await? {
            return Ok(Some(dim));
        }
        self.cold_storage.dimension(namespace).await
    }

    /// Union of namespaces across all tiers; order is unspecified (HashSet).
    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
        let mut namespaces = std::collections::HashSet::new();

        namespaces.extend(self.hot_storage.list_namespaces().await?);
        namespaces.extend(self.warm_storage.list_namespaces().await?);
        namespaces.extend(self.cold_storage.list_namespaces().await?);

        Ok(namespaces.into_iter().collect())
    }

    /// Deletes the namespace from every tier and purges all of its
    /// access-tracking entries. True if any tier actually had it.
    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
        let hot_deleted = self.hot_storage.delete_namespace(namespace).await?;
        let warm_deleted = self.warm_storage.delete_namespace(namespace).await?;
        let cold_deleted = self.cold_storage.delete_namespace(namespace).await?;

        {
            let mut access_map = self.access_info.write();
            access_map.retain(|(ns, _), _| ns != namespace);
        }

        Ok(hot_deleted || warm_deleted || cold_deleted)
    }

    /// Sum of expired-vector cleanups across all tiers for one namespace.
    // NOTE(review): a vector duplicated in cold counts once per tier.
    async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
        let mut total = 0;
        total += self.hot_storage.cleanup_expired(namespace).await?;
        total += self.warm_storage.cleanup_expired(namespace).await?;
        total += self.cold_storage.cleanup_expired(namespace).await?;
        Ok(total)
    }

    /// Sum of expired-vector cleanups across all tiers and all namespaces.
    async fn cleanup_all_expired(&self) -> Result<usize> {
        let mut total = 0;
        total += self.hot_storage.cleanup_all_expired().await?;
        total += self.warm_storage.cleanup_all_expired().await?;
        total += self.cold_storage.cleanup_all_expired().await?;
        Ok(total)
    }
}
765
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;

    /// Builds a `dim`-dimensional all-ones vector with no metadata or TTL.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// Upsert + get round-trips through the hot tier and records hits.
    #[tokio::test]
    async fn test_tiered_storage_basic() {
        let config = TieredStorageConfig::default();
        let storage = TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );

        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let vectors = vec![create_test_vector("v1", 4)];
        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 1);

        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);

        let stats = storage.stats();
        // One hot hit from upsert's record_access + one from the get.
        assert_eq!(stats.hot_hits, 2);
    }

    /// Demoting twice walks a vector Hot -> Warm -> Cold, and it stays
    /// readable from the cold tier.
    #[tokio::test]
    async fn test_tiered_storage_promotion_demotion() {
        let config = TieredStorageConfig::default();
        let storage = TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );

        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        assert_eq!(
            storage.get_tier(&namespace, &"v1".to_string()),
            Some(StorageTier::Hot)
        );

        let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
        assert!(demoted);
        assert_eq!(
            storage.get_tier(&namespace, &"v1".to_string()),
            Some(StorageTier::Warm)
        );

        let demoted = storage.demote(&namespace, &"v1".to_string()).await.unwrap();
        assert!(demoted);
        assert_eq!(
            storage.get_tier(&namespace, &"v1".to_string()),
            Some(StorageTier::Cold)
        );

        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);

        let stats = storage.stats();
        assert_eq!(stats.demotions_to_warm, 1);
        assert_eq!(stats.demotions_to_cold, 1);
    }

    /// A single get resolves ids spread across all three tiers.
    #[tokio::test]
    async fn test_tiered_storage_multi_tier_get() {
        let config = TieredStorageConfig::default();
        let storage = TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );

        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        for i in 0..3 {
            storage
                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
                .await
                .unwrap();
        }

        // v0 stays hot, v1 goes warm, v2 goes cold (two demotions).
        storage.demote(&namespace, &"v1".to_string()).await.unwrap();
        storage.demote(&namespace, &"v2".to_string()).await.unwrap();
        storage.demote(&namespace, &"v2".to_string()).await.unwrap();

        let ids: Vec<_> = (0..3).map(|i| format!("v{}", i)).collect();
        let results = storage.get(&namespace, &ids).await.unwrap();
        assert_eq!(results.len(), 3);
    }

    /// `tier_distribution` reflects the per-namespace tier assignments.
    #[tokio::test]
    async fn test_tier_distribution() {
        let config = TieredStorageConfig::default();
        let storage = TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );

        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        for i in 0..5 {
            storage
                .upsert(&namespace, vec![create_test_vector(&format!("v{}", i), 4)])
                .await
                .unwrap();
        }

        // v3 -> warm; v4 -> warm -> cold. v0..v2 stay hot.
        storage.demote(&namespace, &"v3".to_string()).await.unwrap();
        storage.demote(&namespace, &"v4".to_string()).await.unwrap();
        storage.demote(&namespace, &"v4".to_string()).await.unwrap();

        let dist = storage.tier_distribution(&namespace);
        assert_eq!(dist.hot, 3);
        assert_eq!(dist.warm, 1);
        assert_eq!(dist.cold, 1);
    }

    /// Auto-tiering purges tracking entries whose namespace was deleted
    /// (a permanent error) instead of retrying them forever.
    #[tokio::test]
    async fn test_auto_tiering_purges_stale_namespace_entries() {
        let config = TieredStorageConfig {
            auto_tier_enabled: true,
            hot_to_warm_threshold: Duration::from_millis(0),
            warm_to_cold_threshold: Duration::from_millis(0),
            ..Default::default()
        };
        let storage = TieredStorage::new(
            config,
            InMemoryStorage::new(),
            InMemoryStorage::new(),
            InMemoryStorage::new(),
        );

        let namespace = "deleted_bench_ns".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        storage
            .upsert(&namespace, vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        // Deleting the namespace also clears access tracking...
        storage.delete_namespace(&namespace).await.unwrap();

        // ...so re-insert a stale entry by hand to simulate a leftover.
        {
            let mut access_map = storage.access_info.write();
            access_map.insert(
                (namespace.clone(), "v1".to_string()),
                AccessInfo {
                    last_access: Instant::now() - Duration::from_secs(7200),
                    access_count: 1,
                    tier: StorageTier::Hot,
                },
            );
        }

        let result = storage.run_auto_tiering().await.unwrap();
        assert_eq!(result.demoted_to_warm, 0);

        let access_map = storage.access_info.read();
        assert!(
            access_map.is_empty(),
            "Stale entries should have been purged from tiering queue"
        );
    }

    /// Permanent-vs-transient error classification used by auto-tiering.
    #[test]
    fn test_is_permanent_storage_error() {
        assert!(TieredStorage::<
            InMemoryStorage,
            InMemoryStorage,
            InMemoryStorage,
        >::is_permanent_storage_error(
            &DakeraError::NamespaceNotFound("test".into())
        ));
        assert!(TieredStorage::<
            InMemoryStorage,
            InMemoryStorage,
            InMemoryStorage,
        >::is_permanent_storage_error(&DakeraError::Storage(
            "PermissionDenied (permanent) at write".into()
        )));
        assert!(!TieredStorage::<
            InMemoryStorage,
            InMemoryStorage,
            InMemoryStorage,
        >::is_permanent_storage_error(
            &DakeraError::Storage("connection timeout".into())
        ));
        assert!(!TieredStorage::<
            InMemoryStorage,
            InMemoryStorage,
            InMemoryStorage,
        >::is_permanent_storage_error(
            &DakeraError::DimensionMismatch {
                expected: 384,
                actual: 1024,
            }
        ));
    }
}