pub struct CacheManager { /* private fields */ }
Cache Manager - Unified operations across L1 and L2
Implementations

impl CacheManager
pub async fn new_with_backends(
    l1_cache: Arc<dyn CacheBackend>,
    l2_cache: Arc<dyn L2CacheBackend>,
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
) -> Result<Self>
Create new cache manager with trait objects (pluggable backends)
This is the primary constructor for v0.3.0+, supporting custom cache backends.
§Arguments
- l1_cache - Any L1 cache backend implementing the CacheBackend trait
- l2_cache - Any L2 cache backend implementing the L2CacheBackend trait
- streaming_backend - Optional streaming backend (None to disable streaming)
§Example
use multi_tier_cache::{CacheBackend, CacheManager, L1Cache, L2Cache, L2CacheBackend};
use std::sync::Arc;
let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
let manager = CacheManager::new_with_backends(l1, l2, None).await?;

pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self>
Create new cache manager with default backends (backward compatible)
This is the legacy constructor maintained for backward compatibility.
New code should prefer new_with_backends() or CacheSystemBuilder.
§Arguments
- l1_cache - Moka L1 cache instance
- l2_cache - Redis L2 cache instance
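§Example

No example accompanies this constructor; by analogy with the new_with_backends() example above, construction with the default concrete backends might look like this (a sketch under that assumption):

```rust
use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
use std::sync::Arc;

// Concrete default backends: Moka for L1, Redis for L2
let l1 = Arc::new(L1Cache::new().await?);
let l2 = Arc::new(L2Cache::new().await?);
let manager = CacheManager::new(l1, l2).await?;
```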
pub async fn new_with_invalidation(
    l1_cache: Arc<L1Cache>,
    l2_cache: Arc<L2Cache>,
    redis_url: &str,
    config: InvalidationConfig,
) -> Result<Self>
Create new cache manager with invalidation support
This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
§Arguments
- l1_cache - Moka L1 cache instance
- l2_cache - Redis L2 cache instance
- redis_url - Redis connection URL for Pub/Sub
- config - Invalidation configuration
§Example
use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
let config = InvalidationConfig {
channel: "my_app:cache:invalidate".to_string(),
..Default::default()
};
let manager = CacheManager::new_with_invalidation(
    l1, l2, "redis://localhost", config
).await?;

pub async fn get(&self, key: &str) -> Result<Option<Value>>
Get value from cache (L1 first, then L2 fallback with promotion)
This method now includes built-in Cache Stampede protection when cache misses occur. Multiple concurrent requests for the same missing key will be coalesced to prevent unnecessary duplicate work on external data sources.
§Arguments
key- Cache key to retrieve
§Returns
- Ok(Some(value)) - Cache hit; value found in L1 or L2
- Ok(None) - Cache miss; value not found in either cache
- Err(error) - Cache operation failed
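§Example

No example accompanies get(); a minimal usage sketch matching the return contract above (the key is illustrative):

```rust
match cache_manager.get("user:123").await? {
    Some(value) => println!("cache hit: {value}"), // found in L1 or L2
    None => println!("cache miss"),                // not in either tier
}
```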
pub async fn set_with_strategy(
    &self,
    key: &str,
    value: Value,
    strategy: CacheStrategy,
) -> Result<()>

Set value with specific cache strategy (both L1 and L2)

get_with_fallback

Get value from cache with fallback computation (enhanced backward compatibility)

This is a convenience method that combines get() with optional computation. If the value is not found in cache, it executes the compute function and caches the result automatically.

§Arguments

- key - Cache key
- compute_fn - Optional function to compute the value if it is not in cache
- strategy - Cache strategy for storing the computed value (default: ShortTerm)

§Returns

- Ok(Some(value)) - Value found in cache or computed successfully
- Ok(None) - Value not in cache and no compute function provided
- Err(error) - Cache operation or computation failed

§Example

// Simple cache get (existing behavior)
let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;

// Get with computation fallback (new enhanced behavior)
let api_data = cache_manager.get_with_fallback(
    "api_response",
    Some(|| async { fetch_data_from_api().await }),
    Some(CacheStrategy::RealTime)
).await?;
pub async fn get_or_compute_with<F, Fut>(
    &self,
    key: &str,
    strategy: CacheStrategy,
    compute_fn: F,
) -> Result<Value>
Get or compute value with Cache Stampede protection across L1+L2+Compute
This method provides comprehensive Cache Stampede protection:
- Check L1 cache first (uses Moka’s built-in coalescing)
- Check L2 cache with mutex-based coalescing
- Compute fresh data with protection against concurrent computations
§Arguments
- key - Cache key
- strategy - Cache strategy for TTL and storage behavior
- compute_fn - Async function to compute the value if not in any cache
§Example
let api_data = cache_manager.get_or_compute_with(
"api_response",
CacheStrategy::RealTime,
|| async {
fetch_data_from_api().await
}
).await?;

pub async fn get_or_compute_typed<T, F, Fut>(
    &self,
    key: &str,
    strategy: CacheStrategy,
    compute_fn: F,
) -> Result<T>
Get or compute typed value with Cache Stampede protection (Type-Safe Version)
This method provides the same functionality as get_or_compute_with() but with
type-safe automatic serialization/deserialization. Perfect for database queries,
API calls, or any computation that returns structured data.
§Type Safety
- Returns your actual type T instead of serde_json::Value
- Compiler enforces Serialize + DeserializeOwned bounds
- No manual JSON conversion needed
§Cache Flow
- Check L1 cache → deserialize if found
- Check L2 cache → deserialize + promote to L1 if found
- Execute compute_fn → serialize → store in L1+L2
- Full stampede protection (only ONE request computes)
§Arguments
- key - Cache key
- strategy - Cache strategy for TTL
- compute_fn - Async function returning Result<T>
§Example - Database Query
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
struct User {
id: i64,
name: String,
}
// Type-safe database caching
let user: User = cache_manager.get_or_compute_typed(
"user:123",
CacheStrategy::MediumTerm,
|| async {
// Your database query here
sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
.bind(123)
.fetch_one(&pool)
.await
}
).await?;

§Example - API Call
#[derive(Serialize, Deserialize)]
struct ApiResponse {
data: String,
timestamp: i64,
}
let response: ApiResponse = cache_manager.get_or_compute_typed(
"api:endpoint",
CacheStrategy::RealTime,
|| async {
reqwest::get("https://api.example.com/data")
.await?
.json::<ApiResponse>()
.await
}
).await?;

§Performance
- L1 hit: <1ms + deserialization (~10-50μs for small structs)
- L2 hit: 2-5ms + deserialization + L1 promotion
- Compute: Your function time + serialization + L1+L2 storage
- Stampede protection: 99.6% latency reduction under high concurrency
§Errors
Returns error if:
- Compute function fails
- Serialization fails (invalid type for JSON)
- Deserialization fails (cache data doesn’t match type T)
- Cache operations fail (Redis connection issues)
pub fn get_stats(&self) -> CacheManagerStats
Get comprehensive cache statistics
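§Example

A minimal sketch; this page does not show the fields of CacheManagerStats, so the snippet only assumes the type implements Debug:

```rust
// Snapshot the current hit/miss counters and inspect them
let stats = cache_manager.get_stats();
println!("{stats:?}");
```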
pub async fn publish_to_stream(
    &self,
    stream_key: &str,
    fields: Vec<(String, String)>,
    maxlen: Option<usize>,
) -> Result<String>
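publish_to_stream carries no documentation on this page. A hedged usage sketch, by analogy with read_stream below: the stream key and field names are illustrative, and the returned String is assumed to be the new entry's ID (as with Redis XADD) and maxlen an approximate length cap:

```rust
// Publish an event to a Redis Stream, capping its length at ~10_000 entries
let entry_id = cache_manager.publish_to_stream(
    "events:user_updates",
    vec![
        ("user_id".to_string(), "123".to_string()),
        ("action".to_string(), "profile_update".to_string()),
    ],
    Some(10_000),
).await?;
```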
pub async fn read_stream_latest(
    &self,
    stream_key: &str,
    count: usize,
) -> Result<Vec<(String, Vec<(String, String)>)>>
pub async fn read_stream(
    &self,
    stream_key: &str,
    last_id: &str,
    count: usize,
    block_ms: Option<usize>,
) -> Result<Vec<(String, Vec<(String, String)>)>>
Read from Redis Stream with optional blocking
§Arguments
- stream_key - Name of the stream
- last_id - Last ID seen ("0" for start, "$" for new entries only)
- count - Max entries to retrieve
- block_ms - Optional blocking timeout in ms
§Returns
Vector of (entry_id, fields) tuples
§Errors
Returns error if streaming backend is not configured
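§Example

Putting the parameters above together, a polling-loop sketch (assumes last_id semantics follow Redis XREAD, as the "0"/"$" convention suggests; the stream key is illustrative):

```rust
// Start from new entries only, then resume from the last ID we saw
let mut last_id = "$".to_string();
loop {
    // Block up to 5 seconds waiting for up to 10 new entries
    let entries = cache_manager
        .read_stream("events:user_updates", &last_id, 10, Some(5000))
        .await?;
    for (entry_id, fields) in entries {
        for (name, value) in &fields {
            println!("{entry_id}: {name}={value}");
        }
        last_id = entry_id; // resume after this entry on the next read
    }
}
```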
pub async fn invalidate(&self, key: &str) -> Result<()>
Invalidate a cache key across all instances
This removes the key from both L1 and L2 caches and broadcasts the invalidation to all other cache instances via Redis Pub/Sub.
§Arguments
- key - Cache key to invalidate
§Example
// Invalidate user cache after profile update
cache_manager.invalidate("user:123").await?;

pub async fn update_cache(
    &self,
    key: &str,
    value: Value,
    ttl: Option<Duration>,
) -> Result<()>
Update cache value across all instances
This updates the key in both L1 and L2 caches and broadcasts the update to all other cache instances, avoiding cache misses.
§Arguments
- key - Cache key to update
- value - New value
- ttl - Optional TTL (uses default if None)
§Example
// Update user cache with new data
let user_data = serde_json::json!({"id": 123, "name": "Alice"});
cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;

pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()>
Invalidate all keys matching a pattern
This scans L2 cache for keys matching the pattern, removes them, and broadcasts the invalidation. L1 caches will be cleared via broadcast.
§Arguments
- pattern - Glob-style pattern (e.g., "user:*", "product:123:*")
§Example
// Invalidate all user caches
cache_manager.invalidate_pattern("user:*").await?;
// Invalidate specific user's related caches
cache_manager.invalidate_pattern("user:123:*").await?;

pub async fn set_with_broadcast(
    &self,
    key: &str,
    value: Value,
    strategy: CacheStrategy,
) -> Result<()>
Set value with automatic broadcast to all instances
This is a write-through operation that updates the cache and broadcasts the update to all other instances automatically.
§Arguments
- key - Cache key
- value - Value to cache
- strategy - Cache strategy (determines TTL)
§Example
// Update and broadcast in one call
let data = serde_json::json!({"status": "active"});
cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;

pub fn get_invalidation_stats(&self) -> Option<InvalidationStats>
Get invalidation statistics
Returns statistics about invalidation operations if invalidation is enabled.
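§Example

A minimal sketch; the fields of InvalidationStats are not shown on this page, so the snippet only assumes a Debug impl and that None means the manager was built without invalidation support (consistent with the description above):

```rust
if let Some(stats) = cache_manager.get_invalidation_stats() {
    println!("invalidation stats: {stats:?}");
} else {
    // None: this manager was constructed without invalidation
    // (e.g., via new() rather than new_with_invalidation())
}
```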