Struct CacheManager

pub struct CacheManager { /* private fields */ }

Implementations§


impl CacheManager


pub fn new_with_backends( l1_cache: Arc<dyn CacheBackend>, l2_cache: Arc<dyn L2CacheBackend>, streaming_backend: Option<Arc<dyn StreamingBackend>>, ) -> CacheResult<Self>

Create new cache manager with trait objects (pluggable backends)

This is the primary constructor for v0.3.0+, supporting custom cache backends.

§Arguments
  • l1_cache - Any L1 cache backend implementing CacheBackend trait
  • l2_cache - Any L2 cache backend implementing L2CacheBackend trait
  • streaming_backend - Optional streaming backend (None to disable streaming)
§Example
use multi_tier_cache::{CacheBackend, CacheManager, L1Cache, L2Cache, L2CacheBackend};
use std::sync::Arc;

let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);

let manager = CacheManager::new_with_backends(l1, l2, None)?;
§Errors

Currently this constructor always succeeds; the Result return type is kept for forward compatibility.


pub async fn new( l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>, ) -> CacheResult<Self>

Available on crate features moka and redis only.

Create new cache manager with default backends (backward compatible)

This is the legacy constructor maintained for backward compatibility. New code should prefer new_with_backends() or CacheSystemBuilder.

§Arguments
  • l1_cache - Moka L1 cache instance
  • l2_cache - Redis L2 cache instance
§Errors

Returns an error if Redis connection fails.
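A minimal usage sketch for the legacy constructor (assuming, as in the examples above, that `L1Cache::new()` and `L2Cache::new()` are async and that a Redis instance is reachable):

```rust
use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
use std::sync::Arc;

let l1 = Arc::new(L1Cache::new().await?);
let l2 = Arc::new(L2Cache::new().await?);

// Connects to Redis; returns an error if the connection cannot be established.
let manager = CacheManager::new(l1, l2).await?;
```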


pub async fn new_with_invalidation( l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>, redis_url: &str, config: InvalidationConfig, ) -> CacheResult<Self>

Available on crate features moka and redis only.

Create new cache manager with invalidation support

This constructor enables cross-instance cache invalidation via Redis Pub/Sub.

§Arguments
  • l1_cache - Moka L1 cache instance
  • l2_cache - Redis L2 cache instance
  • redis_url - Redis connection URL for Pub/Sub
  • config - Invalidation configuration
§Example
use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
use std::sync::Arc;

let l1 = Arc::new(L1Cache::new().await?);
let l2 = Arc::new(L2Cache::new().await?);

let config = InvalidationConfig {
    channel: "my_app:cache:invalidate".to_string(),
    ..Default::default()
};

let manager = CacheManager::new_with_invalidation(
    l1, l2, "redis://localhost", config
).await?;
§Errors

Returns an error if Redis connection fails or invalidation setup fails.


pub fn new_with_tiers( tiers: Vec<CacheTier>, streaming_backend: Option<Arc<dyn StreamingBackend>>, ) -> CacheResult<Self>

Create new cache manager with multi-tier architecture (v0.5.0+)

This constructor enables dynamic multi-tier caching with 3, 4, or more tiers. Tiers are checked in order (lower tier_level = faster/hotter).

§Arguments
  • tiers - Vector of configured cache tiers (must be sorted by tier_level ascending)
  • streaming_backend - Optional streaming backend
§Example
use multi_tier_cache::{CacheManager, CacheTier, L1Cache, L2Cache, RocksDBCache};
use std::sync::Arc;

// L1 + L2 + L3 setup
let l1 = Arc::new(L1Cache::new().await?);
let l2 = Arc::new(L2Cache::new().await?);
let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);

let tiers = vec![
    CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
    CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
    CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2 and L1, 2x TTL
];

let manager = CacheManager::new_with_tiers(tiers, None)?;
§Errors

Returns an error if tiers are not sorted by level or if no tiers are provided.


pub fn set_serializer(&mut self, serializer: CacheSerializer)

Set a custom serializer for the cache manager


pub async fn get(&self, key: &str) -> CacheResult<Option<Bytes>>

Get value from cache (L1 first, then L2 fallback with promotion)

This method includes built-in Cache Stampede protection on cache misses: multiple concurrent requests for the same missing key are coalesced, preventing duplicate work against external data sources.

Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).

§Arguments
  • key - Cache key to retrieve
§Returns
  • Ok(Some(value)) - Cache hit, value found in any tier
  • Ok(None) - Cache miss, value not found in any cache
  • Err(error) - Cache operation failed
§Errors

Returns an error if cache operation fails.

§Panics

Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
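A usage sketch (the key is illustrative):

```rust
// L1 is checked first; an L2 hit is promoted back into L1.
match cache_manager.get("user:123").await? {
    Some(bytes) => println!("hit: {} bytes", bytes.len()),
    None => println!("miss"),
}
```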


pub async fn get_typed<T>(&self, key: &str) -> CacheResult<Option<T>>

Get a value from cache and deserialize it (Type-Safe Version)

§Errors

Returns a SerializationError if deserialization fails, or a BackendError if the cache retrieval fails.
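A sketch of typed retrieval, assuming a serde-serializable type:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct User {
    id: i64,
    name: String,
}

// Deserializes the cached bytes into `User`; a type mismatch
// surfaces as a SerializationError.
let user: Option<User> = cache_manager.get_typed("user:123").await?;
```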


pub async fn set_with_strategy( &self, key: &str, value: Bytes, strategy: CacheStrategy, ) -> CacheResult<()>

Set value with specific cache strategy (all tiers)

Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+). In multi-tier mode, stores to ALL tiers with their respective TTL scaling.

§Errors

Returns an error if cache set operation fails.
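A usage sketch (key and strategy are illustrative):

```rust
use bytes::Bytes;
use multi_tier_cache::CacheStrategy;

// Stores to every tier; in multi-tier mode each tier applies its own TTL scaling.
cache_manager
    .set_with_strategy("config:flags", Bytes::from("payload"), CacheStrategy::MediumTerm)
    .await?;
```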


pub async fn get_or_compute_with<F, Fut>( &self, key: &str, strategy: CacheStrategy, compute_fn: F, ) -> CacheResult<Bytes>
where F: FnOnce() -> Fut + Send, Fut: Future<Output = CacheResult<Bytes>> + Send,

Get or compute value with Cache Stampede protection across L1+L2+Compute

This method provides comprehensive Cache Stampede protection:

  1. Check L1 cache first (uses Moka’s built-in coalescing)
  2. Check L2 cache with mutex-based coalescing
  3. Compute fresh data with protection against concurrent computations
§Arguments
  • key - Cache key
  • strategy - Cache strategy for TTL and storage behavior
  • compute_fn - Async function to compute the value if not in any cache
§Example
let api_data = cache_manager.get_or_compute_with(
    "api_response",
    CacheStrategy::RealTime,
    || async {
        fetch_data_from_api().await
    }
).await?;
§Errors

Returns an error if compute function fails or cache operations fail.


pub async fn get_or_compute_typed<T, F, Fut>( &self, key: &str, strategy: CacheStrategy, compute_fn: F, ) -> CacheResult<T>
where T: Serialize + DeserializeOwned + Send + Sync + 'static, F: FnOnce() -> Fut + Send, Fut: Future<Output = CacheResult<T>> + Send,

Get or compute typed value with Cache Stampede protection (Type-Safe Version)

This method provides the same functionality as get_or_compute_with() but with type-safe automatic serialization/deserialization. Perfect for database queries, API calls, or any computation that returns structured data.

§Type Safety
  • Returns your actual type T instead of serde_json::Value
  • Compiler enforces Serialize + DeserializeOwned bounds
  • No manual JSON conversion needed
§Cache Flow
  1. Check L1 cache → deserialize if found
  2. Check L2 cache → deserialize + promote to L1 if found
  3. Execute compute_fn → serialize → store in L1+L2
  4. Full stampede protection (only ONE request computes)
§Arguments
  • key - Cache key
  • strategy - Cache strategy for TTL
  • compute_fn - Async function returning Result<T>
§Example - Database Query

#[derive(Serialize, Deserialize)]
struct User {
    id: i64,
    name: String,
}

// Type-safe database caching (example - requires sqlx)
// let user: User = cache_manager.get_or_compute_typed(
//     "user:123",
//     CacheStrategy::MediumTerm,
//     || async {
//         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
//             .bind(123)
//             .fetch_one(&pool)
//             .await
//     }
// ).await?;
§Example - API Call
#[derive(Serialize, Deserialize)]
struct ApiResponse {
    data: String,
    timestamp: i64,
}

// API call caching (example - requires reqwest)
// let response: ApiResponse = cache_manager.get_or_compute_typed(
//     "api:endpoint",
//     CacheStrategy::RealTime,
//     || async {
//         reqwest::get("https://api.example.com/data")
//             .await?
//             .json::<ApiResponse>()
//             .await
//     }
// ).await?;
§Performance
  • L1 hit: <1ms + deserialization (~10-50μs for small structs)
  • L2 hit: 2-5ms + deserialization + L1 promotion
  • Compute: Your function time + serialization + L1+L2 storage
  • Stampede protection: 99.6% latency reduction under high concurrency
§Errors

Returns an error if:

  • Compute function fails
  • Serialization fails (invalid type for JSON)
  • Deserialization fails (cache data doesn’t match type T)
  • Cache operations fail (Redis connection issues)

pub fn get_stats(&self) -> CacheManagerStats

Get comprehensive cache statistics

In multi-tier mode, aggregates statistics from all tiers. In legacy mode, returns L1 and L2 stats.
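A usage sketch (assumes `CacheManagerStats` implements `Debug`; its exact fields are not shown on this page):

```rust
let stats = cache_manager.get_stats();
println!("{stats:?}");
```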


pub fn get_tier_stats(&self) -> Option<Vec<TierStats>>

Get per-tier statistics (v0.5.0+)

Returns statistics for each tier if multi-tier mode is enabled. Returns None if using legacy 2-tier mode.

§Example
if let Some(tier_stats) = cache_manager.get_tier_stats() {
    for stats in tier_stats {
        println!("L{}: {} hits ({})",
                 stats.tier_level,
                 stats.hit_count(),
                 stats.backend_name);
    }
}

impl CacheManager


pub async fn publish_to_stream( &self, stream_key: &str, fields: Vec<(String, String)>, maxlen: Option<usize>, ) -> CacheResult<String>

Publish data to Redis Stream

§Arguments
  • stream_key - Name of the stream (e.g., "events_stream")
  • fields - Field-value pairs to publish
  • maxlen - Optional max length for stream trimming
§Returns

The entry ID generated by Redis

§Errors

Returns an error if the streaming backend is not configured.
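A usage sketch (stream key and fields are illustrative):

```rust
// Publishes one entry and trims the stream to roughly 10,000 entries.
let entry_id = cache_manager
    .publish_to_stream(
        "events_stream",
        vec![("event".to_string(), "user_signup".to_string())],
        Some(10_000),
    )
    .await?;
println!("published as {entry_id}");
```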


pub async fn read_stream_latest( &self, stream_key: &str, count: usize, ) -> CacheResult<Vec<(String, Vec<(String, String)>)>>

Read latest entries from Redis Stream

§Arguments
  • stream_key - Name of the stream
  • count - Number of latest entries to retrieve
§Returns

Vector of (entry_id, fields) tuples (newest first)

§Errors

Returns an error if the streaming backend is not configured.
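A usage sketch:

```rust
// Fetch the 10 newest entries (newest first).
let entries = cache_manager.read_stream_latest("events_stream", 10).await?;
for (entry_id, fields) in entries {
    println!("{entry_id}: {fields:?}");
}
```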


pub async fn read_stream( &self, stream_key: &str, last_id: &str, count: usize, block_ms: Option<usize>, ) -> CacheResult<Vec<(String, Vec<(String, String)>)>>

Read from Redis Stream with optional blocking

§Arguments
  • stream_key - Name of the stream
  • last_id - Last ID seen ("0" for start, "$" for new only)
  • count - Max entries to retrieve
  • block_ms - Optional blocking timeout in ms
§Returns

Vector of (entry_id, fields) tuples

§Errors

Returns an error if the streaming backend is not configured.
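A sketch of a blocking consumer loop (stream key, batch size, and timeout are illustrative):

```rust
let mut last_id = "$".to_string(); // only entries published after we start
loop {
    // Blocks for up to 5 seconds waiting for new entries.
    let entries = cache_manager
        .read_stream("events_stream", &last_id, 100, Some(5_000))
        .await?;
    for (entry_id, fields) in entries {
        println!("{entry_id}: {fields:?}");
        last_id = entry_id; // resume from the last entry seen
    }
}
```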


pub async fn invalidate(&self, key: &str) -> CacheResult<()>

Invalidate a cache key across all instances

This removes the key from all cache tiers and broadcasts the invalidation to all other cache instances via Redis Pub/Sub.

Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).

§Arguments
  • key - Cache key to invalidate
§Example
// Invalidate user cache after profile update
cache_manager.invalidate("user:123").await?;
§Errors

Returns an error if invalidation fails.


pub async fn update_cache( &self, key: &str, value: Bytes, ttl: Option<Duration>, ) -> CacheResult<()>

Update cache value across all instances

This updates the key in all cache tiers and broadcasts the update to all other cache instances, avoiding cache misses.

Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).

§Arguments
  • key - Cache key to update
  • value - New value
  • ttl - Optional TTL (uses default if None)
§Example
// Update user cache with new data
let user_data = Bytes::from("alice");
cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
§Errors

Returns an error if cache update fails.


pub async fn invalidate_pattern(&self, pattern: &str) -> CacheResult<()>

Invalidate all keys matching a pattern

This scans L2 cache for keys matching the pattern, removes them from all tiers, and broadcasts the invalidation. L1 caches will be cleared via broadcast.

Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).

Note: Pattern scanning requires a concrete L2Cache instance with scan_keys(). In multi-tier mode, this scans from L2 but removes from all tiers.

§Arguments
  • pattern - Glob-style pattern (e.g., "user:*", "product:123:*")
§Example
// Invalidate all user caches
cache_manager.invalidate_pattern("user:*").await?;

// Invalidate specific user's related caches
cache_manager.invalidate_pattern("user:123:*").await?;
§Errors

Returns an error if invalidation fails.


pub async fn set_with_broadcast( &self, key: &str, value: Bytes, strategy: CacheStrategy, ) -> CacheResult<()>

Set value with automatic broadcast to all instances

This is a write-through operation that updates the cache and broadcasts the update to all other instances automatically.

§Arguments
  • key - Cache key
  • value - Value to cache
  • strategy - Cache strategy (determines TTL)
§Example
// Update and broadcast in one call
let data = Bytes::from("active");
cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
§Errors

Returns an error if cache set or broadcast fails.


pub fn invalidation_stats(&self) -> Option<InvalidationStats>

Available on crate feature redis only.

Get invalidation statistics

Returns statistics about invalidation operations if invalidation is enabled.
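A usage sketch (assumes `InvalidationStats` implements `Debug`):

```rust
// None when invalidation is not enabled on this manager.
if let Some(stats) = cache_manager.invalidation_stats() {
    println!("{stats:?}");
}
```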
