1use crate::core::{
73    error::{RedisError, RedisResult},
74    value::RespValue,
75};
76use std::collections::HashMap;
77use std::time::Duration;
78
79#[derive(Debug, Clone)]
81pub struct StreamEntry {
82    pub id: String,
84    pub fields: HashMap<String, String>,
86}
87
88impl StreamEntry {
89    pub fn new(id: String, fields: HashMap<String, String>) -> Self {
91        Self { id, fields }
92    }
93
94    #[must_use]
96    pub fn get_field(&self, field: &str) -> Option<&String> {
97        self.fields.get(field)
98    }
99
100    #[must_use]
102    pub fn has_field(&self, field: &str) -> bool {
103        self.fields.contains_key(field)
104    }
105
106    #[must_use]
118    pub fn timestamp(&self) -> Option<u64> {
119        self.id.split('-').next()?.parse().ok()
120    }
121
122    #[must_use]
134    pub fn sequence(&self) -> Option<u64> {
135        self.id.split('-').nth(1)?.parse().ok()
136    }
137}
138
139#[derive(Debug, Clone)]
141pub struct StreamInfo {
142    pub length: u64,
144    pub groups: u64,
146    pub first_entry: Option<String>,
148    pub last_entry: Option<String>,
150    pub last_generated_id: String,
152}
153
154#[derive(Debug, Clone)]
156pub struct ConsumerGroupInfo {
157    pub name: String,
159    pub consumers: u64,
161    pub pending: u64,
163    pub last_delivered_id: String,
165}
166
167#[derive(Debug, Clone)]
169pub struct ConsumerInfo {
170    pub name: String,
172    pub pending: u64,
174    pub idle: u64,
176}
177
178#[derive(Debug, Clone)]
180pub struct PendingMessage {
181    pub id: String,
183    pub consumer: String,
185    pub idle_time: u64,
187    pub delivery_count: u64,
189}
190
191#[derive(Debug, Clone)]
193pub struct StreamRange {
194    pub start: String,
196    pub end: String,
198    pub count: Option<u64>,
200}
201
202impl StreamRange {
203    pub fn new(start: impl Into<String>, end: impl Into<String>) -> Self {
205        Self {
206            start: start.into(),
207            end: end.into(),
208            count: None,
209        }
210    }
211
212    pub fn with_count(mut self, count: u64) -> Self {
214        self.count = Some(count);
215        self
216    }
217
218    pub fn all() -> Self {
220        Self::new("-", "+")
221    }
222
223    pub fn from(start: impl Into<String>) -> Self {
225        Self::new(start, "+")
226    }
227
228    pub fn to(end: impl Into<String>) -> Self {
230        Self::new("-", end)
231    }
232}
233
234#[derive(Debug, Clone)]
236pub struct ReadOptions {
237    pub count: Option<u64>,
239    pub block: Option<Duration>,
241}
242
243impl ReadOptions {
244    #[must_use]
246    pub fn new() -> Self {
247        Self {
248            count: None,
249            block: None,
250        }
251    }
252
253    pub fn with_count(mut self, count: u64) -> Self {
255        self.count = Some(count);
256        self
257    }
258
259    pub fn with_block(mut self, timeout: Duration) -> Self {
261        self.block = Some(timeout);
262        self
263    }
264
265    pub fn blocking(timeout: Duration) -> Self {
267        Self::new().with_block(timeout)
268    }
269
270    pub fn non_blocking(count: u64) -> Self {
272        Self::new().with_count(count)
273    }
274}
275
276impl Default for ReadOptions {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
282pub fn parse_stream_entries(response: RespValue) -> RedisResult<Vec<StreamEntry>> {
284    match response {
285        RespValue::Array(items) => {
286            let mut entries = Vec::new();
287
288            for item in items {
289                match item {
290                    RespValue::Array(entry_data) if entry_data.len() == 2 => {
291                        let id = entry_data[0].as_string()?;
292
293                        match &entry_data[1] {
294                            RespValue::Array(field_values) => {
295                                let mut fields = HashMap::new();
296
297                                for chunk in field_values.chunks(2) {
299                                    if chunk.len() == 2 {
300                                        let field = chunk[0].as_string()?;
301                                        let value = chunk[1].as_string()?;
302                                        fields.insert(field, value);
303                                    }
304                                }
305
306                                entries.push(StreamEntry::new(id, fields));
307                            }
308                            _ => {
309                                return Err(RedisError::Type(format!(
310                                    "Invalid stream entry field format: {:?}",
311                                    entry_data[1]
312                                )))
313                            }
314                        }
315                    }
316                    _ => {
317                        return Err(RedisError::Type(format!(
318                            "Invalid stream entry format: {:?}",
319                            item
320                        )))
321                    }
322                }
323            }
324
325            Ok(entries)
326        }
327        _ => Err(RedisError::Type(format!(
328            "Expected array for stream entries, got: {:?}",
329            response
330        ))),
331    }
332}
333
334pub fn parse_xread_response(response: RespValue) -> RedisResult<HashMap<String, Vec<StreamEntry>>> {
336    match response {
337        RespValue::Array(streams) => {
338            let mut result = HashMap::new();
339
340            for stream in streams {
341                match stream {
342                    RespValue::Array(stream_data) if stream_data.len() == 2 => {
343                        let stream_name = stream_data[0].as_string()?;
344                        let entries = parse_stream_entries(stream_data[1].clone())?;
345                        result.insert(stream_name, entries);
346                    }
347                    _ => {
348                        return Err(RedisError::Type(format!(
349                            "Invalid XREAD response format: {:?}",
350                            stream
351                        )))
352                    }
353                }
354            }
355
356            Ok(result)
357        }
358        RespValue::Null => Ok(HashMap::new()), _ => Err(RedisError::Type(format!(
360            "Expected array or null for XREAD response, got: {:?}",
361            response
362        ))),
363    }
364}
365
366pub fn parse_stream_info(response: RespValue) -> RedisResult<StreamInfo> {
368    match response {
369        RespValue::Array(items) => {
370            let mut length = 0;
371            let mut groups = 0;
372            let mut first_entry = None;
373            let mut last_entry = None;
374            let mut last_generated_id = String::new();
375
376            for chunk in items.chunks(2) {
378                if chunk.len() == 2 {
379                    let key = chunk[0].as_string()?;
380                    match key.as_str() {
381                        "length" => length = chunk[1].as_int()? as u64,
382                        "groups" => groups = chunk[1].as_int()? as u64,
383                        "first-entry" => {
384                            if !chunk[1].is_null() {
385                                if let RespValue::Array(entry) = &chunk[1] {
386                                    if !entry.is_empty() {
387                                        first_entry = Some(entry[0].as_string()?);
388                                    }
389                                }
390                            }
391                        }
392                        "last-entry" => {
393                            if !chunk[1].is_null() {
394                                if let RespValue::Array(entry) = &chunk[1] {
395                                    if !entry.is_empty() {
396                                        last_entry = Some(entry[0].as_string()?);
397                                    }
398                                }
399                            }
400                        }
401                        "last-generated-id" => last_generated_id = chunk[1].as_string()?,
402                        _ => {} }
404                }
405            }
406
407            Ok(StreamInfo {
408                length,
409                groups,
410                first_entry,
411                last_entry,
412                last_generated_id,
413            })
414        }
415        _ => Err(RedisError::Type(format!(
416            "Expected array for stream info, got: {:?}",
417            response
418        ))),
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425
426    #[test]
427    fn test_stream_entry_creation() {
428        let mut fields = HashMap::new();
429        fields.insert("user".to_string(), "alice".to_string());
430        fields.insert("action".to_string(), "login".to_string());
431
432        let entry = StreamEntry::new("1234567890123-0".to_string(), fields.clone());
433
434        assert_eq!(entry.id, "1234567890123-0");
435        assert_eq!(entry.fields, fields);
436        assert_eq!(entry.get_field("user"), Some(&"alice".to_string()));
437        assert!(entry.has_field("action"));
438        assert!(!entry.has_field("nonexistent"));
439    }
440
441    #[test]
442    fn test_stream_entry_timestamp_parsing() {
443        let entry = StreamEntry::new("1234567890123-5".to_string(), HashMap::new());
444
445        assert_eq!(entry.timestamp(), Some(1_234_567_890_123));
446        assert_eq!(entry.sequence(), Some(5));
447    }
448
449    #[test]
450    fn test_stream_entry_invalid_id() {
451        let entry = StreamEntry::new("invalid-id".to_string(), HashMap::new());
452
453        assert_eq!(entry.timestamp(), None);
454        assert_eq!(entry.sequence(), None);
455    }
456
457    #[test]
458    fn test_stream_range_creation() {
459        let range = StreamRange::new("1000", "2000").with_count(10);
460
461        assert_eq!(range.start, "1000");
462        assert_eq!(range.end, "2000");
463        assert_eq!(range.count, Some(10));
464    }
465
466    #[test]
467    fn test_stream_range_presets() {
468        let all = StreamRange::all();
469        assert_eq!(all.start, "-");
470        assert_eq!(all.end, "+");
471
472        let from = StreamRange::from("1000");
473        assert_eq!(from.start, "1000");
474        assert_eq!(from.end, "+");
475
476        let to = StreamRange::to("2000");
477        assert_eq!(to.start, "-");
478        assert_eq!(to.end, "2000");
479    }
480
481    #[test]
482    fn test_read_options() {
483        let options = ReadOptions::new()
484            .with_count(5)
485            .with_block(Duration::from_secs(1));
486
487        assert_eq!(options.count, Some(5));
488        assert_eq!(options.block, Some(Duration::from_secs(1)));
489
490        let blocking = ReadOptions::blocking(Duration::from_millis(500));
491        assert_eq!(blocking.block, Some(Duration::from_millis(500)));
492
493        let non_blocking = ReadOptions::non_blocking(10);
494        assert_eq!(non_blocking.count, Some(10));
495        assert_eq!(non_blocking.block, None);
496    }
497
498    #[test]
499    fn test_parse_stream_entries() {
500        let response = RespValue::Array(vec![
501            RespValue::Array(vec![
502                RespValue::from("1234567890123-0"),
503                RespValue::Array(vec![
504                    RespValue::from("user"),
505                    RespValue::from("alice"),
506                    RespValue::from("action"),
507                    RespValue::from("login"),
508                ]),
509            ]),
510            RespValue::Array(vec![
511                RespValue::from("1234567890124-0"),
512                RespValue::Array(vec![
513                    RespValue::from("user"),
514                    RespValue::from("bob"),
515                    RespValue::from("action"),
516                    RespValue::from("logout"),
517                ]),
518            ]),
519        ]);
520
521        let entries = parse_stream_entries(response).unwrap();
522        assert_eq!(entries.len(), 2);
523
524        assert_eq!(entries[0].id, "1234567890123-0");
525        assert_eq!(entries[0].get_field("user"), Some(&"alice".to_string()));
526        assert_eq!(entries[0].get_field("action"), Some(&"login".to_string()));
527
528        assert_eq!(entries[1].id, "1234567890124-0");
529        assert_eq!(entries[1].get_field("user"), Some(&"bob".to_string()));
530        assert_eq!(entries[1].get_field("action"), Some(&"logout".to_string()));
531    }
532
533    #[test]
534    fn test_parse_xread_response() {
535        let response = RespValue::Array(vec![RespValue::Array(vec![
536            RespValue::from("stream1"),
537            RespValue::Array(vec![RespValue::Array(vec![
538                RespValue::from("1234567890123-0"),
539                RespValue::Array(vec![RespValue::from("field1"), RespValue::from("value1")]),
540            ])]),
541        ])]);
542
543        let result = parse_xread_response(response).unwrap();
544        assert_eq!(result.len(), 1);
545        assert!(result.contains_key("stream1"));
546
547        let entries = &result["stream1"];
548        assert_eq!(entries.len(), 1);
549        assert_eq!(entries[0].id, "1234567890123-0");
550        assert_eq!(entries[0].get_field("field1"), Some(&"value1".to_string()));
551    }
552
553    #[test]
554    fn test_parse_xread_response_null() {
555        let response = RespValue::Null;
556        let result = parse_xread_response(response).unwrap();
557        assert!(result.is_empty());
558    }
559}