multi_tier_cache/
redis_streams.rs

1//! Redis Streams Integration
2//!
3//! Provides high-level Redis Streams functionality for event-driven architectures.
4//!
5//! Redis Streams is a data structure that acts like an append-only log, perfect for:
6//! - Event sourcing
7//! - Real-time analytics
8//! - Message queues
9//! - Activity feeds
10//! - Audit logs
11//!
12//! # Features
13//!
14//! - **XADD**: Publish events with automatic ID generation
15//! - **XREVRANGE**: Read latest N entries (newest first)
16//! - **XREAD**: Read new entries (with optional blocking)
17//! - **MAXLEN**: Automatic stream trimming to prevent unbounded growth
18//!
19//! # Example
20//!
21//! ```no_run
22//! use multi_tier_cache::RedisStreams;
23//!
24//! # async fn example() -> anyhow::Result<()> {
25//! let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
26//!
27//! // Publish event
28//! let event_id = streams.stream_add(
29//!     "events",
30//!     vec![
31//!         ("user_id".to_string(), "123".to_string()),
32//!         ("action".to_string(), "login".to_string()),
33//!     ],
34//!     Some(1000)  // Keep only latest 1000 events
35//! ).await?;
36//!
37//! // Read latest 10 events
38//! let latest = streams.stream_read_latest("events", 10).await?;
39//! # Ok(())
40//! # }
41//! ```
42
43use crate::traits::StreamingBackend;
44use anyhow::Result;
45use async_trait::async_trait;
46use redis::aio::ConnectionManager;
47use tracing::debug;
48
49/// Type alias for Redis Stream entry: (ID, Field-Value Pairs)
50pub type StreamEntry = (String, Vec<(String, String)>);
51
52/// Redis Streams client for event-driven architectures
53///
54/// Wraps Redis `ConnectionManager` to provide high-level streaming operations.
55#[derive(Clone)]
56pub struct RedisStreams {
57    conn_manager: ConnectionManager,
58}
59
60impl RedisStreams {
61    /// Create a new `RedisStreams` instance
62    ///
63    /// # Arguments
64    /// * `redis_url` - Redis connection URL (e.g., `<redis://127.0.0.1:6379>`)
65    ///
66    /// # Example
67    ///
68    /// ```no_run
69    /// # use multi_tier_cache::RedisStreams;
70    /// # async fn example() -> anyhow::Result<()> {
71    /// let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
72    /// # Ok(())
73    /// # }
74    /// ```
75    /// # Errors
76    ///
77    /// Returns an error if Redis connection fails.
78    pub async fn new(redis_url: &str) -> Result<Self> {
79        let client = redis::Client::open(redis_url)?;
80        let conn_manager = ConnectionManager::new(client).await?;
81
82        debug!("Redis Streams initialized at {}", redis_url);
83
84        Ok(Self { conn_manager })
85    }
86
87    /// Publish data to Redis Stream using XADD
88    ///
89    /// # Arguments
90    /// * `stream_key` - Name of the Redis Stream (e.g., "`events_stream`")
91    /// * `fields` - Vec of field-value pairs to add to the stream
92    /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
93    ///
94    /// # Returns
95    /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
96    ///
97    /// # Example
98    ///
99    /// ```no_run
100    /// # use multi_tier_cache::RedisStreams;
101    /// # async fn example() -> anyhow::Result<()> {
102    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
103    /// let event_id = streams.stream_add(
104    ///     "user_events",
105    ///     vec![
106    ///         ("user_id".to_string(), "42".to_string()),
107    ///         ("action".to_string(), "purchase".to_string()),
108    ///         ("amount".to_string(), "99.99".to_string()),
109    ///     ],
110    ///     Some(10000)  // Keep max 10k events
111    /// ).await?;
112    /// println!("Event ID: {}", event_id);
113    /// # Ok(())
114    /// # }
115    /// ```
116    /// # Errors
117    ///
118    /// Returns an error if the XADD command fails.
119    pub async fn stream_add(
120        &self,
121        stream_key: &str,
122        fields: Vec<(String, String)>,
123        maxlen: Option<usize>,
124    ) -> Result<String> {
125        let mut conn = self.conn_manager.clone();
126
127        // Build XADD command with MAXLEN trimming if specified
128        let mut cmd = redis::cmd("XADD");
129        cmd.arg(stream_key);
130
131        if let Some(max) = maxlen {
132            // Use MAXLEN ~ for approximate trimming (more efficient)
133            cmd.arg("MAXLEN").arg("~").arg(max);
134        }
135
136        cmd.arg("*"); // Auto-generate ID
137
138        // Add all field-value pairs
139        for (field, value) in &fields {
140            cmd.arg(field).arg(value);
141        }
142
143        let entry_id: String = cmd.query_async(&mut conn).await?;
144
145        debug!(
146            "[Stream] Published to '{}' (ID: {}, fields: {})",
147            stream_key,
148            entry_id,
149            fields.len()
150        );
151        Ok(entry_id)
152    }
153
154    /// Read latest entries from Redis Stream using XREVRANGE
155    ///
156    /// # Arguments
157    /// * `stream_key` - Name of the Redis Stream
158    /// * `count` - Number of latest entries to retrieve
159    ///
160    /// # Returns
161    /// Vector of (`entry_id`, fields) tuples, ordered from newest to oldest
162    ///
163    /// # Example
164    ///
165    /// ```no_run
166    /// # use multi_tier_cache::RedisStreams;
167    /// # async fn example() -> anyhow::Result<()> {
168    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
169    /// let latest_10 = streams.stream_read_latest("events", 10).await?;
170    /// for (id, fields) in latest_10 {
171    ///     println!("Event {}: {:?}", id, fields);
172    /// }
173    /// # Ok(())
174    /// # }
175    /// ```
176    /// # Errors
177    ///
178    /// Returns an error if the XREVRANGE command fails.
179    pub async fn stream_read_latest(
180        &self,
181        stream_key: &str,
182        count: usize,
183    ) -> Result<Vec<StreamEntry>> {
184        let mut conn = self.conn_manager.clone();
185
186        // XREVRANGE returns a raw Value that we need to parse manually
187        let raw_result: redis::Value = redis::cmd("XREVRANGE")
188            .arg(stream_key)
189            .arg("+") // Start from the newest
190            .arg("-") // To the oldest
191            .arg("COUNT")
192            .arg(count)
193            .query_async(&mut conn)
194            .await?;
195
196        let entries = Self::parse_stream_response(raw_result)?;
197
198        debug!(
199            "[Stream] Read {} entries from '{}'",
200            entries.len(),
201            stream_key
202        );
203        Ok(entries)
204    }
205
206    /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
207    ///
208    /// # Arguments
209    /// * `stream_key` - Name of the Redis Stream
210    /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
211    /// * `count` - Maximum number of entries to retrieve
212    /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
213    ///
214    /// # Returns
215    /// Vector of (`entry_id`, fields) tuples
216    ///
217    /// # Example
218    ///
219    /// ```no_run
220    /// # use multi_tier_cache::RedisStreams;
221    /// # async fn example() -> anyhow::Result<()> {
222    /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
223    /// // Non-blocking: read new entries since last seen ID
224    /// let entries = streams.stream_read("events", "1234567890-0", 100, None).await?;
225    ///
226    /// // Blocking: wait up to 5 seconds for new entries
227    /// let new_entries = streams.stream_read("events", "$", 100, Some(5000)).await?;
228    /// # Ok(())
229    /// # }
230    /// ```
231    /// # Errors
232    ///
233    /// Returns an error if the XREAD command fails.
234    pub async fn stream_read(
235        &self,
236        stream_key: &str,
237        last_id: &str,
238        count: usize,
239        block_ms: Option<usize>,
240    ) -> Result<Vec<StreamEntry>> {
241        let mut conn = self.conn_manager.clone();
242
243        let mut cmd = redis::cmd("XREAD");
244
245        // Add BLOCK if specified
246        if let Some(block) = block_ms {
247            cmd.arg("BLOCK").arg(block);
248        }
249
250        cmd.arg("COUNT").arg(count);
251        cmd.arg("STREAMS").arg(stream_key).arg(last_id);
252
253        let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
254
255        // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
256        let entries = Self::parse_xread_response(raw_result)?;
257
258        debug!(
259            "[Stream] XREAD retrieved {} entries from '{}'",
260            entries.len(),
261            stream_key
262        );
263        Ok(entries)
264    }
265
266    /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
267    /// Format: [[id, [field, value, field, value, ...]], ...]
268    fn parse_stream_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
269        match value {
270            redis::Value::Array(entries) => {
271                let mut result = Vec::new();
272
273                for entry in entries {
274                    if let redis::Value::Array(entry_parts) = entry {
275                        if entry_parts.len() >= 2 {
276                            // First element is the ID
277                            if let Some(id_val) = entry_parts.first() {
278                                let id = Self::value_to_string(id_val);
279
280                                // Second element is the field-value array
281                                if let Some(redis::Value::Array(field_values)) = entry_parts.get(1)
282                                {
283                                    let mut fields = Vec::new();
284
285                                    // Process pairs of field-value
286                                    for chunk in field_values.chunks(2) {
287                                        if chunk.len() == 2 {
288                                            if let (Some(field_val), Some(value_val)) =
289                                                (chunk.first(), chunk.get(1))
290                                            {
291                                                let field = Self::value_to_string(field_val);
292                                                let value = Self::value_to_string(value_val);
293                                                fields.push((field, value));
294                                            }
295                                        }
296                                    }
297
298                                    result.push((id, fields));
299                                }
300                            }
301                        }
302                    }
303                }
304
305                Ok(result)
306            }
307            redis::Value::Nil => Ok(Vec::new()),
308            _ => Err(anyhow::anyhow!("Unexpected Redis stream response format")),
309        }
310    }
311
312    /// Helper function to parse XREAD response
313    /// Format: [[`stream_name`, [[id, [field, value, ...]], ...]], ...]
314    fn parse_xread_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
315        match value {
316            redis::Value::Array(streams) => {
317                let mut all_entries = Vec::new();
318
319                for stream in streams {
320                    if let redis::Value::Array(stream_parts) = stream {
321                        if stream_parts.len() >= 2 {
322                            // Second element contains the entries
323                            if let Some(redis::Value::Array(entries)) = stream_parts.get(1) {
324                                for entry in entries {
325                                    if let redis::Value::Array(entry_parts) = entry {
326                                        if entry_parts.len() >= 2 {
327                                            if let Some(id_val) = entry_parts.first() {
328                                                let id = Self::value_to_string(id_val);
329
330                                                if let Some(redis::Value::Array(field_values)) =
331                                                    entry_parts.get(1)
332                                                {
333                                                    let mut fields = Vec::new();
334
335                                                    for chunk in field_values.chunks(2) {
336                                                        if chunk.len() == 2 {
337                                                            if let (
338                                                                Some(field_val),
339                                                                Some(value_val),
340                                                            ) = (chunk.first(), chunk.get(1))
341                                                            {
342                                                                let field = Self::value_to_string(
343                                                                    field_val,
344                                                                );
345                                                                let value = Self::value_to_string(
346                                                                    value_val,
347                                                                );
348                                                                fields.push((field, value));
349                                                            }
350                                                        }
351                                                    }
352
353                                                    all_entries.push((id, fields));
354                                                }
355                                            }
356                                        }
357                                    }
358                                }
359                            }
360                        }
361                    }
362                }
363
364                Ok(all_entries)
365            }
366            redis::Value::Nil => Ok(Vec::new()),
367            _ => Err(anyhow::anyhow!("Unexpected XREAD response format")),
368        }
369    }
370
371    /// Helper function to convert Redis Value to String
372    fn value_to_string(value: &redis::Value) -> String {
373        match value {
374            redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
375            redis::Value::SimpleString(s) => s.clone(),
376            redis::Value::Int(i) => i.to_string(),
377            redis::Value::Okay => "OK".to_string(),
378            _ => String::new(),
379        }
380    }
381}
382
383/// Implement `StreamingBackend` trait for `RedisStreams`
384///
385/// This enables `RedisStreams` to be used as a pluggable streaming backend.
386#[async_trait]
387impl StreamingBackend for RedisStreams {
388    async fn stream_add(
389        &self,
390        stream_key: &str,
391        fields: Vec<(String, String)>,
392        maxlen: Option<usize>,
393    ) -> Result<String> {
394        RedisStreams::stream_add(self, stream_key, fields, maxlen).await
395    }
396
397    async fn stream_read_latest(
398        &self,
399        stream_key: &str,
400        count: usize,
401    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
402        let entries = RedisStreams::stream_read_latest(self, stream_key, count).await?;
403        // Convert StreamEntry to the trait's expected type if they match, or just return
404        // Since StreamEntry IS (String, Vec<(String, String)>), this is fine.
405        // However, the trait definition in traits.rs might not use the alias.
406        // Let's check traits.rs first or just return the result since the types are identical.
407        Ok(entries)
408    }
409
410    async fn stream_read(
411        &self,
412        stream_key: &str,
413        last_id: &str,
414        count: usize,
415        block_ms: Option<usize>,
416    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
417        let entries = RedisStreams::stream_read(self, stream_key, last_id, count, block_ms).await?;
418        Ok(entries)
419    }
420}