1use crate::core::{
73 error::{RedisError, RedisResult},
74 value::RespValue,
75};
76use std::collections::HashMap;
77use std::time::Duration;
78
/// A single stream entry: its ID together with its field/value pairs.
///
/// Derives `PartialEq`/`Eq` so entries can be compared directly (for example
/// in tests or when de-duplicating results).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamEntry {
    /// Entry ID. [`StreamEntry::timestamp`] and [`StreamEntry::sequence`]
    /// interpret it as `<timestamp>-<sequence>`.
    pub id: String,
    /// Field name to value mapping carried by the entry.
    pub fields: HashMap<String, String>,
}

impl StreamEntry {
    /// Builds an entry from an ID and an already-populated field map.
    #[must_use]
    pub fn new(id: String, fields: HashMap<String, String>) -> Self {
        Self { id, fields }
    }

    /// Returns the value stored under `field`, or `None` if the entry has no
    /// such field.
    #[must_use]
    pub fn get_field(&self, field: &str) -> Option<&String> {
        self.fields.get(field)
    }

    /// Returns `true` when the entry contains `field`.
    #[must_use]
    pub fn has_field(&self, field: &str) -> bool {
        self.fields.contains_key(field)
    }

    /// Returns the part of the ID before the first `-`, parsed as `u64`.
    ///
    /// Yields `None` when that part is not a valid unsigned integer. An ID
    /// without any `-` is parsed as a whole.
    #[must_use]
    pub fn timestamp(&self) -> Option<u64> {
        let leading = self.id.split('-').next()?;
        leading.parse().ok()
    }

    /// Returns the second `-`-separated part of the ID, parsed as `u64`.
    ///
    /// Yields `None` when the ID has no `-` or when that part is not a valid
    /// unsigned integer.
    #[must_use]
    pub fn sequence(&self) -> Option<u64> {
        let second = self.id.split('-').nth(1)?;
        second.parse().ok()
    }
}
138
/// Summary information about a stream, as assembled by `parse_stream_info`.
///
/// Derives `PartialEq`/`Eq` so parsed results can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamInfo {
    /// Value of the `length` key (0 when the key is absent).
    pub length: u64,
    /// Value of the `groups` key (0 when the key is absent).
    pub groups: u64,
    /// ID of the `first-entry` element, or `None` when it is missing, null or
    /// empty.
    pub first_entry: Option<String>,
    /// ID of the `last-entry` element, or `None` when it is missing, null or
    /// empty.
    pub last_entry: Option<String>,
    /// Value of the `last-generated-id` key (empty when the key is absent).
    pub last_generated_id: String,
}
153
/// Description of one consumer group attached to a stream.
///
/// NOTE(review): nothing in this file populates this type; the field
/// meanings below are inferred from the field names and presumably mirror
/// the reply of Redis `XINFO GROUPS` — confirm against the caller.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroupInfo {
    /// Consumer group name.
    pub name: String,
    /// Number of consumers in the group.
    pub consumers: u64,
    /// Number of pending entries for the group.
    pub pending: u64,
    /// ID of the last entry delivered to the group.
    pub last_delivered_id: String,
}
166
/// Description of one consumer inside a consumer group.
///
/// NOTE(review): nothing in this file populates this type; the field
/// meanings below are inferred from the field names and presumably mirror
/// the reply of Redis `XINFO CONSUMERS` — confirm against the caller.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerInfo {
    /// Consumer name.
    pub name: String,
    /// Number of entries pending for this consumer.
    pub pending: u64,
    /// Idle time of the consumer (unit not established here; presumably
    /// milliseconds — TODO confirm).
    pub idle: u64,
}
177
/// One pending (delivered but not yet acknowledged) stream message.
///
/// NOTE(review): nothing in this file populates this type; the field
/// meanings below are inferred from the field names and presumably mirror
/// the extended reply of Redis `XPENDING` — confirm against the caller.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingMessage {
    /// ID of the pending entry.
    pub id: String,
    /// Name of the consumer the entry was delivered to.
    pub consumer: String,
    /// Time since the last delivery (unit not established here; presumably
    /// milliseconds — TODO confirm).
    pub idle_time: u64,
    /// Number of times the entry has been delivered.
    pub delivery_count: u64,
}
190
/// A start/end ID range with an optional cap on the number of entries.
///
/// Derives `PartialEq`/`Eq` so ranges can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamRange {
    /// Lower bound of the range; [`StreamRange::all`] and [`StreamRange::to`]
    /// use `"-"` here.
    pub start: String,
    /// Upper bound of the range; [`StreamRange::all`] and
    /// [`StreamRange::from`] use `"+"` here.
    pub end: String,
    /// Optional maximum number of entries; `None` means no limit was set.
    pub count: Option<u64>,
}

