multi_tier_cache/redis_streams.rs
1//! Redis Streams Integration
2//!
3//! Provides high-level Redis Streams functionality for event-driven architectures.
4//!
5//! Redis Streams is a data structure that acts like an append-only log, perfect for:
6//! - Event sourcing
7//! - Real-time analytics
8//! - Message queues
9//! - Activity feeds
10//! - Audit logs
11//!
12//! # Features
13//!
14//! - **XADD**: Publish events with automatic ID generation
15//! - **XREVRANGE**: Read latest N entries (newest first)
16//! - **XREAD**: Read new entries (with optional blocking)
17//! - **MAXLEN**: Automatic stream trimming to prevent unbounded growth
18//!
19//! # Example
20//!
21//! ```no_run
22//! use multi_tier_cache::RedisStreams;
23//!
24//! # async fn example() -> anyhow::Result<()> {
25//! let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
26//!
27//! // Publish event
28//! let event_id = streams.stream_add(
29//! "events",
30//! vec![
31//! ("user_id".to_string(), "123".to_string()),
32//! ("action".to_string(), "login".to_string()),
33//! ],
34//! Some(1000) // Keep only latest 1000 events
35//! ).await?;
36//!
37//! // Read latest 10 events
38//! let latest = streams.stream_read_latest("events", 10).await?;
39//! # Ok(())
40//! # }
41//! ```
42
43use crate::traits::StreamingBackend;
44use anyhow::Result;
45use async_trait::async_trait;
46use redis::aio::ConnectionManager;
47use tracing::debug;
48
/// Type alias for Redis Stream entry: (ID, Field-Value Pairs)
///
/// The first element is the entry ID as a string; the second is the entry's
/// `(field, value)` pairs, both converted to `String`.
pub type StreamEntry = (String, Vec<(String, String)>);
51
/// Redis Streams client for event-driven architectures
///
/// Wraps Redis `ConnectionManager` to provide high-level streaming operations.
///
/// `Clone` is derived, so cloning a `RedisStreams` clones the wrapped
/// `ConnectionManager`.
#[derive(Clone)]
pub struct RedisStreams {
    /// Connection manager; each stream operation clones it before issuing its command.
    conn_manager: ConnectionManager,
}
59
60impl RedisStreams {
61 /// Create a new `RedisStreams` instance
62 ///
63 /// # Arguments
64 /// * `redis_url` - Redis connection URL (e.g., `<redis://127.0.0.1:6379>`)
65 ///
66 /// # Example
67 ///
68 /// ```no_run
69 /// # use multi_tier_cache::RedisStreams;
70 /// # async fn example() -> anyhow::Result<()> {
71 /// let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
72 /// # Ok(())
73 /// # }
74 /// ```
75 /// # Errors
76 ///
77 /// Returns an error if Redis connection fails.
78 pub async fn new(redis_url: &str) -> Result<Self> {
79 let client = redis::Client::open(redis_url)?;
80 let conn_manager = ConnectionManager::new(client).await?;
81
82 debug!("Redis Streams initialized at {}", redis_url);
83
84 Ok(Self { conn_manager })
85 }
86
87 /// Publish data to Redis Stream using XADD
88 ///
89 /// # Arguments
90 /// * `stream_key` - Name of the Redis Stream (e.g., "`events_stream`")
91 /// * `fields` - Vec of field-value pairs to add to the stream
92 /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
93 ///
94 /// # Returns
95 /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
96 ///
97 /// # Example
98 ///
99 /// ```no_run
100 /// # use multi_tier_cache::RedisStreams;
101 /// # async fn example() -> anyhow::Result<()> {
102 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
103 /// let event_id = streams.stream_add(
104 /// "user_events",
105 /// vec![
106 /// ("user_id".to_string(), "42".to_string()),
107 /// ("action".to_string(), "purchase".to_string()),
108 /// ("amount".to_string(), "99.99".to_string()),
109 /// ],
110 /// Some(10000) // Keep max 10k events
111 /// ).await?;
112 /// println!("Event ID: {}", event_id);
113 /// # Ok(())
114 /// # }
115 /// ```
116 /// # Errors
117 ///
118 /// Returns an error if the XADD command fails.
119 pub async fn stream_add(
120 &self,
121 stream_key: &str,
122 fields: Vec<(String, String)>,
123 maxlen: Option<usize>,
124 ) -> Result<String> {
125 let mut conn = self.conn_manager.clone();
126
127 // Build XADD command with MAXLEN trimming if specified
128 let mut cmd = redis::cmd("XADD");
129 cmd.arg(stream_key);
130
131 if let Some(max) = maxlen {
132 // Use MAXLEN ~ for approximate trimming (more efficient)
133 cmd.arg("MAXLEN").arg("~").arg(max);
134 }
135
136 cmd.arg("*"); // Auto-generate ID
137
138 // Add all field-value pairs
139 for (field, value) in &fields {
140 cmd.arg(field).arg(value);
141 }
142
143 let entry_id: String = cmd.query_async(&mut conn).await?;
144
145 debug!(
146 "[Stream] Published to '{}' (ID: {}, fields: {})",
147 stream_key,
148 entry_id,
149 fields.len()
150 );
151 Ok(entry_id)
152 }
153
154 /// Read latest entries from Redis Stream using XREVRANGE
155 ///
156 /// # Arguments
157 /// * `stream_key` - Name of the Redis Stream
158 /// * `count` - Number of latest entries to retrieve
159 ///
160 /// # Returns
161 /// Vector of (`entry_id`, fields) tuples, ordered from newest to oldest
162 ///
163 /// # Example
164 ///
165 /// ```no_run
166 /// # use multi_tier_cache::RedisStreams;
167 /// # async fn example() -> anyhow::Result<()> {
168 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
169 /// let latest_10 = streams.stream_read_latest("events", 10).await?;
170 /// for (id, fields) in latest_10 {
171 /// println!("Event {}: {:?}", id, fields);
172 /// }
173 /// # Ok(())
174 /// # }
175 /// ```
176 /// # Errors
177 ///
178 /// Returns an error if the XREVRANGE command fails.
179 pub async fn stream_read_latest(
180 &self,
181 stream_key: &str,
182 count: usize,
183 ) -> Result<Vec<StreamEntry>> {
184 let mut conn = self.conn_manager.clone();
185
186 // XREVRANGE returns a raw Value that we need to parse manually
187 let raw_result: redis::Value = redis::cmd("XREVRANGE")
188 .arg(stream_key)
189 .arg("+") // Start from the newest
190 .arg("-") // To the oldest
191 .arg("COUNT")
192 .arg(count)
193 .query_async(&mut conn)
194 .await?;
195
196 let entries = Self::parse_stream_response(raw_result)?;
197
198 debug!(
199 "[Stream] Read {} entries from '{}'",
200 entries.len(),
201 stream_key
202 );
203 Ok(entries)
204 }
205
206 /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
207 ///
208 /// # Arguments
209 /// * `stream_key` - Name of the Redis Stream
210 /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
211 /// * `count` - Maximum number of entries to retrieve
212 /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
213 ///
214 /// # Returns
215 /// Vector of (`entry_id`, fields) tuples
216 ///
217 /// # Example
218 ///
219 /// ```no_run
220 /// # use multi_tier_cache::RedisStreams;
221 /// # async fn example() -> anyhow::Result<()> {
222 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
223 /// // Non-blocking: read new entries since last seen ID
224 /// let entries = streams.stream_read("events", "1234567890-0", 100, None).await?;
225 ///
226 /// // Blocking: wait up to 5 seconds for new entries
227 /// let new_entries = streams.stream_read("events", "$", 100, Some(5000)).await?;
228 /// # Ok(())
229 /// # }
230 /// ```
231 /// # Errors
232 ///
233 /// Returns an error if the XREAD command fails.
234 pub async fn stream_read(
235 &self,
236 stream_key: &str,
237 last_id: &str,
238 count: usize,
239 block_ms: Option<usize>,
240 ) -> Result<Vec<StreamEntry>> {
241 let mut conn = self.conn_manager.clone();
242
243 let mut cmd = redis::cmd("XREAD");
244
245 // Add BLOCK if specified
246 if let Some(block) = block_ms {
247 cmd.arg("BLOCK").arg(block);
248 }
249
250 cmd.arg("COUNT").arg(count);
251 cmd.arg("STREAMS").arg(stream_key).arg(last_id);
252
253 let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
254
255 // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
256 let entries = Self::parse_xread_response(raw_result)?;
257
258 debug!(
259 "[Stream] XREAD retrieved {} entries from '{}'",
260 entries.len(),
261 stream_key
262 );
263 Ok(entries)
264 }
265
266 /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
267 /// Format: [[id, [field, value, field, value, ...]], ...]
268 fn parse_stream_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
269 match value {
270 redis::Value::Array(entries) => {
271 let mut result = Vec::new();
272
273 for entry in entries {
274 if let redis::Value::Array(entry_parts) = entry {
275 if entry_parts.len() >= 2 {
276 // First element is the ID
277 if let Some(id_val) = entry_parts.first() {
278 let id = Self::value_to_string(id_val);
279
280 // Second element is the field-value array
281 if let Some(redis::Value::Array(field_values)) = entry_parts.get(1)
282 {
283 let mut fields = Vec::new();
284
285 // Process pairs of field-value
286 for chunk in field_values.chunks(2) {
287 if chunk.len() == 2 {
288 if let (Some(field_val), Some(value_val)) =
289 (chunk.first(), chunk.get(1))
290 {
291 let field = Self::value_to_string(field_val);
292 let value = Self::value_to_string(value_val);
293 fields.push((field, value));
294 }
295 }
296 }
297
298 result.push((id, fields));
299 }
300 }
301 }
302 }
303 }
304
305 Ok(result)
306 }
307 redis::Value::Nil => Ok(Vec::new()),
308 _ => Err(anyhow::anyhow!("Unexpected Redis stream response format")),
309 }
310 }
311
312 /// Helper function to parse XREAD response
313 /// Format: [[`stream_name`, [[id, [field, value, ...]], ...]], ...]
314 fn parse_xread_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
315 match value {
316 redis::Value::Array(streams) => {
317 let mut all_entries = Vec::new();
318
319 for stream in streams {
320 if let redis::Value::Array(stream_parts) = stream {
321 if stream_parts.len() >= 2 {
322 // Second element contains the entries
323 if let Some(redis::Value::Array(entries)) = stream_parts.get(1) {
324 for entry in entries {
325 if let redis::Value::Array(entry_parts) = entry {
326 if entry_parts.len() >= 2 {
327 if let Some(id_val) = entry_parts.first() {
328 let id = Self::value_to_string(id_val);
329
330 if let Some(redis::Value::Array(field_values)) =
331 entry_parts.get(1)
332 {
333 let mut fields = Vec::new();
334
335 for chunk in field_values.chunks(2) {
336 if chunk.len() == 2 {
337 if let (
338 Some(field_val),
339 Some(value_val),
340 ) = (chunk.first(), chunk.get(1))
341 {
342 let field = Self::value_to_string(
343 field_val,
344 );
345 let value = Self::value_to_string(
346 value_val,
347 );
348 fields.push((field, value));
349 }
350 }
351 }
352
353 all_entries.push((id, fields));
354 }
355 }
356 }
357 }
358 }
359 }
360 }
361 }
362 }
363
364 Ok(all_entries)
365 }
366 redis::Value::Nil => Ok(Vec::new()),
367 _ => Err(anyhow::anyhow!("Unexpected XREAD response format")),
368 }
369 }
370
371 /// Helper function to convert Redis Value to String
372 fn value_to_string(value: &redis::Value) -> String {
373 match value {
374 redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
375 redis::Value::SimpleString(s) => s.clone(),
376 redis::Value::Int(i) => i.to_string(),
377 redis::Value::Okay => "OK".to_string(),
378 _ => String::new(),
379 }
380 }
381}
382
383/// Implement `StreamingBackend` trait for `RedisStreams`
384///
385/// This enables `RedisStreams` to be used as a pluggable streaming backend.
386#[async_trait]
387impl StreamingBackend for RedisStreams {
388 async fn stream_add(
389 &self,
390 stream_key: &str,
391 fields: Vec<(String, String)>,
392 maxlen: Option<usize>,
393 ) -> Result<String> {
394 RedisStreams::stream_add(self, stream_key, fields, maxlen).await
395 }
396
397 async fn stream_read_latest(
398 &self,
399 stream_key: &str,
400 count: usize,
401 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
402 let entries = RedisStreams::stream_read_latest(self, stream_key, count).await?;
403 // Convert StreamEntry to the trait's expected type if they match, or just return
404 // Since StreamEntry IS (String, Vec<(String, String)>), this is fine.
405 // However, the trait definition in traits.rs might not use the alias.
406 // Let's check traits.rs first or just return the result since the types are identical.
407 Ok(entries)
408 }
409
410 async fn stream_read(
411 &self,
412 stream_key: &str,
413 last_id: &str,
414 count: usize,
415 block_ms: Option<usize>,
416 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
417 let entries = RedisStreams::stream_read(self, stream_key, last_id, count, block_ms).await?;
418 Ok(entries)
419 }
420}