multi_tier_cache/redis_streams.rs
1//! Redis Streams Integration
2//!
3//! Provides high-level Redis Streams functionality for event-driven architectures.
4//!
5//! Redis Streams is a data structure that acts like an append-only log, perfect for:
6//! - Event sourcing
7//! - Real-time analytics
8//! - Message queues
9//! - Activity feeds
10//! - Audit logs
11//!
12//! # Features
13//!
14//! - **XADD**: Publish events with automatic ID generation
15//! - **XREVRANGE**: Read latest N entries (newest first)
16//! - **XREAD**: Read new entries (with optional blocking)
17//! - **MAXLEN**: Automatic stream trimming to prevent unbounded growth
18//!
19//! # Example
20//!
21//! ```no_run
22//! use multi_tier_cache::RedisStreams;
23//!
24//! # async fn example() -> anyhow::Result<()> {
25//! let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
26//!
27//! // Publish event
28//! let event_id = streams.stream_add(
29//! "events",
30//! vec![
31//! ("user_id".to_string(), "123".to_string()),
32//! ("action".to_string(), "login".to_string()),
33//! ],
34//! Some(1000) // Keep only latest 1000 events
35//! ).await?;
36//!
37//! // Read latest 10 events
38//! let latest = streams.stream_read_latest("events", 10).await?;
39//! # Ok(())
40//! # }
41//! ```
42
43use anyhow::Result;
44use redis::aio::ConnectionManager;
45use async_trait::async_trait;
46use crate::traits::StreamingBackend;
47
48/// Redis Streams client for event-driven architectures
49///
50/// Wraps Redis ConnectionManager to provide high-level streaming operations.
51#[derive(Clone)]
52pub struct RedisStreams {
53 conn_manager: ConnectionManager,
54}
55
56impl RedisStreams {
57 /// Create a new RedisStreams instance
58 ///
59 /// # Arguments
60 /// * `redis_url` - Redis connection URL (e.g., "redis://127.0.0.1:6379")
61 ///
62 /// # Example
63 ///
64 /// ```no_run
65 /// # use multi_tier_cache::RedisStreams;
66 /// # async fn example() -> anyhow::Result<()> {
67 /// let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
68 /// # Ok(())
69 /// # }
70 /// ```
71 pub async fn new(redis_url: &str) -> Result<Self> {
72 let client = redis::Client::open(redis_url)?;
73 let conn_manager = ConnectionManager::new(client).await?;
74
75 println!("🌊 Redis Streams initialized at {}", redis_url);
76
77 Ok(Self { conn_manager })
78 }
79
80 /// Publish data to Redis Stream using XADD
81 ///
82 /// # Arguments
83 /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
84 /// * `fields` - Vec of field-value pairs to add to the stream
85 /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
86 ///
87 /// # Returns
88 /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
89 ///
90 /// # Example
91 ///
92 /// ```no_run
93 /// # use multi_tier_cache::RedisStreams;
94 /// # async fn example() -> anyhow::Result<()> {
95 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
96 /// let event_id = streams.stream_add(
97 /// "user_events",
98 /// vec![
99 /// ("user_id".to_string(), "42".to_string()),
100 /// ("action".to_string(), "purchase".to_string()),
101 /// ("amount".to_string(), "99.99".to_string()),
102 /// ],
103 /// Some(10000) // Keep max 10k events
104 /// ).await?;
105 /// println!("Event ID: {}", event_id);
106 /// # Ok(())
107 /// # }
108 /// ```
109 pub async fn stream_add(
110 &self,
111 stream_key: &str,
112 fields: Vec<(String, String)>,
113 maxlen: Option<usize>
114 ) -> Result<String> {
115 let mut conn = self.conn_manager.clone();
116
117 // Build XADD command with MAXLEN trimming if specified
118 let mut cmd = redis::cmd("XADD");
119 cmd.arg(stream_key);
120
121 if let Some(max) = maxlen {
122 // Use MAXLEN ~ for approximate trimming (more efficient)
123 cmd.arg("MAXLEN").arg("~").arg(max);
124 }
125
126 cmd.arg("*"); // Auto-generate ID
127
128 // Add all field-value pairs
129 for (field, value) in &fields {
130 cmd.arg(field).arg(value);
131 }
132
133 let entry_id: String = cmd.query_async(&mut conn).await?;
134
135 println!("📤 [Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
136 Ok(entry_id)
137 }
138
139 /// Read latest entries from Redis Stream using XREVRANGE
140 ///
141 /// # Arguments
142 /// * `stream_key` - Name of the Redis Stream
143 /// * `count` - Number of latest entries to retrieve
144 ///
145 /// # Returns
146 /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
147 ///
148 /// # Example
149 ///
150 /// ```no_run
151 /// # use multi_tier_cache::RedisStreams;
152 /// # async fn example() -> anyhow::Result<()> {
153 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
154 /// let latest_10 = streams.stream_read_latest("events", 10).await?;
155 /// for (id, fields) in latest_10 {
156 /// println!("Event {}: {:?}", id, fields);
157 /// }
158 /// # Ok(())
159 /// # }
160 /// ```
161 pub async fn stream_read_latest(
162 &self,
163 stream_key: &str,
164 count: usize
165 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
166 let mut conn = self.conn_manager.clone();
167
168 // XREVRANGE returns a raw Value that we need to parse manually
169 let raw_result: redis::Value = redis::cmd("XREVRANGE")
170 .arg(stream_key)
171 .arg("+") // Start from the newest
172 .arg("-") // To the oldest
173 .arg("COUNT")
174 .arg(count)
175 .query_async(&mut conn)
176 .await?;
177
178 let entries = Self::parse_stream_response(raw_result)?;
179
180 println!("📥 [Stream] Read {} entries from '{}'", entries.len(), stream_key);
181 Ok(entries)
182 }
183
184 /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
185 ///
186 /// # Arguments
187 /// * `stream_key` - Name of the Redis Stream
188 /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
189 /// * `count` - Maximum number of entries to retrieve
190 /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
191 ///
192 /// # Returns
193 /// Vector of (entry_id, fields) tuples
194 ///
195 /// # Example
196 ///
197 /// ```no_run
198 /// # use multi_tier_cache::RedisStreams;
199 /// # async fn example() -> anyhow::Result<()> {
200 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
201 /// // Non-blocking: read new entries since last seen ID
202 /// let entries = streams.stream_read("events", "1234567890-0", 100, None).await?;
203 ///
204 /// // Blocking: wait up to 5 seconds for new entries
205 /// let new_entries = streams.stream_read("events", "$", 100, Some(5000)).await?;
206 /// # Ok(())
207 /// # }
208 /// ```
209 pub async fn stream_read(
210 &self,
211 stream_key: &str,
212 last_id: &str,
213 count: usize,
214 block_ms: Option<usize>
215 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
216 let mut conn = self.conn_manager.clone();
217
218 let mut cmd = redis::cmd("XREAD");
219
220 // Add BLOCK if specified
221 if let Some(block) = block_ms {
222 cmd.arg("BLOCK").arg(block);
223 }
224
225 cmd.arg("COUNT").arg(count);
226 cmd.arg("STREAMS").arg(stream_key).arg(last_id);
227
228 let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
229
230 // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
231 let entries = Self::parse_xread_response(raw_result)?;
232
233 println!("📥 [Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
234 Ok(entries)
235 }
236
237 /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
238 /// Format: [[id, [field, value, field, value, ...]], ...]
239 fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
240 match value {
241 redis::Value::Array(entries) => {
242 let mut result = Vec::new();
243
244 for entry in entries {
245 if let redis::Value::Array(entry_parts) = entry {
246 if entry_parts.len() >= 2 {
247 // First element is the ID
248 let id = Self::value_to_string(&entry_parts[0]);
249
250 // Second element is the field-value array
251 if let redis::Value::Array(field_values) = &entry_parts[1] {
252 let mut fields = Vec::new();
253
254 // Process pairs of field-value
255 for chunk in field_values.chunks(2) {
256 if chunk.len() == 2 {
257 let field = Self::value_to_string(&chunk[0]);
258 let value = Self::value_to_string(&chunk[1]);
259 fields.push((field, value));
260 }
261 }
262
263 result.push((id, fields));
264 }
265 }
266 }
267 }
268
269 Ok(result)
270 }
271 redis::Value::Nil => Ok(Vec::new()),
272 _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
273 }
274 }
275
276 /// Helper function to parse XREAD response
277 /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
278 fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
279 match value {
280 redis::Value::Array(streams) => {
281 let mut all_entries = Vec::new();
282
283 for stream in streams {
284 if let redis::Value::Array(stream_parts) = stream {
285 if stream_parts.len() >= 2 {
286 // Second element contains the entries
287 if let redis::Value::Array(entries) = &stream_parts[1] {
288 for entry in entries {
289 if let redis::Value::Array(entry_parts) = entry {
290 if entry_parts.len() >= 2 {
291 let id = Self::value_to_string(&entry_parts[0]);
292
293 if let redis::Value::Array(field_values) = &entry_parts[1] {
294 let mut fields = Vec::new();
295
296 for chunk in field_values.chunks(2) {
297 if chunk.len() == 2 {
298 let field = Self::value_to_string(&chunk[0]);
299 let value = Self::value_to_string(&chunk[1]);
300 fields.push((field, value));
301 }
302 }
303
304 all_entries.push((id, fields));
305 }
306 }
307 }
308 }
309 }
310 }
311 }
312 }
313
314 Ok(all_entries)
315 }
316 redis::Value::Nil => Ok(Vec::new()),
317 _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
318 }
319 }
320
321 /// Helper function to convert Redis Value to String
322 fn value_to_string(value: &redis::Value) -> String {
323 match value {
324 redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
325 redis::Value::SimpleString(s) => s.clone(),
326 redis::Value::Int(i) => i.to_string(),
327 redis::Value::Okay => "OK".to_string(),
328 redis::Value::Nil => String::new(),
329 _ => String::new(),
330 }
331 }
332}
333
334/// Implement StreamingBackend trait for RedisStreams
335///
336/// This enables RedisStreams to be used as a pluggable streaming backend.
337#[async_trait]
338impl StreamingBackend for RedisStreams {
339 async fn stream_add(
340 &self,
341 stream_key: &str,
342 fields: Vec<(String, String)>,
343 maxlen: Option<usize>,
344 ) -> Result<String> {
345 RedisStreams::stream_add(self, stream_key, fields, maxlen).await
346 }
347
348 async fn stream_read_latest(
349 &self,
350 stream_key: &str,
351 count: usize,
352 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
353 RedisStreams::stream_read_latest(self, stream_key, count).await
354 }
355
356 async fn stream_read(
357 &self,
358 stream_key: &str,
359 last_id: &str,
360 count: usize,
361 block_ms: Option<usize>,
362 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
363 RedisStreams::stream_read(self, stream_key, last_id, count, block_ms).await
364 }
365}