1use crate::key::RedisKey;
2use crate::raw;
3use crate::RedisError;
4use crate::RedisString;
5use crate::Status;
6use std::os::raw::c_long;
7use std::ptr;
8
9#[derive(Debug)]
10pub struct StreamRecord {
11 pub id: raw::RedisModuleStreamID,
12 pub fields: Vec<(RedisString, RedisString)>,
13}
14
15#[derive(Debug)]
16pub struct StreamIterator<'key> {
17 key: &'key RedisKey,
18}
19
20impl<'key> StreamIterator<'key> {
21 pub(crate) fn new(
22 key: &RedisKey,
23 mut from: Option<raw::RedisModuleStreamID>,
24 mut to: Option<raw::RedisModuleStreamID>,
25 exclusive: bool,
26 reverse: bool,
27 ) -> Result<StreamIterator, RedisError> {
28 let mut flags = if exclusive {
29 raw::REDISMODULE_STREAM_ITERATOR_EXCLUSIVE as i32
30 } else {
31 0
32 };
33
34 flags |= if reverse {
35 raw::REDISMODULE_STREAM_ITERATOR_REVERSE as i32
36 } else {
37 0
38 };
39
40 let res = unsafe {
41 raw::RedisModule_StreamIteratorStart.unwrap()(
42 key.key_inner,
43 flags,
44 from.as_mut().map_or(ptr::null_mut(), |v| v),
45 to.as_mut().map_or(ptr::null_mut(), |v| v),
46 )
47 };
48 if Status::Ok == res.into() {
49 Ok(StreamIterator { key })
50 } else {
51 Err(RedisError::Str("Failed creating stream iterator"))
52 }
53 }
54}
55
56impl<'key> Iterator for StreamIterator<'key> {
57 type Item = StreamRecord;
58
59 fn next(&mut self) -> Option<Self::Item> {
60 let mut id = raw::RedisModuleStreamID { ms: 0, seq: 0 };
61 let mut num_fields: c_long = 0;
62 let mut field_name: *mut raw::RedisModuleString = ptr::null_mut();
63 let mut field_val: *mut raw::RedisModuleString = ptr::null_mut();
64 if Status::Ok
65 != unsafe {
66 raw::RedisModule_StreamIteratorNextID.unwrap()(
67 self.key.key_inner,
68 &mut id,
69 &mut num_fields,
70 )
71 }
72 .into()
73 {
74 return None;
75 }
76 let mut fields = Vec::new();
77 while Status::Ok
78 == unsafe {
79 raw::RedisModule_StreamIteratorNextField.unwrap()(
80 self.key.key_inner,
81 &mut field_name,
82 &mut field_val,
83 )
84 .into()
85 }
86 {
87 fields.push((
88 RedisString::from_redis_module_string(ptr::null_mut(), field_name),
89 RedisString::from_redis_module_string(ptr::null_mut(), field_val),
90 ));
91 }
92 Some(StreamRecord { id, fields })
93 }
94}
95
96impl<'key> Drop for StreamIterator<'key> {
97 fn drop(&mut self) {
98 unsafe { raw::RedisModule_StreamIteratorDelete.unwrap()(self.key.key_inner) };
99 }
100}