multi_tier_cache/
redis_streams.rs

1//! Redis Streams Integration
2//!
3//! Provides high-level Redis Streams functionality for event-driven architectures.
4//!
5//! Redis Streams is a data structure that acts like an append-only log, perfect for:
6//! - Event sourcing
7//! - Real-time analytics
8//! - Message queues
9//! - Activity feeds
10//! - Audit logs
11//!
12//! # Features
13//!
14//! - **XADD**: Publish events with automatic ID generation
15//! - **XREVRANGE**: Read latest N entries (newest first)
16//! - **XREAD**: Read new entries (with optional blocking)
17//! - **MAXLEN**: Automatic stream trimming to prevent unbounded growth
18//!
19//! # Example
20//!
21//! ```no_run
22//! use multi_tier_cache::RedisStreams;
23//!
24//! # async fn example() -> anyhow::Result<()> {
25//! let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
26//!
27//! // Publish event
28//! let event_id = streams.stream_add(
29//!     "events",
30//!     vec![
31//!         ("user_id".to_string(), "123".to_string()),
32//!         ("action".to_string(), "login".to_string()),
33//!     ],
34//!     Some(1000)  // Keep only latest 1000 events
35//! ).await?;
36//!
37//! // Read latest 10 events
38//! let latest = streams.stream_read_latest("events", 10).await?;
39//! # Ok(())
40//! # }
41//! ```
42
43use anyhow::Result;
44use redis::aio::ConnectionManager;
45use async_trait::async_trait;
46use crate::traits::StreamingBackend;
47
/// Redis Streams client for event-driven architectures
///
/// Wraps a Redis [`ConnectionManager`] to provide high-level streaming
/// operations: `stream_add` (XADD), `stream_read_latest` (XREVRANGE) and
/// `stream_read` (XREAD).
///
/// The type derives `Clone`, so a clone carries its own copy of the wrapped
/// manager handle. Every operation starts by cloning that handle, which is why
/// all methods only need `&self`.
#[derive(Clone)]
pub struct RedisStreams {
    /// Connection handle created in `new`; cloned at the start of each operation.
    conn_manager: ConnectionManager,
}
55
56impl RedisStreams {
57    /// Create a new RedisStreams instance
58    ///
59    /// # Arguments
60    /// * `redis_url` - Redis connection URL (e.g., "redis://127.0.0.1:6379")
61    ///
62    /// # Example
63    ///
64    /// ```no_run
65    /// # use multi_tier_cache::RedisStreams;
66    /// # async fn example() -> anyhow::Result<()> {
67    /// let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
68    /// # Ok(())
69    /// # }
70    /// ```
71    pub async fn new(redis_url: &str) -> Result<Self> {
72        let client = redis::Client::open(redis_url)?;
73        let conn_manager = ConnectionManager::new(client).await?;
74
75        println!("🌊 Redis Streams initialized at {}", redis_url);
76
77        Ok(Self { conn_manager })
78    }
79
80    /// Publish data to Redis Stream using XADD
81    ///
82    /// # Arguments
83    /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
84    /// * `fields` - Vec of field-value pairs to add to the stream
85    /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
86    ///
87    /// # Returns
88    /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
89    ///
90    /// # Example
91    ///
92    /// ```no_run
93    /// # use multi_tier_cache::RedisStreams;
94    /// # async fn example() -> anyhow::Result<()> {
95    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
96    /// let event_id = streams.stream_add(
97    ///     "user_events",
98    ///     vec![
99    ///         ("user_id".to_string(), "42".to_string()),
100    ///         ("action".to_string(), "purchase".to_string()),
101    ///         ("amount".to_string(), "99.99".to_string()),
102    ///     ],
103    ///     Some(10000)  // Keep max 10k events
104    /// ).await?;
105    /// println!("Event ID: {}", event_id);
106    /// # Ok(())
107    /// # }
108    /// ```
109    pub async fn stream_add(
110        &self,
111        stream_key: &str,
112        fields: Vec<(String, String)>,
113        maxlen: Option<usize>
114    ) -> Result<String> {
115        let mut conn = self.conn_manager.clone();
116
117        // Build XADD command with MAXLEN trimming if specified
118        let mut cmd = redis::cmd("XADD");
119        cmd.arg(stream_key);
120
121        if let Some(max) = maxlen {
122            // Use MAXLEN ~ for approximate trimming (more efficient)
123            cmd.arg("MAXLEN").arg("~").arg(max);
124        }
125
126        cmd.arg("*");  // Auto-generate ID
127
128        // Add all field-value pairs
129        for (field, value) in &fields {
130            cmd.arg(field).arg(value);
131        }
132
133        let entry_id: String = cmd.query_async(&mut conn).await?;
134
135        println!("📤 [Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
136        Ok(entry_id)
137    }
138
139    /// Read latest entries from Redis Stream using XREVRANGE
140    ///
141    /// # Arguments
142    /// * `stream_key` - Name of the Redis Stream
143    /// * `count` - Number of latest entries to retrieve
144    ///
145    /// # Returns
146    /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
147    ///
148    /// # Example
149    ///
150    /// ```no_run
151    /// # use multi_tier_cache::RedisStreams;
152    /// # async fn example() -> anyhow::Result<()> {
153    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
154    /// let latest_10 = streams.stream_read_latest("events", 10).await?;
155    /// for (id, fields) in latest_10 {
156    ///     println!("Event {}: {:?}", id, fields);
157    /// }
158    /// # Ok(())
159    /// # }
160    /// ```
161    pub async fn stream_read_latest(
162        &self,
163        stream_key: &str,
164        count: usize
165    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
166        let mut conn = self.conn_manager.clone();
167
168        // XREVRANGE returns a raw Value that we need to parse manually
169        let raw_result: redis::Value = redis::cmd("XREVRANGE")
170            .arg(stream_key)
171            .arg("+")  // Start from the newest
172            .arg("-")  // To the oldest
173            .arg("COUNT")
174            .arg(count)
175            .query_async(&mut conn)
176            .await?;
177
178        let entries = Self::parse_stream_response(raw_result)?;
179
180        println!("📥 [Stream] Read {} entries from '{}'", entries.len(), stream_key);
181        Ok(entries)
182    }
183
184    /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
185    ///
186    /// # Arguments
187    /// * `stream_key` - Name of the Redis Stream
188    /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
189    /// * `count` - Maximum number of entries to retrieve
190    /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
191    ///
192    /// # Returns
193    /// Vector of (entry_id, fields) tuples
194    ///
195    /// # Example
196    ///
197    /// ```no_run
198    /// # use multi_tier_cache::RedisStreams;
199    /// # async fn example() -> anyhow::Result<()> {
200    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
201    /// // Non-blocking: read new entries since last seen ID
202    /// let entries = streams.stream_read("events", "1234567890-0", 100, None).await?;
203    ///
204    /// // Blocking: wait up to 5 seconds for new entries
205    /// let new_entries = streams.stream_read("events", "$", 100, Some(5000)).await?;
206    /// # Ok(())
207    /// # }
208    /// ```
209    pub async fn stream_read(
210        &self,
211        stream_key: &str,
212        last_id: &str,
213        count: usize,
214        block_ms: Option<usize>
215    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
216        let mut conn = self.conn_manager.clone();
217
218        let mut cmd = redis::cmd("XREAD");
219
220        // Add BLOCK if specified
221        if let Some(block) = block_ms {
222            cmd.arg("BLOCK").arg(block);
223        }
224
225        cmd.arg("COUNT").arg(count);
226        cmd.arg("STREAMS").arg(stream_key).arg(last_id);
227
228        let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
229
230        // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
231        let entries = Self::parse_xread_response(raw_result)?;
232
233        println!("📥 [Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
234        Ok(entries)
235    }
236
237    /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
238    /// Format: [[id, [field, value, field, value, ...]], ...]
239    fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
240        match value {
241            redis::Value::Array(entries) => {
242                let mut result = Vec::new();
243
244                for entry in entries {
245                    if let redis::Value::Array(entry_parts) = entry {
246                        if entry_parts.len() >= 2 {
247                            // First element is the ID
248                            let id = Self::value_to_string(&entry_parts[0]);
249
250                            // Second element is the field-value array
251                            if let redis::Value::Array(field_values) = &entry_parts[1] {
252                                let mut fields = Vec::new();
253
254                                // Process pairs of field-value
255                                for chunk in field_values.chunks(2) {
256                                    if chunk.len() == 2 {
257                                        let field = Self::value_to_string(&chunk[0]);
258                                        let value = Self::value_to_string(&chunk[1]);
259                                        fields.push((field, value));
260                                    }
261                                }
262
263                                result.push((id, fields));
264                            }
265                        }
266                    }
267                }
268
269                Ok(result)
270            }
271            redis::Value::Nil => Ok(Vec::new()),
272            _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
273        }
274    }
275
276    /// Helper function to parse XREAD response
277    /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
278    fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
279        match value {
280            redis::Value::Array(streams) => {
281                let mut all_entries = Vec::new();
282
283                for stream in streams {
284                    if let redis::Value::Array(stream_parts) = stream {
285                        if stream_parts.len() >= 2 {
286                            // Second element contains the entries
287                            if let redis::Value::Array(entries) = &stream_parts[1] {
288                                for entry in entries {
289                                    if let redis::Value::Array(entry_parts) = entry {
290                                        if entry_parts.len() >= 2 {
291                                            let id = Self::value_to_string(&entry_parts[0]);
292
293                                            if let redis::Value::Array(field_values) = &entry_parts[1] {
294                                                let mut fields = Vec::new();
295
296                                                for chunk in field_values.chunks(2) {
297                                                    if chunk.len() == 2 {
298                                                        let field = Self::value_to_string(&chunk[0]);
299                                                        let value = Self::value_to_string(&chunk[1]);
300                                                        fields.push((field, value));
301                                                    }
302                                                }
303
304                                                all_entries.push((id, fields));
305                                            }
306                                        }
307                                    }
308                                }
309                            }
310                        }
311                    }
312                }
313
314                Ok(all_entries)
315            }
316            redis::Value::Nil => Ok(Vec::new()),
317            _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
318        }
319    }
320
321    /// Helper function to convert Redis Value to String
322    fn value_to_string(value: &redis::Value) -> String {
323        match value {
324            redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
325            redis::Value::SimpleString(s) => s.clone(),
326            redis::Value::Int(i) => i.to_string(),
327            redis::Value::Okay => "OK".to_string(),
328            redis::Value::Nil => String::new(),
329            _ => String::new(),
330        }
331    }
332}
333
/// Implement StreamingBackend trait for RedisStreams
///
/// This enables RedisStreams to be used as a pluggable streaming backend.
///
/// Each trait method forwards its arguments unchanged to the inherent
/// `RedisStreams` method of the same name and returns that result as-is.
#[async_trait]
impl StreamingBackend for RedisStreams {
    /// Forwards to the inherent `RedisStreams::stream_add`.
    async fn stream_add(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>,
    ) -> Result<String> {
        // Spelled with the type path so the inherent method is named explicitly.
        RedisStreams::stream_add(self, stream_key, fields, maxlen).await
    }

    /// Forwards to the inherent `RedisStreams::stream_read_latest`.
    async fn stream_read_latest(
        &self,
        stream_key: &str,
        count: usize,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        // Spelled with the type path so the inherent method is named explicitly.
        RedisStreams::stream_read_latest(self, stream_key, count).await
    }

    /// Forwards to the inherent `RedisStreams::stream_read`.
    async fn stream_read(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        // Spelled with the type path so the inherent method is named explicitly.
        RedisStreams::stream_read(self, stream_key, last_id, count, block_ms).await
    }
}