Skip to main content

multi_tier_cache/
traits.rs

1//! Cache Backend Traits
2//!
3//! This module defines the trait abstractions that allow users to implement
4//! custom cache backends for both L1 (in-memory) and L2 (distributed) caches.
5//!
6//! # Architecture
7//!
8//! - `CacheBackend`: Core trait for all cache implementations
9//! - `L2CacheBackend`: Extended trait for L2 caches with TTL introspection
10//! - `StreamingBackend`: Optional trait for event streaming capabilities
11//!
12//! # Example: Custom L1 Backend
13//!
14/// ```rust,ignore
15/// use multi_tier_cache::error::{CacheError, CacheResult};
16/// use bytes::Bytes;
17/// use std::time::Duration;
18/// use futures_util::future::BoxFuture;
19/// use multi_tier_cache::CacheBackend;
20///
21/// struct MyCustomCache;
22///
23/// impl CacheBackend for MyCustomCache {
24///     fn get<'a>(&'a self, _key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
25///         Box::pin(async move { None })
26///     }
27///
28///     fn set_with_ttl<'a>(&'a self, _key: &'a str, _value: Bytes, _ttl: Duration) -> BoxFuture<'a, CacheResult<()>> {
29///         Box::pin(async move { Ok(()) })
30///     }
31///
32///     fn remove<'a>(&'a self, _key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
33///         Box::pin(async move { Ok(()) })
34///     }
35///
36///     fn health_check(&self) -> BoxFuture<'_, bool> {
37///         Box::pin(async move { true })
38///     }
39///
40///     fn name(&self) -> &'static str { "MyCache" }
41/// }
42/// ```
43use crate::error::CacheResult;
44use bytes::Bytes;
45use futures_util::future::BoxFuture;
46use std::time::Duration;
47
48/// Core cache backend trait for both L1 and L2 caches
49///
50/// This trait defines the essential operations that any cache backend must support.
51/// Implement this trait to create custom L1 (in-memory) or L2 (distributed) cache backends.
52///
53/// # Required Operations
54///
55/// - `get`: Retrieve a value by key
56/// - `set_with_ttl`: Store a value with a time-to-live
57/// - `remove`: Delete a value by key
58/// - `health_check`: Verify cache backend is operational
59///
60/// # Thread Safety
61///
62/// Implementations must be `Send + Sync` to support concurrent access across async tasks.
63///
64/// # Performance Considerations
65///
66/// - `get` operations should be optimized for low latency (target: <1ms for L1, <5ms for L2)
67/// - `set_with_ttl` operations can be slightly slower but should still be fast
68/// - Consider connection pooling for distributed backends
69///
70/// # Example
71///
72/// See module-level documentation for a complete example.
73pub trait CacheBackend: Send + Sync {
74    /// Get value from cache by key
75    ///
76    /// # Arguments
77    ///
78    /// * `key` - The cache key to retrieve
79    ///
80    /// # Returns
81    ///
82    /// * `Some(value)` - Value found in cache
83    /// * `None` - Key not found or expired
84    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>>;
85
86    /// Set value in cache with time-to-live
87    ///
88    /// # Arguments
89    ///
90    /// * `key` - The cache key
91    /// * `value` - The value to store (raw bytes)
92    /// * `ttl` - Time-to-live duration
93    ///
94    /// # Returns
95    ///
96    /// * `Ok(())` - Value successfully cached
97    /// * `Err(e)` - Cache operation failed
98    fn set_with_ttl<'a>(
99        &'a self,
100        key: &'a str,
101        value: Bytes,
102        ttl: Duration,
103    ) -> BoxFuture<'a, CacheResult<()>>;
104
105    /// Set value in cache with default TTL (5 minutes)
106    fn set<'a>(&'a self, key: &'a str, value: Bytes) -> BoxFuture<'a, CacheResult<()>> {
107        self.set_with_ttl(key, value, std::time::Duration::from_secs(300))
108    }
109
110    /// Remove value from cache
111    ///
112    /// # Arguments
113    ///
114    /// * `key` - The cache key to remove
115    ///
116    /// # Returns
117    ///
118    /// * `Ok(())` - Value removed (or didn't exist)
119    /// * `Err(e)` - Cache operation failed
120    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>>;
121
122    /// Check if cache backend is healthy
123    ///
124    /// This method should verify that the cache backend is operational.
125    /// For distributed caches, this typically involves a ping or connectivity check.
126    ///
127    /// # Returns
128    ///
129    /// * `true` - Cache is healthy and operational
130    /// * `false` - Cache is unhealthy or unreachable
131    fn health_check(&self) -> BoxFuture<'_, bool>;
132
133    /// Remove keys matching a pattern
134    ///
135    /// # Arguments
136    ///
137    /// * `pattern` - Glob-style pattern (e.g. "user:*")
138    ///
139    /// # Returns
140    ///
141    /// * `Ok(())` - Pattern processed
142    /// * `Err(e)` - Operation failed
143    fn remove_pattern<'a>(&'a self, _pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
144        Box::pin(async { Ok(()) })
145    }
146
147    /// Get the name of this cache backend
148    fn name(&self) -> &'static str;
149}
150
151// (No longer needed since traits are now dyn-compatible)
152
153/// Extended trait for L2 cache backends with TTL introspection
154///
155/// This trait extends `CacheBackend` with the ability to retrieve both a value
156/// and its remaining TTL. This is essential for implementing efficient L2-to-L1
157/// promotion with accurate TTL propagation.
158///
159/// # Use Cases
160///
161/// - L2-to-L1 promotion with same TTL
162/// - TTL-based cache warming strategies
163/// - Monitoring and analytics
164///
165/// # Example
166///
167/// ```rust,ignore,ignore
168/// ```rust,ignore
169/// use multi_tier_cache::error::{CacheError, CacheResult};
170/// use bytes::Bytes;
171/// use std::time::Duration;
172/// use futures_util::future::BoxFuture;
173/// use multi_tier_cache::{CacheBackend, L2CacheBackend};
174///
175/// struct MyDistributedCache;
176///
177/// impl CacheBackend for MyDistributedCache {
178///     fn get<'a>(&'a self, _key: &'a str) -> BoxFuture<'a, Option<Bytes>> { Box::pin(async move { None }) }
179///     fn set_with_ttl<'a>(&'a self, _k: &'a str, _v: Bytes, _t: Duration) -> BoxFuture<'a, CacheResult<()>> { Box::pin(async move { Ok(()) }) }
180///     fn remove<'a>(&'a self, _k: &'a str) -> BoxFuture<'a, CacheResult<()>> { Box::pin(async move { Ok(()) }) }
181///     fn health_check(&self) -> BoxFuture<'_, bool> { Box::pin(async move { true }) }
182///     fn name(&self) -> &'static str { "MyDistCache" }
183/// }
184///
185/// impl L2CacheBackend for MyDistributedCache {
186///     fn get_with_ttl<'a>(&'a self, _key: &'a str) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
187///         Box::pin(async move { None })
188///     }
189/// }
190/// ```
191pub trait L2CacheBackend: CacheBackend {
192    /// Get value with its remaining TTL from L2 cache
193    fn get_with_ttl<'a>(&'a self, key: &'a str)
194    -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>>;
195}
196
197// (No longer needed since traits are now dyn-compatible)
198
199/// Optional trait for cache backends that support event streaming
200///
201/// # Type Definitions
202///
203/// * `StreamEntry` - A single entry in a stream: `(id, fields)` where fields are `Vec<(key, value)>`
204pub type StreamEntry = (String, Vec<(String, String)>);
205
206/// Optional trait for cache backends that support event streaming
207///
208/// This trait defines operations for event-driven architectures using
209/// streaming data structures like Redis Streams.
210///
211/// # Capabilities
212///
213/// - Publish events to streams with automatic trimming
214/// - Read latest entries (newest first)
215/// - Read entries with blocking support
216///
217/// # Backend Requirements
218///
219/// Not all cache backends support streaming. This trait is optional and
220/// should only be implemented by backends with native streaming support
221/// (e.g., Redis Streams, Kafka, Pulsar).
222///
223/// # Example
224///
225/// ```rust,ignore,ignore
226/// ```rust,ignore
227/// use multi_tier_cache::error::{CacheError, CacheResult};
228/// use multi_tier_cache::{StreamingBackend, StreamEntry};
229/// use futures_util::future::BoxFuture;
230///
231/// struct MyStreamingCache;
232///
233/// impl StreamingBackend for MyStreamingCache {
234///     fn stream_add<'a>(
235///         &'a self,
236///         _stream_key: &'a str,
237///         _fields: Vec<(String, String)>,
238///         _maxlen: Option<usize>,
239///     ) -> BoxFuture<'a, CacheResult<String>> {
240///         Box::pin(async move { Ok("entry-id".to_string()) })
241///     }
242///
243///     fn stream_read_latest<'a>(
244///         &'a self,
245///         _stream_key: &'a str,
246///         _count: usize,
247///     ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
248///         Box::pin(async move { Ok(vec![]) })
249///     }
250///
251///     fn stream_read<'a>(
252///         &'a self,
253///         _stream_key: &'a str,
254///         _last_id: &'a str,
255///         _count: usize,
256///         _block_ms: Option<usize>,
257///     ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
258///         Box::pin(async move { Ok(vec![]) })
259///     }
260///
261///     fn stream_create_group<'a>(&'a self, _: &'a str, _: &'a str, _: &'a str) -> BoxFuture<'a, CacheResult<()>> {
262///         Box::pin(async move { Ok(()) })
263///     }
264///
265///     fn stream_read_group<'a>(&'a self, _: &'a str, _: &'a str, _: &'a str, _: usize, _: Option<usize>) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
266///         Box::pin(async move { Ok(vec![]) })
267///     }
268///
269///     fn stream_ack<'a>(&'a self, _: &'a str, _: &'a str, _: &'a [String]) -> BoxFuture<'a, CacheResult<()>> {
270///         Box::pin(async move { Ok(()) })
271///     }
272/// }
273/// ```
274pub trait StreamingBackend: Send + Sync {
275    /// Add an entry to a stream
276    fn stream_add<'a>(
277        &'a self,
278        stream_key: &'a str,
279        fields: Vec<(String, String)>,
280        maxlen: Option<usize>,
281    ) -> BoxFuture<'a, CacheResult<String>>;
282
283    /// Read the latest N entries from a stream (newest first)
284    fn stream_read_latest<'a>(
285        &'a self,
286        stream_key: &'a str,
287        count: usize,
288    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>;
289
290    /// Read entries from a stream with optional blocking
291    fn stream_read<'a>(
292        &'a self,
293        stream_key: &'a str,
294        last_id: &'a str,
295        count: usize,
296        block_ms: Option<usize>,
297    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>;
298
299    /// Create a consumer group for a stream
300    fn stream_create_group<'a>(
301        &'a self,
302        stream_key: &'a str,
303        group_name: &'a str,
304        id: &'a str,
305    ) -> BoxFuture<'a, CacheResult<()>>;
306
307    /// Read entries from a stream as a consumer group
308    fn stream_read_group<'a>(
309        &'a self,
310        stream_key: &'a str,
311        group_name: &'a str,
312        consumer_name: &'a str,
313        count: usize,
314        block_ms: Option<usize>,
315    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>;
316
317    /// Acknowledge entry processing
318    fn stream_ack<'a>(
319        &'a self,
320        stream_key: &'a str,
321        group_name: &'a str,
322        ids: &'a [String],
323    ) -> BoxFuture<'a, CacheResult<()>>;
324}