redis_module/
stream.rs

1use crate::key::RedisKey;
2use crate::raw;
3use crate::RedisError;
4use crate::RedisString;
5use crate::Status;
6use std::os::raw::c_long;
7use std::ptr;
8
9#[derive(Debug)]
10pub struct StreamRecord {
11    pub id: raw::RedisModuleStreamID,
12    pub fields: Vec<(RedisString, RedisString)>,
13}
14
15#[derive(Debug)]
16pub struct StreamIterator<'key> {
17    key: &'key RedisKey,
18}
19
20impl<'key> StreamIterator<'key> {
21    pub(crate) fn new(
22        key: &RedisKey,
23        mut from: Option<raw::RedisModuleStreamID>,
24        mut to: Option<raw::RedisModuleStreamID>,
25        exclusive: bool,
26        reverse: bool,
27    ) -> Result<StreamIterator, RedisError> {
28        let mut flags = if exclusive {
29            raw::REDISMODULE_STREAM_ITERATOR_EXCLUSIVE as i32
30        } else {
31            0
32        };
33
34        flags |= if reverse {
35            raw::REDISMODULE_STREAM_ITERATOR_REVERSE as i32
36        } else {
37            0
38        };
39
40        let res = unsafe {
41            raw::RedisModule_StreamIteratorStart.unwrap()(
42                key.key_inner,
43                flags,
44                from.as_mut().map_or(ptr::null_mut(), |v| v),
45                to.as_mut().map_or(ptr::null_mut(), |v| v),
46            )
47        };
48        if Status::Ok == res.into() {
49            Ok(StreamIterator { key })
50        } else {
51            Err(RedisError::Str("Failed creating stream iterator"))
52        }
53    }
54}
55
56impl<'key> Iterator for StreamIterator<'key> {
57    type Item = StreamRecord;
58
59    fn next(&mut self) -> Option<Self::Item> {
60        let mut id = raw::RedisModuleStreamID { ms: 0, seq: 0 };
61        let mut num_fields: c_long = 0;
62        let mut field_name: *mut raw::RedisModuleString = ptr::null_mut();
63        let mut field_val: *mut raw::RedisModuleString = ptr::null_mut();
64        if Status::Ok
65            != unsafe {
66                raw::RedisModule_StreamIteratorNextID.unwrap()(
67                    self.key.key_inner,
68                    &mut id,
69                    &mut num_fields,
70                )
71            }
72            .into()
73        {
74            return None;
75        }
76        let mut fields = Vec::new();
77        while Status::Ok
78            == unsafe {
79                raw::RedisModule_StreamIteratorNextField.unwrap()(
80                    self.key.key_inner,
81                    &mut field_name,
82                    &mut field_val,
83                )
84                .into()
85            }
86        {
87            fields.push((
88                RedisString::from_redis_module_string(ptr::null_mut(), field_name),
89                RedisString::from_redis_module_string(ptr::null_mut(), field_val),
90            ));
91        }
92        Some(StreamRecord { id, fields })
93    }
94}
95
96impl<'key> Drop for StreamIterator<'key> {
97    fn drop(&mut self) {
98        unsafe { raw::RedisModule_StreamIteratorDelete.unwrap()(self.key.key_inner) };
99    }
100}