multi_tier_cache/redis_streams.rs
1//! Redis Streams Integration
2//!
3//! Provides high-level Redis Streams functionality for event-driven architectures.
4//!
5//! Redis Streams is a data structure that acts like an append-only log, perfect for:
6//! - Event sourcing
7//! - Real-time analytics
8//! - Message queues
9//! - Activity feeds
10//! - Audit logs
11//!
12//! # Features
13//!
14//! - **XADD**: Publish events with automatic ID generation
15//! - **XREVRANGE**: Read latest N entries (newest first)
16//! - **XREAD**: Read new entries (with optional blocking)
17//! - **MAXLEN**: Automatic stream trimming to prevent unbounded growth
18//!
19//! # Example
20//!
21//! ```no_run
22//! use multi_tier_cache::RedisStreams;
23//!
24//! # async fn example() -> anyhow::Result<()> {
25//! let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
26//!
27//! // Publish event
28//! let event_id = streams.stream_add(
29//! "events",
30//! vec![
31//! ("user_id".to_string(), "123".to_string()),
32//! ("action".to_string(), "login".to_string()),
33//! ],
34//! Some(1000) // Keep only latest 1000 events
35//! ).await?;
36//!
37//! // Read latest 10 events
38//! let latest = streams.stream_read_latest("events", 10).await?;
39//! # Ok(())
40//! # }
41//! ```
42
43use anyhow::Result;
44use redis::aio::ConnectionManager;
45use async_trait::async_trait;
46use tracing::debug;
47use crate::traits::StreamingBackend;
48
/// A single stream record as returned by the read operations in this module.
///
/// The first tuple element is the entry ID; the second is the entry's
/// field-value pairs in the order they were returned.
pub type StreamEntry = (String, Vec<(String, String)>);
51
52/// Redis Streams client for event-driven architectures
53///
54/// Wraps Redis ConnectionManager to provide high-level streaming operations.
55#[derive(Clone)]
56pub struct RedisStreams {
57 conn_manager: ConnectionManager,
58}
59
60impl RedisStreams {
61 /// Create a new RedisStreams instance
62 ///
63 /// # Arguments
64 /// * `redis_url` - Redis connection URL (e.g., "redis://127.0.0.1:6379")
65 ///
66 /// # Example
67 ///
68 /// ```no_run
69 /// # use multi_tier_cache::RedisStreams;
70 /// # async fn example() -> anyhow::Result<()> {
71 /// let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
72 /// # Ok(())
73 /// # }
74 /// ```
75 pub async fn new(redis_url: &str) -> Result<Self> {
76 let client = redis::Client::open(redis_url)?;
77 let conn_manager = ConnectionManager::new(client).await?;
78
79 debug!("Redis Streams initialized at {}", redis_url);
80
81 Ok(Self { conn_manager })
82 }
83
84 /// Publish data to Redis Stream using XADD
85 ///
86 /// # Arguments
87 /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
88 /// * `fields` - Vec of field-value pairs to add to the stream
89 /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
90 ///
91 /// # Returns
92 /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
93 ///
94 /// # Example
95 ///
96 /// ```no_run
97 /// # use multi_tier_cache::RedisStreams;
98 /// # async fn example() -> anyhow::Result<()> {
99 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
100 /// let event_id = streams.stream_add(
101 /// "user_events",
102 /// vec![
103 /// ("user_id".to_string(), "42".to_string()),
104 /// ("action".to_string(), "purchase".to_string()),
105 /// ("amount".to_string(), "99.99".to_string()),
106 /// ],
107 /// Some(10000) // Keep max 10k events
108 /// ).await?;
109 /// println!("Event ID: {}", event_id);
110 /// # Ok(())
111 /// # }
112 /// ```
113 pub async fn stream_add(
114 &self,
115 stream_key: &str,
116 fields: Vec<(String, String)>,
117 maxlen: Option<usize>
118 ) -> Result<String> {
119 let mut conn = self.conn_manager.clone();
120
121 // Build XADD command with MAXLEN trimming if specified
122 let mut cmd = redis::cmd("XADD");
123 cmd.arg(stream_key);
124
125 if let Some(max) = maxlen {
126 // Use MAXLEN ~ for approximate trimming (more efficient)
127 cmd.arg("MAXLEN").arg("~").arg(max);
128 }
129
130 cmd.arg("*"); // Auto-generate ID
131
132 // Add all field-value pairs
133 for (field, value) in &fields {
134 cmd.arg(field).arg(value);
135 }
136
137 let entry_id: String = cmd.query_async(&mut conn).await?;
138
139 debug!("[Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
140 Ok(entry_id)
141 }
142
143 /// Read latest entries from Redis Stream using XREVRANGE
144 ///
145 /// # Arguments
146 /// * `stream_key` - Name of the Redis Stream
147 /// * `count` - Number of latest entries to retrieve
148 ///
149 /// # Returns
150 /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
151 ///
152 /// # Example
153 ///
154 /// ```no_run
155 /// # use multi_tier_cache::RedisStreams;
156 /// # async fn example() -> anyhow::Result<()> {
157 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
158 /// let latest_10 = streams.stream_read_latest("events", 10).await?;
159 /// for (id, fields) in latest_10 {
160 /// println!("Event {}: {:?}", id, fields);
161 /// }
162 /// # Ok(())
163 /// # }
164 /// ```
165 pub async fn stream_read_latest(
166 &self,
167 stream_key: &str,
168 count: usize
169 ) -> Result<Vec<StreamEntry>> {
170 let mut conn = self.conn_manager.clone();
171
172 // XREVRANGE returns a raw Value that we need to parse manually
173 let raw_result: redis::Value = redis::cmd("XREVRANGE")
174 .arg(stream_key)
175 .arg("+") // Start from the newest
176 .arg("-") // To the oldest
177 .arg("COUNT")
178 .arg(count)
179 .query_async(&mut conn)
180 .await?;
181
182 let entries = Self::parse_stream_response(raw_result)?;
183
184 debug!("[Stream] Read {} entries from '{}'", entries.len(), stream_key);
185 Ok(entries)
186 }
187
188 /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
189 ///
190 /// # Arguments
191 /// * `stream_key` - Name of the Redis Stream
192 /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
193 /// * `count` - Maximum number of entries to retrieve
194 /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
195 ///
196 /// # Returns
197 /// Vector of (entry_id, fields) tuples
198 ///
199 /// # Example
200 ///
201 /// ```no_run
202 /// # use multi_tier_cache::RedisStreams;
203 /// # async fn example() -> anyhow::Result<()> {
204 /// # let streams = RedisStreams::new("redis://127.0.0.1:6379").await?;
205 /// // Non-blocking: read new entries since last seen ID
206 /// let entries = streams.stream_read("events", "1234567890-0", 100, None).await?;
207 ///
208 /// // Blocking: wait up to 5 seconds for new entries
209 /// let new_entries = streams.stream_read("events", "$", 100, Some(5000)).await?;
210 /// # Ok(())
211 /// # }
212 /// ```
213 pub async fn stream_read(
214 &self,
215 stream_key: &str,
216 last_id: &str,
217 count: usize,
218 block_ms: Option<usize>
219 ) -> Result<Vec<StreamEntry>> {
220 let mut conn = self.conn_manager.clone();
221
222 let mut cmd = redis::cmd("XREAD");
223
224 // Add BLOCK if specified
225 if let Some(block) = block_ms {
226 cmd.arg("BLOCK").arg(block);
227 }
228
229 cmd.arg("COUNT").arg(count);
230 cmd.arg("STREAMS").arg(stream_key).arg(last_id);
231
232 let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
233
234 // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
235 let entries = Self::parse_xread_response(raw_result)?;
236
237 debug!("[Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
238 Ok(entries)
239 }
240
241 /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
242 /// Format: [[id, [field, value, field, value, ...]], ...]
243 fn parse_stream_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
244 match value {
245 redis::Value::Array(entries) => {
246 let mut result = Vec::new();
247
248 for entry in entries {
249 if let redis::Value::Array(entry_parts) = entry {
250 if entry_parts.len() >= 2 {
251 // First element is the ID
252 let id = Self::value_to_string(&entry_parts[0]);
253
254 // Second element is the field-value array
255 if let redis::Value::Array(field_values) = &entry_parts[1] {
256 let mut fields = Vec::new();
257
258 // Process pairs of field-value
259 for chunk in field_values.chunks(2) {
260 if chunk.len() == 2 {
261 let field = Self::value_to_string(&chunk[0]);
262 let value = Self::value_to_string(&chunk[1]);
263 fields.push((field, value));
264 }
265 }
266
267 result.push((id, fields));
268 }
269 }
270 }
271 }
272
273 Ok(result)
274 }
275 redis::Value::Nil => Ok(Vec::new()),
276 _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
277 }
278 }
279
280 /// Helper function to parse XREAD response
281 /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
282 fn parse_xread_response(value: redis::Value) -> Result<Vec<StreamEntry>> {
283 match value {
284 redis::Value::Array(streams) => {
285 let mut all_entries = Vec::new();
286
287 for stream in streams {
288 if let redis::Value::Array(stream_parts) = stream {
289 if stream_parts.len() >= 2 {
290 // Second element contains the entries
291 if let redis::Value::Array(entries) = &stream_parts[1] {
292 for entry in entries {
293 if let redis::Value::Array(entry_parts) = entry {
294 if entry_parts.len() >= 2 {
295 let id = Self::value_to_string(&entry_parts[0]);
296
297 if let redis::Value::Array(field_values) = &entry_parts[1] {
298 let mut fields = Vec::new();
299
300 for chunk in field_values.chunks(2) {
301 if chunk.len() == 2 {
302 let field = Self::value_to_string(&chunk[0]);
303 let value = Self::value_to_string(&chunk[1]);
304 fields.push((field, value));
305 }
306 }
307
308 all_entries.push((id, fields));
309 }
310 }
311 }
312 }
313 }
314 }
315 }
316 }
317
318 Ok(all_entries)
319 }
320 redis::Value::Nil => Ok(Vec::new()),
321 _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
322 }
323 }
324
325 /// Helper function to convert Redis Value to String
326 fn value_to_string(value: &redis::Value) -> String {
327 match value {
328 redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
329 redis::Value::SimpleString(s) => s.clone(),
330 redis::Value::Int(i) => i.to_string(),
331 redis::Value::Okay => "OK".to_string(),
332 redis::Value::Nil => String::new(),
333 _ => String::new(),
334 }
335 }
336}
337
338/// Implement StreamingBackend trait for RedisStreams
339///
340/// This enables RedisStreams to be used as a pluggable streaming backend.
341#[async_trait]
342impl StreamingBackend for RedisStreams {
343 async fn stream_add(
344 &self,
345 stream_key: &str,
346 fields: Vec<(String, String)>,
347 maxlen: Option<usize>,
348 ) -> Result<String> {
349 RedisStreams::stream_add(self, stream_key, fields, maxlen).await
350 }
351
352 async fn stream_read_latest(
353 &self,
354 stream_key: &str,
355 count: usize,
356 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
357 let entries = RedisStreams::stream_read_latest(self, stream_key, count).await?;
358 // Convert StreamEntry to the trait's expected type if they match, or just return
359 // Since StreamEntry IS (String, Vec<(String, String)>), this is fine.
360 // However, the trait definition in traits.rs might not use the alias.
361 // Let's check traits.rs first or just return the result since the types are identical.
362 Ok(entries)
363 }
364
365 async fn stream_read(
366 &self,
367 stream_key: &str,
368 last_id: &str,
369 count: usize,
370 block_ms: Option<usize>,
371 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
372 let entries = RedisStreams::stream_read(self, stream_key, last_id, count, block_ms).await?;
373 Ok(entries)
374 }
375}