impl StreamRange {
    /// Creates a range between `start` and `end` with no count limit.
    #[must_use]
    pub fn new(start: impl Into<String>, end: impl Into<String>) -> Self {
        Self {
            start: start.into(),
            end: end.into(),
            count: None,
        }
    }

    /// Returns the range with its count limit set to `count`.
    #[must_use]
    pub fn with_count(mut self, count: u64) -> Self {
        self.count = Some(count);
        self
    }

    /// Range covering everything: start `"-"`, end `"+"`.
    #[must_use]
    pub fn all() -> Self {
        Self::new("-", "+")
    }

    /// Range from `start` up to `"+"`.
    #[must_use]
    pub fn from(start: impl Into<String>) -> Self {
        Self::new(start, "+")
    }

    /// Range from `"-"` up to `end`.
    #[must_use]
    pub fn to(end: impl Into<String>) -> Self {
        Self::new("-", end)
    }
}
233
/// Options for a stream read: an optional entry count and an optional
/// blocking timeout.
///
/// `Default` is derived (both options unset), which is exactly what
/// [`ReadOptions::new`] returns. `PartialEq`/`Eq` are derived so option sets
/// can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReadOptions {
    /// Optional maximum number of entries; `None` means no limit was set.
    pub count: Option<u64>,
    /// Optional blocking timeout; `None` means no blocking was requested.
    pub block: Option<Duration>,
}

impl ReadOptions {
    /// Creates options with neither a count nor a blocking timeout.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the options with the entry count set to `count`.
    #[must_use]
    pub fn with_count(mut self, count: u64) -> Self {
        self.count = Some(count);
        self
    }

    /// Returns the options with the blocking timeout set to `timeout`.
    #[must_use]
    pub fn with_block(mut self, timeout: Duration) -> Self {
        self.block = Some(timeout);
        self
    }

    /// Shorthand for options that only set a blocking timeout.
    #[must_use]
    pub fn blocking(timeout: Duration) -> Self {
        Self::new().with_block(timeout)
    }

