redis_oxide/
streams.rs

1//! Redis Streams support for event sourcing and messaging
2//!
3//! Redis Streams provide a powerful data structure for handling time-series data,
4//! event sourcing, and message queuing. This module provides a high-level API
5//! for working with Redis Streams.
6//!
7//! # Examples
8//!
9//! ## Basic Stream Operations
10//!
11//! ```no_run
12//! use redis_oxide::{Client, ConnectionConfig};
13//! use std::collections::HashMap;
14//!
15//! # #[tokio::main]
16//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//! let config = ConnectionConfig::new("redis://localhost:6379");
18//! let client = Client::connect(config).await?;
19//!
20//! // Add entries to a stream
21//! let mut fields = HashMap::new();
22//! fields.insert("user_id".to_string(), "123".to_string());
23//! fields.insert("action".to_string(), "login".to_string());
24//!
25//! let entry_id = client.xadd("events", "*", fields).await?;
26//! println!("Added entry: {}", entry_id);
27//!
28//! // Read from stream
29//! let entries = client.xrange("events", "-", "+", Some(10)).await?;
30//! for entry in entries {
31//!     println!("Entry {}: {:?}", entry.id, entry.fields);
32//! }
33//! # Ok(())
34//! # }
35//! ```
36//!
37//! ## Consumer Groups
38//!
39//! ```no_run
40//! use redis_oxide::{Client, ConnectionConfig};
41//!
42//! # #[tokio::main]
43//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
44//! let config = ConnectionConfig::new("redis://localhost:6379");
45//! let client = Client::connect(config).await?;
46//!
47//! // Create a consumer group
48//! client.xgroup_create("events", "processors", "$", true).await?;
49//!
50//! // Read from the group
51//! let messages = client.xreadgroup(
52//!     "processors",
53//!     "worker-1",
54//!     vec![("events".to_string(), ">".to_string())],
55//!     Some(1),
56//!     Some(std::time::Duration::from_secs(1))
57//! ).await?;
58//!
59//! for (stream, entries) in messages {
60//!     for entry in entries {
61//!         println!("Processing {}: {:?}", entry.id, entry.fields);
62//!         // Process the message...
63//!         
64//!         // Acknowledge the message
65//!         client.xack("events", "processors", vec![entry.id]).await?;
66//!     }
67//! }
68//! # Ok(())
69//! # }
70//! ```
71
72use crate::core::{
73    error::{RedisError, RedisResult},
74    value::RespValue,
75};
76use std::collections::HashMap;
77use std::time::Duration;
78
79/// Represents a single entry in a Redis Stream
80#[derive(Debug, Clone)]
81pub struct StreamEntry {
82    /// The unique ID of the entry (e.g., "1234567890123-0")
83    pub id: String,
84    /// The field-value pairs of the entry
85    pub fields: HashMap<String, String>,
86}
87
88impl StreamEntry {
89    /// Create a new stream entry
90    pub fn new(id: String, fields: HashMap<String, String>) -> Self {
91        Self { id, fields }
92    }
93
94    /// Get a field value by name
95    #[must_use]
96    pub fn get_field(&self, field: &str) -> Option<&String> {
97        self.fields.get(field)
98    }
99
100    /// Check if the entry has a specific field
101    #[must_use]
102    pub fn has_field(&self, field: &str) -> bool {
103        self.fields.contains_key(field)
104    }
105
106    /// Get the timestamp part of the entry ID
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use redis_oxide::StreamEntry;
112    /// use std::collections::HashMap;
113    ///
114    /// let entry = StreamEntry::new("1234567890123-0".to_string(), HashMap::new());
115    /// assert_eq!(entry.timestamp(), Some(1234567890123));
116    /// ```
117    #[must_use]
118    pub fn timestamp(&self) -> Option<u64> {
119        self.id.split('-').next()?.parse().ok()
120    }
121
122    /// Get the sequence number part of the entry ID
123    ///
124    /// # Examples
125    ///
126    /// ```
127    /// use redis_oxide::StreamEntry;
128    /// use std::collections::HashMap;
129    ///
130    /// let entry = StreamEntry::new("1234567890123-5".to_string(), HashMap::new());
131    /// assert_eq!(entry.sequence(), Some(5));
132    /// ```
133    #[must_use]
134    pub fn sequence(&self) -> Option<u64> {
135        self.id.split('-').nth(1)?.parse().ok()
136    }
137}
138
139/// Information about a Redis Stream
140#[derive(Debug, Clone)]
141pub struct StreamInfo {
142    /// The length of the stream (number of entries)
143    pub length: u64,
144    /// The number of consumer groups
145    pub groups: u64,
146    /// The ID of the first entry
147    pub first_entry: Option<String>,
148    /// The ID of the last entry
149    pub last_entry: Option<String>,
150    /// The last generated ID
151    pub last_generated_id: String,
152}
153
154/// Information about a consumer group
155#[derive(Debug, Clone)]
156pub struct ConsumerGroupInfo {
157    /// The name of the consumer group
158    pub name: String,
159    /// The number of consumers in the group
160    pub consumers: u64,
161    /// The number of pending messages
162    pub pending: u64,
163    /// The ID of the last delivered message
164    pub last_delivered_id: String,
165}
166
167/// Information about a consumer in a group
168#[derive(Debug, Clone)]
169pub struct ConsumerInfo {
170    /// The name of the consumer
171    pub name: String,
172    /// The number of pending messages for this consumer
173    pub pending: u64,
174    /// The idle time in milliseconds
175    pub idle: u64,
176}
177
178/// Pending message information
179#[derive(Debug, Clone)]
180pub struct PendingMessage {
181    /// The message ID
182    pub id: String,
183    /// The consumer name
184    pub consumer: String,
185    /// The idle time in milliseconds
186    pub idle_time: u64,
187    /// The delivery count
188    pub delivery_count: u64,
189}
190
191/// Stream range options for XRANGE and XREVRANGE
192#[derive(Debug, Clone)]
193pub struct StreamRange {
194    /// Start ID (inclusive)
195    pub start: String,
196    /// End ID (inclusive)
197    pub end: String,
198    /// Maximum number of entries to return
199    pub count: Option<u64>,
200}
201
202impl StreamRange {
203    /// Create a new stream range
204    pub fn new(start: impl Into<String>, end: impl Into<String>) -> Self {
205        Self {
206            start: start.into(),
207            end: end.into(),
208            count: None,
209        }
210    }
211
212    /// Set the maximum number of entries to return
213    pub fn with_count(mut self, count: u64) -> Self {
214        self.count = Some(count);
215        self
216    }
217
218    /// Create a range for all entries
219    pub fn all() -> Self {
220        Self::new("-", "+")
221    }
222
223    /// Create a range from a specific ID to the end
224    pub fn from(start: impl Into<String>) -> Self {
225        Self::new(start, "+")
226    }
227
228    /// Create a range from the beginning to a specific ID
229    pub fn to(end: impl Into<String>) -> Self {
230        Self::new("-", end)
231    }
232}
233
234/// Options for XREAD and XREADGROUP commands
235#[derive(Debug, Clone)]
236pub struct ReadOptions {
237    /// Maximum number of entries per stream
238    pub count: Option<u64>,
239    /// Block timeout in milliseconds (None for non-blocking)
240    pub block: Option<Duration>,
241}
242
243impl ReadOptions {
244    /// Create new read options
245    #[must_use]
246    pub fn new() -> Self {
247        Self {
248            count: None,
249            block: None,
250        }
251    }
252
253    /// Set the maximum number of entries per stream
254    pub fn with_count(mut self, count: u64) -> Self {
255        self.count = Some(count);
256        self
257    }
258
259    /// Set the block timeout
260    pub fn with_block(mut self, timeout: Duration) -> Self {
261        self.block = Some(timeout);
262        self
263    }
264
265    /// Create blocking read options
266    pub fn blocking(timeout: Duration) -> Self {
267        Self::new().with_block(timeout)
268    }
269
270    /// Create non-blocking read options with count limit
271    pub fn non_blocking(count: u64) -> Self {
272        Self::new().with_count(count)
273    }
274}
275
276impl Default for ReadOptions {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
282/// Parse stream entries from Redis response
283pub fn parse_stream_entries(response: RespValue) -> RedisResult<Vec<StreamEntry>> {
284    match response {
285        RespValue::Array(items) => {
286            let mut entries = Vec::new();
287
288            for item in items {
289                match item {
290                    RespValue::Array(entry_data) if entry_data.len() == 2 => {
291                        let id = entry_data[0].as_string()?;
292
293                        match &entry_data[1] {
294                            RespValue::Array(field_values) => {
295                                let mut fields = HashMap::new();
296
297                                // Fields are stored as [field1, value1, field2, value2, ...]
298                                for chunk in field_values.chunks(2) {
299                                    if chunk.len() == 2 {
300                                        let field = chunk[0].as_string()?;
301                                        let value = chunk[1].as_string()?;
302                                        fields.insert(field, value);
303                                    }
304                                }
305
306                                entries.push(StreamEntry::new(id, fields));
307                            }
308                            _ => {
309                                return Err(RedisError::Type(format!(
310                                    "Invalid stream entry field format: {:?}",
311                                    entry_data[1]
312                                )))
313                            }
314                        }
315                    }
316                    _ => {
317                        return Err(RedisError::Type(format!(
318                            "Invalid stream entry format: {:?}",
319                            item
320                        )))
321                    }
322                }
323            }
324
325            Ok(entries)
326        }
327        _ => Err(RedisError::Type(format!(
328            "Expected array for stream entries, got: {:?}",
329            response
330        ))),
331    }
332}
333
334/// Parse XREAD/XREADGROUP response
335pub fn parse_xread_response(response: RespValue) -> RedisResult<HashMap<String, Vec<StreamEntry>>> {
336    match response {
337        RespValue::Array(streams) => {
338            let mut result = HashMap::new();
339
340            for stream in streams {
341                match stream {
342                    RespValue::Array(stream_data) if stream_data.len() == 2 => {
343                        let stream_name = stream_data[0].as_string()?;
344                        let entries = parse_stream_entries(stream_data[1].clone())?;
345                        result.insert(stream_name, entries);
346                    }
347                    _ => {
348                        return Err(RedisError::Type(format!(
349                            "Invalid XREAD response format: {:?}",
350                            stream
351                        )))
352                    }
353                }
354            }
355
356            Ok(result)
357        }
358        RespValue::Null => Ok(HashMap::new()), // No new entries
359        _ => Err(RedisError::Type(format!(
360            "Expected array or null for XREAD response, got: {:?}",
361            response
362        ))),
363    }
364}
365
366/// Parse stream info from XINFO STREAM response
367pub fn parse_stream_info(response: RespValue) -> RedisResult<StreamInfo> {
368    match response {
369        RespValue::Array(items) => {
370            let mut length = 0;
371            let mut groups = 0;
372            let mut first_entry = None;
373            let mut last_entry = None;
374            let mut last_generated_id = String::new();
375
376            // Parse key-value pairs
377            for chunk in items.chunks(2) {
378                if chunk.len() == 2 {
379                    let key = chunk[0].as_string()?;
380                    match key.as_str() {
381                        "length" => length = chunk[1].as_int()? as u64,
382                        "groups" => groups = chunk[1].as_int()? as u64,
383                        "first-entry" => {
384                            if !chunk[1].is_null() {
385                                if let RespValue::Array(entry) = &chunk[1] {
386                                    if !entry.is_empty() {
387                                        first_entry = Some(entry[0].as_string()?);
388                                    }
389                                }
390                            }
391                        }
392                        "last-entry" => {
393                            if !chunk[1].is_null() {
394                                if let RespValue::Array(entry) = &chunk[1] {
395                                    if !entry.is_empty() {
396                                        last_entry = Some(entry[0].as_string()?);
397                                    }
398                                }
399                            }
400                        }
401                        "last-generated-id" => last_generated_id = chunk[1].as_string()?,
402                        _ => {} // Ignore unknown fields
403                    }
404                }
405            }
406
407            Ok(StreamInfo {
408                length,
409                groups,
410                first_entry,
411                last_entry,
412                last_generated_id,
413            })
414        }
415        _ => Err(RedisError::Type(format!(
416            "Expected array for stream info, got: {:?}",
417            response
418        ))),
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425
426    #[test]
427    fn test_stream_entry_creation() {
428        let mut fields = HashMap::new();
429        fields.insert("user".to_string(), "alice".to_string());
430        fields.insert("action".to_string(), "login".to_string());
431
432        let entry = StreamEntry::new("1234567890123-0".to_string(), fields.clone());
433
434        assert_eq!(entry.id, "1234567890123-0");
435        assert_eq!(entry.fields, fields);
436        assert_eq!(entry.get_field("user"), Some(&"alice".to_string()));
437        assert!(entry.has_field("action"));
438        assert!(!entry.has_field("nonexistent"));
439    }
440
441    #[test]
442    fn test_stream_entry_timestamp_parsing() {
443        let entry = StreamEntry::new("1234567890123-5".to_string(), HashMap::new());
444
445        assert_eq!(entry.timestamp(), Some(1_234_567_890_123));
446        assert_eq!(entry.sequence(), Some(5));
447    }
448
449    #[test]
450    fn test_stream_entry_invalid_id() {
451        let entry = StreamEntry::new("invalid-id".to_string(), HashMap::new());
452
453        assert_eq!(entry.timestamp(), None);
454        assert_eq!(entry.sequence(), None);
455    }
456
457    #[test]
458    fn test_stream_range_creation() {
459        let range = StreamRange::new("1000", "2000").with_count(10);
460
461        assert_eq!(range.start, "1000");
462        assert_eq!(range.end, "2000");
463        assert_eq!(range.count, Some(10));
464    }
465
466    #[test]
467    fn test_stream_range_presets() {
468        let all = StreamRange::all();
469        assert_eq!(all.start, "-");
470        assert_eq!(all.end, "+");
471
472        let from = StreamRange::from("1000");
473        assert_eq!(from.start, "1000");
474        assert_eq!(from.end, "+");
475
476        let to = StreamRange::to("2000");
477        assert_eq!(to.start, "-");
478        assert_eq!(to.end, "2000");
479    }
480
481    #[test]
482    fn test_read_options() {
483        let options = ReadOptions::new()
484            .with_count(5)
485            .with_block(Duration::from_secs(1));
486
487        assert_eq!(options.count, Some(5));
488        assert_eq!(options.block, Some(Duration::from_secs(1)));
489
490        let blocking = ReadOptions::blocking(Duration::from_millis(500));
491        assert_eq!(blocking.block, Some(Duration::from_millis(500)));
492
493        let non_blocking = ReadOptions::non_blocking(10);
494        assert_eq!(non_blocking.count, Some(10));
495        assert_eq!(non_blocking.block, None);
496    }
497
498    #[test]
499    fn test_parse_stream_entries() {
500        let response = RespValue::Array(vec![
501            RespValue::Array(vec![
502                RespValue::from("1234567890123-0"),
503                RespValue::Array(vec![
504                    RespValue::from("user"),
505                    RespValue::from("alice"),
506                    RespValue::from("action"),
507                    RespValue::from("login"),
508                ]),
509            ]),
510            RespValue::Array(vec![
511                RespValue::from("1234567890124-0"),
512                RespValue::Array(vec![
513                    RespValue::from("user"),
514                    RespValue::from("bob"),
515                    RespValue::from("action"),
516                    RespValue::from("logout"),
517                ]),
518            ]),
519        ]);
520
521        let entries = parse_stream_entries(response).unwrap();
522        assert_eq!(entries.len(), 2);
523
524        assert_eq!(entries[0].id, "1234567890123-0");
525        assert_eq!(entries[0].get_field("user"), Some(&"alice".to_string()));
526        assert_eq!(entries[0].get_field("action"), Some(&"login".to_string()));
527
528        assert_eq!(entries[1].id, "1234567890124-0");
529        assert_eq!(entries[1].get_field("user"), Some(&"bob".to_string()));
530        assert_eq!(entries[1].get_field("action"), Some(&"logout".to_string()));
531    }
532
533    #[test]
534    fn test_parse_xread_response() {
535        let response = RespValue::Array(vec![RespValue::Array(vec![
536            RespValue::from("stream1"),
537            RespValue::Array(vec![RespValue::Array(vec![
538                RespValue::from("1234567890123-0"),
539                RespValue::Array(vec![RespValue::from("field1"), RespValue::from("value1")]),
540            ])]),
541        ])]);
542
543        let result = parse_xread_response(response).unwrap();
544        assert_eq!(result.len(), 1);
545        assert!(result.contains_key("stream1"));
546
547        let entries = &result["stream1"];
548        assert_eq!(entries.len(), 1);
549        assert_eq!(entries[0].id, "1234567890123-0");
550        assert_eq!(entries[0].get_field("field1"), Some(&"value1".to_string()));
551    }
552
553    #[test]
554    fn test_parse_xread_response_null() {
555        let response = RespValue::Null;
556        let result = parse_xread_response(response).unwrap();
557        assert!(result.is_empty());
558    }
559}