1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use crate::key::RedisKey;
use crate::raw;
use crate::RedisError;
use crate::RedisString;
use crate::Status;
use std::os::raw::c_long;
use std::ptr;

/// A single stream entry as produced by [`StreamIterator`]: the entry's ID
/// together with the field/value pairs read for that entry.
pub struct StreamRecord {
    /// ID of the entry, as filled in by `RedisModule_StreamIteratorNextID`.
    pub id: raw::RedisModuleStreamID,
    /// `(field name, field value)` pairs, in the order they were returned by
    /// `RedisModule_StreamIteratorNextField`.
    pub fields: Vec<(RedisString, RedisString)>,
}

/// Iterator over the entries of a stream key, yielding [`StreamRecord`]s.
///
/// The iterator borrows the key for `'key`; the underlying Redis stream
/// iterator is addressed through that key's raw handle and is deleted when
/// this value is dropped.
pub struct StreamIterator<'key> {
    // Key whose raw handle (`key_inner`) is passed to every stream-iterator call.
    key: &'key RedisKey,
}

impl<'key> StreamIterator<'key> {
    pub(crate) fn new(
        key: &RedisKey,
        mut from: Option<raw::RedisModuleStreamID>,
        mut to: Option<raw::RedisModuleStreamID>,
        exclusive: bool,
        reverse: bool,
    ) -> Result<StreamIterator, RedisError> {
        let mut flags = if exclusive {
            raw::REDISMODULE_STREAM_ITERATOR_EXCLUSIVE as i32
        } else {
            0
        };

        flags |= if reverse {
            raw::REDISMODULE_STREAM_ITERATOR_REVERSE as i32
        } else {
            0
        };

        let res = unsafe {
            raw::RedisModule_StreamIteratorStart.unwrap()(
                key.key_inner,
                flags,
                from.as_mut().map_or(ptr::null_mut(), |v| v),
                to.as_mut().map_or(ptr::null_mut(), |v| v),
            )
        };
        if Status::Ok == res.into() {
            Ok(StreamIterator { key })
        } else {
            Err(RedisError::Str("Failed creating stream iterator"))
        }
    }
}

impl<'key> Iterator for StreamIterator<'key> {
    type Item = StreamRecord;

    fn next(&mut self) -> Option<Self::Item> {
        let mut id = raw::RedisModuleStreamID { ms: 0, seq: 0 };
        let mut num_fields: c_long = 0;
        let mut field_name: *mut raw::RedisModuleString = ptr::null_mut();
        let mut field_val: *mut raw::RedisModuleString = ptr::null_mut();
        if Status::Ok
            != unsafe {
                raw::RedisModule_StreamIteratorNextID.unwrap()(
                    self.key.key_inner,
                    &mut id,
                    &mut num_fields,
                )
            }
            .into()
        {
            return None;
        }
        let mut fields = Vec::new();
        while Status::Ok
            == unsafe {
                raw::RedisModule_StreamIteratorNextField.unwrap()(
                    self.key.key_inner,
                    &mut field_name,
                    &mut field_val,
                )
                .into()
            }
        {
            fields.push((
                RedisString::from_redis_module_string(ptr::null_mut(), field_name),
                RedisString::from_redis_module_string(ptr::null_mut(), field_val),
            ));
        }
        Some(StreamRecord { id, fields })
    }
}

impl<'key> Drop for StreamIterator<'key> {
    /// Deletes the underlying Redis stream iterator by calling
    /// `RedisModule_StreamIteratorDelete` on the borrowed key's raw handle.
    /// The value returned by that call is discarded.
    fn drop(&mut self) {
        let raw_key = self.key.key_inner;
        unsafe {
            raw::RedisModule_StreamIteratorDelete.unwrap()(raw_key);
        }
    }
}