    /// Shorthand for options that only set an entry count (no blocking).
    #[must_use]
    pub fn non_blocking(count: u64) -> Self {
        Self::new().with_count(count)
    }
}
281
282pub fn parse_stream_entries(response: RespValue) -> RedisResult<Vec<StreamEntry>> {
284 match response {
285 RespValue::Array(items) => {
286 let mut entries = Vec::new();
287
288 for item in items {
289 match item {
290 RespValue::Array(entry_data) if entry_data.len() == 2 => {
291 let id = entry_data[0].as_string()?;
292
293 match &entry_data[1] {
294 RespValue::Array(field_values) => {
295 let mut fields = HashMap::new();
296
297 for chunk in field_values.chunks(2) {
299 if chunk.len() == 2 {
300 let field = chunk[0].as_string()?;
301 let value = chunk[1].as_string()?;
302 fields.insert(field, value);
303 }
304 }
305
306 entries.push(StreamEntry::new(id, fields));
307 }
308 _ => {
309 return Err(RedisError::Type(format!(
310 "Invalid stream entry field format: {:?}",
311 entry_data[1]
312 )))
313 }
314 }
315 }
316 _ => {
317 return Err(RedisError::Type(format!(
318 "Invalid stream entry format: {:?}",
319 item
320 )))
321 }
322 }
323 }
324
325 Ok(entries)
326 }
327 _ => Err(RedisError::Type(format!(
328 "Expected array for stream entries, got: {:?}",
329 response
330 ))),
331 }
332}
333
334pub fn parse_xread_response(response: RespValue) -> RedisResult<HashMap<String, Vec<StreamEntry>>> {
336 match response {
337 RespValue::Array(streams) => {
338 let mut result = HashMap::new();
339
340 for stream in streams {
341 match stream {
342 RespValue::Array(stream_data) if stream_data.len() == 2 => {
343 let stream_name = stream_data[0].as_string()?;
344 let entries = parse_stream_entries(stream_data[1].clone())?;
345 result.insert(stream_name, entries);
346 }
347 _ => {
348 return Err(RedisError::Type(format!(
349 "Invalid XREAD response format: {:?}",
350 stream
351 )))
352 }
353 }
354 }
355
356 Ok(result)
357 }
358 RespValue::Null => Ok(HashMap::new()), _ => Err(RedisError::Type(format!(
360 "Expected array or null for XREAD response, got: {:?}",
361 response
362 ))),
363 }
364}
365
366pub fn parse_stream_info(response: RespValue) -> RedisResult<StreamInfo> {
368 match response {
369 RespValue::Array(items) => {
370 let mut length = 0;
371 let mut groups = 0;
372 let mut first_entry = None;
373 let mut last_entry = None;
374 let mut last_generated_id = String::new();
375
376 for chunk in items.chunks(2) {
378 if chunk.len() == 2 {
379 let key = chunk[0].as_string()?;
380 match key.as_str() {
381 "length" => length = chunk[1].as_int()? as u64,
382 "groups" => groups = chunk[1].as_int()? as u64,
383 "first-entry" => {
384 if !chunk[1].is_null() {
385 if let RespValue::Array(entry) = &chunk[1] {
386 if !entry.is_empty() {
387 first_entry = Some(entry[0].as_string()?);
388 }
389 }
390 }
391 }
392 "last-entry" => {
393 if !chunk[1].is_null() {
394 if let RespValue::Array(entry) = &chunk[1] {
395 if !entry.is_empty() {
396 last_entry = Some(entry[0].as_string()?);
397 }
398 }
399 }
400 }
401 "last-generated-id" => last_generated_id = chunk[1].as_string()?,
402 _ => {} }
404 }
405 }
406
407 Ok(StreamInfo {
408 length,
409 groups,
410 first_entry,
411 last_entry,
412 last_generated_id,
413 })
414 }
415 _ => Err(RedisError::Type(format!(
416 "Expected array for stream info, got: {:?}",
417 response
418 ))),
419 }
420}
421
#[cfg(test)]
mod tests {
    //! Unit tests for the stream data types and response parsers, covering
    //! both well-formed replies and malformed/edge-case input.

    use super::*;

    /// Accessors on a freshly built entry reflect the supplied ID and fields.
    #[test]
    fn test_stream_entry_creation() {
        let mut fields = HashMap::new();
        fields.insert("user".to_string(), "alice".to_string());
        fields.insert("action".to_string(), "login".to_string());

        let entry = StreamEntry::new("1234567890123-0".to_string(), fields.clone());

        assert_eq!(entry.id, "1234567890123-0");
        assert_eq!(entry.fields, fields);
        assert_eq!(entry.get_field("user"), Some(&"alice".to_string()));
        assert!(entry.has_field("action"));
        assert!(!entry.has_field("nonexistent"));
    }

    /// A well-formed ID splits into its two numeric parts.
    #[test]
    fn test_stream_entry_timestamp_parsing() {
        let entry = StreamEntry::new("1234567890123-5".to_string(), HashMap::new());

        assert_eq!(entry.timestamp(), Some(1_234_567_890_123));
        assert_eq!(entry.sequence(), Some(5));
    }

