multi_tier_cache/
redis_streams.rs

1//! Redis Streams Integration
2//!
3//! Provides high-level Redis Streams functionality for event-driven architectures.
4//!
5//! Redis Streams is a data structure that acts like an append-only log, perfect for:
6//! - Event sourcing
7//! - Real-time analytics
8//! - Message queues
9//! - Activity feeds
10//! - Audit logs
11//!
12//! # Features
13//!
14//! - **XADD**: Publish events with automatic ID generation
15//! - **XREVRANGE**: Read latest N entries (newest first)
16//! - **XREAD**: Read new entries (with optional blocking)
17//! - **MAXLEN**: Automatic stream trimming to prevent unbounded growth
18//!
19//! # Example
20//!
21//! ```no_run
22//! use multi_tier_cache::RedisStreams;
23//!
24//! # async fn example() -> anyhow::Result<()> {
25//! let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
26//!
27//! // Publish event
28//! let event_id = streams.stream_add(
29//!     "events",
30//!     vec![
31//!         ("user_id".to_string(), "123".to_string()),
32//!         ("action".to_string(), "login".to_string()),
33//!     ],
34//!     Some(1000)  // Keep only latest 1000 events
35//! ).await?;
36//!
37//! // Read latest 10 events
38//! let latest = streams.stream_read_latest("events", 10).await?;
39//! # Ok(())
40//! # }
41//! ```
42
43use anyhow::Result;
44use redis::aio::ConnectionManager;
45use async_trait::async_trait;
46use tracing::debug;
47use crate::traits::StreamingBackend;
48
49/// Type alias for Redis Stream entry: (ID, Field-Value Pairs)
50pub type StreamEntry = (String, Vec<(String, String)>);
51
52/// Redis Streams client for event-driven architectures
53///
54/// Wraps Redis ConnectionManager to provide high-level streaming operations.
55#[derive(Clone)]
56pub struct RedisStreams {
57    conn_manager: ConnectionManager,
58}
59
60impl RedisStreams {
61    /// Create a new RedisStreams instance
62    ///
63    /// # Arguments
64    /// * `redis_url` - Redis connection URL (e.g., "redis://127.0.0.1:6379")
65    ///
66    /// # Example
67    ///
68    /// ```no_run
69    /// # use multi_tier_cache::RedisStreams;
70    /// # async fn example() -> anyhow::Result<()> {
71    /// let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
72    /// # Ok(())
73    /// # }
74    /// ```
75    pub async fn new(redis_url: &str) -> Result<Self> {
76        let client = redis::Client::open(redis_url)?;
77        let conn_manager = ConnectionManager::new(client).await?;
78
79        debug!("Redis Streams initialized at {}", redis_url);
80
81        Ok(Self { conn_manager })
82    }
83
84    /// Publish data to Redis Stream using XADD
85    ///
86    /// # Arguments
87    /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
88    /// * `fields` - Vec of field-value pairs to add to the stream
89    /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
90    ///
91    /// # Returns
92    /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
93    ///
94    /// # Example
95    ///
96    /// ```no_run
97    /// # use multi_tier_cache::RedisStreams;
98    /// # async fn example() -> anyhow::Result<()> {
99    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
100    /// let event_id = streams.stream_add(
101    ///     "user_events",
102    ///     vec![
103    ///         ("user_id".to_string(), "42".to_string()),
104    ///         ("action".to_string(), "purchase".to_string()),
105    ///         ("amount".to_string(), "99.99".to_string()),
106    ///     ],
107    ///     Some(10000)  // Keep max 10k events
108    /// ).await?;
109    /// println!("Event ID: {}", event_id);
110    /// # Ok(())
111    /// # }
112    /// ```
113    pub async fn stream_add(
114        &self,
115        stream_key: &str,
116        fields: Vec<(String, String)>,
117        maxlen: Option<usize>
118    ) -> Result<String> {
119        let mut conn = self.conn_manager.clone();
120
121        // Build XADD command with MAXLEN trimming if specified
122        let mut cmd = redis::cmd("XADD");
123        cmd.arg(stream_key);
124
125        if let Some(max) = maxlen {
126            // Use MAXLEN ~ for approximate trimming (more efficient)
127            cmd.arg("MAXLEN").arg("~").arg(max);
128        }
129
130        cmd.arg("*");  // Auto-generate ID
131
132        // Add all field-value pairs
133        for (field, value) in &fields {
134            cmd.arg(field).arg(value);
135        }
136
137        let entry_id: String = cmd.query_async(&mut conn).await?;
138
139        debug!("[Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
140        Ok(entry_id)
141    }
142
143    /// Read latest entries from Redis Stream using XREVRANGE
144    ///
145    /// # Arguments
146    /// * `stream_key` - Name of the Redis Stream
147    /// * `count` - Number of latest entries to retrieve
148    ///
149    /// # Returns
150    /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
151    ///
152    /// # Example
153    ///
154    /// ```no_run
155    /// # use multi_tier_cache::RedisStreams;
156    /// # async fn example() -> anyhow::Result<()> {
157    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
158    /// let latest_10 = streams.stream_read_latest("events", 10).await?;
159    /// for (id, fields) in latest_10 {
160    ///     println!("Event {}: {:?}", id, fields);
161    /// }
162    /// # Ok(())
163    /// # }
164    /// ```
165    pub async fn stream_read_latest(
166        &self,
167        stream_key: &str,
168        count: usize
169    ) -> Result<Vec<StreamEntry>> {
170        let mut conn = self.conn_manager.clone();
171
172        // XREVRANGE returns a raw Value that we need to parse manually
173        let raw_result: redis::Value = redis::cmd("XREVRANGE")
174            .arg(stream_key)
175            .arg("+")  // Start from the newest
176            .arg("-")  // To the oldest
177            .arg("COUNT")
178            .arg(count)
179            .query_async(&mut conn)
180            .await?;
181
182        let entries = Self::parse_stream_response(raw_result)?;
183
184        debug!("[Stream] Read {} entries from '{}'", entries.len(), stream_key);
185        Ok(entries)
186    }
187
188    /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
189    ///
190    /// # Arguments
191    /// * `stream_key` - Name of the Redis Stream
192    /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
193    /// * `count` - Maximum number of entries to retrieve
194    /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
195    ///
196    /// # Returns
197    /// Vector of (entry_id, fields) tuples
198    ///
199    /// # Example
200    ///
201    /// ```no_run
202    /// # use multi_tier_cache::RedisStreams;
203    /// # async fn example() -> anyhow::Result<()> {
204    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
205    /// // Non-blocking: read new entries since last seen ID
206    /// let entries = streams.stream_read("events", "1234567890-0", 100, None).await?;
207    ///
208    /// // Blocking: wait up to 5 seconds for new entries
209    /// let new_entries = streams.stream_read("events", "$", 100, Some(5000)).await?;
210    /// # Ok(())
211    /// # }
212    /// ```
213    pub async fn stream_read(
214        &self,
215        stream_key: &str,
216        last_id: &str,
217        count: usize,
218        block_ms: Option<usize>
219    ) -> Result<Vec<StreamEntry>> {
220        let mut conn = self.conn_manager.clone();
221
222        let mut cmd = redis::cmd("XREAD");
223
224        // Add BLOCK if specified
225        if let Some(block) = block_ms {
226            cmd.arg("BLOCK").arg(block);
227        }
228
229        cmd.arg("COUNT").arg(count);
230        cmd.arg("STREAMS").arg(stream_key).arg(last_id);
231
232        let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
233
234        // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
235        let entries = Self::parse_xread_response(raw_result)?;
236
237        debug!("[Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
238        Ok(entries)
239    }
240
241    /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
242    /// Format: [[id, [field, value, field, value, ...]], ...]
243    fn parse_stream_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
244        match value {
245            redis::Value::Array(entries) => {
246                let mut result = Vec::new();
247
248                for entry in entries {
249                    if let redis::Value::Array(entry_parts) = entry {
250                        if entry_parts.len() >= 2 {
251                            // First element is the ID
252                            let id = Self::value_to_string(&entry_parts[0]);
253
254                            // Second element is the field-value array
255                            if let redis::Value::Array(field_values) = &entry_parts[1] {
256                                let mut fields = Vec::new();
257
258                                // Process pairs of field-value
259                                for chunk in field_values.chunks(2) {
260                                    if chunk.len() == 2 {
261                                        let field = Self::value_to_string(&chunk[0]);
262                                        let value = Self::value_to_string(&chunk[1]);
263                                        fields.push((field, value));
264                                    }
265                                }
266
267                                result.push((id, fields));
268                            }
269                        }
270                    }
271                }
272
273                Ok(result)
274            }
275            redis::Value::Nil => Ok(Vec::new()),
276            _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
277        }
278    }
279
280    /// Helper function to parse XREAD response
281    /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
282    fn parse_xread_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
283        match value {
284            redis::Value::Array(streams) => {
285                let mut all_entries = Vec::new();
286
287                for stream in streams {
288                    if let redis::Value::Array(stream_parts) = stream {
289                        if stream_parts.len() >= 2 {
290                            // Second element contains the entries
291                            if let redis::Value::Array(entries) = &stream_parts[1] {
292                                for entry in entries {
293                                    if let redis::Value::Array(entry_parts) = entry {
294                                        if entry_parts.len() >= 2 {
295                                            let id = Self::value_to_string(&entry_parts[0]);
296
297                                            if let redis::Value::Array(field_values) = &entry_parts[1] {
298                                                let mut fields = Vec::new();
299
300                                                for chunk in field_values.chunks(2) {
301                                                    if chunk.len() == 2 {
302                                                        let field = Self::value_to_string(&chunk[0]);
303                                                        let value = Self::value_to_string(&chunk[1]);
304                                                        fields.push((field, value));
305                                                    }
306                                                }
307
308                                                all_entries.push((id, fields));
309                                            }
310                                        }
311                                    }
312                                }
313                            }
314                        }
315                    }
316                }
317
318                Ok(all_entries)
319            }
320            redis::Value::Nil => Ok(Vec::new()),
321            _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
322        }
323    }
324
325    /// Helper function to convert Redis Value to String
326    fn value_to_string(value: &redis::Value) -> String {
327        match value {
328            redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
329            redis::Value::SimpleString(s) => s.clone(),
330            redis::Value::Int(i) => i.to_string(),
331            redis::Value::Okay => "OK".to_string(),
332            redis::Value::Nil => String::new(),
333            _ => String::new(),
334        }
335    }
336}
337
338/// Implement StreamingBackend trait for RedisStreams
339///
340/// This enables RedisStreams to be used as a pluggable streaming backend.
341#[async_trait]
342impl StreamingBackend for RedisStreams {
343    async fn stream_add(
344        &self,
345        stream_key: &str,
346        fields: Vec<(String, String)>,
347        maxlen: Option<usize>,
348    ) -> Result<String> {
349        RedisStreams::stream_add(self, stream_key, fields, maxlen).await
350    }
351
352    async fn stream_read_latest(
353        &self,
354        stream_key: &str,
355        count: usize,
356    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
357        let entries = RedisStreams::stream_read_latest(self, stream_key, count).await?;
358        // Convert StreamEntry to the trait's expected type if they match, or just return
359        // Since StreamEntry IS (String, Vec<(String, String)>), this is fine.
360        // However, the trait definition in traits.rs might not use the alias.
361        // Let's check traits.rs first or just return the result since the types are identical.
362        Ok(entries)
363    }
364
365    async fn stream_read(
366        &self,
367        stream_key: &str,
368        last_id: &str,
369        count: usize,
370        block_ms: Option<usize>,
371    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
372        let entries = RedisStreams::stream_read(self, stream_key, last_id, count, block_ms).await?;
373        Ok(entries)
374    }
375}