    /// Non-numeric ID parts yield `None` rather than panicking.
    #[test]
    fn test_stream_entry_invalid_id() {
        let entry = StreamEntry::new("invalid-id".to_string(), HashMap::new());

        assert_eq!(entry.timestamp(), None);
        assert_eq!(entry.sequence(), None);
    }

    /// An ID without a `-` still has a timestamp but no sequence.
    #[test]
    fn test_stream_entry_partial_id() {
        let entry = StreamEntry::new("1234567890123".to_string(), HashMap::new());

        assert_eq!(entry.timestamp(), Some(1_234_567_890_123));
        assert_eq!(entry.sequence(), None);
    }

    /// `new` + `with_count` populate all three range fields.
    #[test]
    fn test_stream_range_creation() {
        let range = StreamRange::new("1000", "2000").with_count(10);

        assert_eq!(range.start, "1000");
        assert_eq!(range.end, "2000");
        assert_eq!(range.count, Some(10));
    }

    /// Preset constructors fill in the open-ended `-` / `+` bounds.
    #[test]
    fn test_stream_range_presets() {
        let all = StreamRange::all();
        assert_eq!(all.start, "-");
        assert_eq!(all.end, "+");
        assert_eq!(all.count, None);

        let from = StreamRange::from("1000");
        assert_eq!(from.start, "1000");
        assert_eq!(from.end, "+");

        let to = StreamRange::to("2000");
        assert_eq!(to.start, "-");
        assert_eq!(to.end, "2000");
    }

    /// Builder methods and shorthands set exactly the options they name.
    #[test]
    fn test_read_options() {
        let options = ReadOptions::new()
            .with_count(5)
            .with_block(Duration::from_secs(1));

        assert_eq!(options.count, Some(5));
        assert_eq!(options.block, Some(Duration::from_secs(1)));

        let blocking = ReadOptions::blocking(Duration::from_millis(500));
        assert_eq!(blocking.count, None);
        assert_eq!(blocking.block, Some(Duration::from_millis(500)));

        let non_blocking = ReadOptions::non_blocking(10);
        assert_eq!(non_blocking.count, Some(10));
        assert_eq!(non_blocking.block, None);

        let defaults = ReadOptions::default();
        assert_eq!(defaults.count, None);
        assert_eq!(defaults.block, None);
    }

    /// Two well-formed entries are parsed in order with all their fields.
    #[test]
    fn test_parse_stream_entries() {
        let response = RespValue::Array(vec![
            RespValue::Array(vec![
                RespValue::from("1234567890123-0"),
                RespValue::Array(vec![
                    RespValue::from("user"),
                    RespValue::from("alice"),
                    RespValue::from("action"),
                    RespValue::from("login"),
                ]),
            ]),
            RespValue::Array(vec![
                RespValue::from("1234567890124-0"),
                RespValue::Array(vec![
                    RespValue::from("user"),
                    RespValue::from("bob"),
                    RespValue::from("action"),
                    RespValue::from("logout"),
                ]),
            ]),
        ]);

        let entries = parse_stream_entries(response).unwrap();
        assert_eq!(entries.len(), 2);

        assert_eq!(entries[0].id, "1234567890123-0");
        assert_eq!(entries[0].get_field("user"), Some(&"alice".to_string()));
        assert_eq!(entries[0].get_field("action"), Some(&"login".to_string()));

        assert_eq!(entries[1].id, "1234567890124-0");
        assert_eq!(entries[1].get_field("user"), Some(&"bob".to_string()));
        assert_eq!(entries[1].get_field("action"), Some(&"logout".to_string()));
    }

    /// An empty array is a valid reply containing no entries.
    #[test]
    fn test_parse_stream_entries_empty() {
        let entries = parse_stream_entries(RespValue::Array(Vec::new())).unwrap();
        assert!(entries.is_empty());
    }

    /// Anything other than an array at the top level is rejected.
    #[test]
    fn test_parse_stream_entries_rejects_non_array() {
        assert!(parse_stream_entries(RespValue::Null).is_err());
        assert!(parse_stream_entries(RespValue::from("oops")).is_err());
    }

    /// Entries must be `[id, [fields...]]`; other shapes are rejected.
    #[test]
    fn test_parse_stream_entries_rejects_malformed_entry() {
        // Entry is a bare string instead of a two-element array.
        let bare_entry = RespValue::Array(vec![RespValue::from("1234567890123-0")]);
        assert!(parse_stream_entries(bare_entry).is_err());

        // Entry array has the wrong number of elements.
        let short_entry = RespValue::Array(vec![RespValue::Array(vec![RespValue::from(
            "1234567890123-0",
        )])]);
        assert!(parse_stream_entries(short_entry).is_err());

        // Field list is a string instead of an array.
        let bad_fields = RespValue::Array(vec![RespValue::Array(vec![
            RespValue::from("1234567890123-0"),
            RespValue::from("not-an-array"),
        ])]);
        assert!(parse_stream_entries(bad_fields).is_err());
    }

    /// A single-stream reply maps the stream name to its entries.
    #[test]
    fn test_parse_xread_response() {
        let response = RespValue::Array(vec![RespValue::Array(vec![
            RespValue::from("stream1"),
            RespValue::Array(vec![RespValue::Array(vec![
                RespValue::from("1234567890123-0"),
                RespValue::Array(vec![RespValue::from("field1"), RespValue::from("value1")]),
            ])]),
        ])]);

        let result = parse_xread_response(response).unwrap();
        assert_eq!(result.len(), 1);
        assert!(result.contains_key("stream1"));

        let entries = &result["stream1"];
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].id, "1234567890123-0");
        assert_eq!(entries[0].get_field("field1"), Some(&"value1".to_string()));
    }

    /// A null reply is treated as "no streams".
    #[test]
    fn test_parse_xread_response_null() {
        let response = RespValue::Null;
        let result = parse_xread_response(response).unwrap();
        assert!(result.is_empty());
    }

    /// Non-array replies and malformed stream items are rejected.
    #[test]
    fn test_parse_xread_response_invalid() {
        assert!(parse_xread_response(RespValue::from("oops")).is_err());

        // Stream item is a bare string instead of `[name, entries]`.
        let bare_stream = RespValue::Array(vec![RespValue::from("stream1")]);
        assert!(parse_xread_response(bare_stream).is_err());

        // Stream item is an array but its entries payload is not.
        let bad_entries = RespValue::Array(vec![RespValue::Array(vec![
            RespValue::from("stream1"),
            RespValue::from("not-an-array"),
        ])]);
        assert!(parse_xread_response(bad_entries).is_err());
    }

    /// String and entry keys are picked up; absent keys keep their defaults
    /// and a null entry value leaves the field as `None`.
    #[test]
    fn test_parse_stream_info_partial() {
        let response = RespValue::Array(vec![
            RespValue::from("last-generated-id"),
            RespValue::from("2-0"),
            RespValue::from("first-entry"),
            RespValue::Array(vec![RespValue::from("1-0"), RespValue::Array(Vec::new())]),
            RespValue::from("last-entry"),
            RespValue::Null,
            RespValue::from("unknown-key"),
            RespValue::from("ignored"),
        ]);

        let info = parse_stream_info(response).unwrap();
        assert_eq!(info.length, 0);
        assert_eq!(info.groups, 0);
        assert_eq!(info.first_entry, Some("1-0".to_string()));
        assert_eq!(info.last_entry, None);
        assert_eq!(info.last_generated_id, "2-0");
    }

    /// Anything other than an array is rejected.
    #[test]
    fn test_parse_stream_info_rejects_non_array() {
        assert!(parse_stream_info(RespValue::Null).is_err());
        assert!(parse_stream_info(RespValue::from("oops")).is_err());
    }
}