redis_om/stream_model/
async.rs

1use redis::aio::ConnectionLike;
2use redis::streams::StreamMaxlen;
3use redis::{FromRedisValue, RedisResult, ToRedisArgs};
4
5use super::cmds;
6use super::message::Message;
7use super::reply::StreamReadReply;
8use super::transformers;
9
10impl Message {
11    pub async fn ack<Data: StreamModel, C: ConnectionLike + Send>(
12        &self,
13        conn: &mut C,
14    ) -> RedisResult<()> {
15        Data::ack(&self.group, &[&self.id], conn).await
16    }
17}
18
19/// Stream Model for consuming and subscribing to redis stream data type
20#[async_trait::async_trait]
21pub trait StreamModel: Sized + Send {
22    /// Data that will published and consumed from the stream
23    type Data: ToRedisArgs + FromRedisValue + Sync;
24
25    /// Redis Stream Key
26    fn stream_key() -> &'static str;
27
28    /// Group Name
29    fn group_name(&self) -> &str;
30
31    /// Consumer Name
32    fn consumer_name(&self) -> &str;
33
34    /// Publish self to stream, returning event id
35    async fn publish<C: ConnectionLike + Send>(
36        data: &Self::Data,
37        conn: &mut C,
38    ) -> RedisResult<String> {
39        cmds::publish::<Self, _>(data)?.query_async(conn).await
40    }
41
42    /// Ensure group stream exists for [`Self::stream_key`], creates a new if it doesn't exists.
43    /// Errors if it fails to ensure stream
44    async fn ensure_group_stream<C: ConnectionLike + Send>(&self, conn: &mut C) -> RedisResult<()> {
45        let res = cmds::ensure_group_stream::<Self>(self)?
46            .query_async(conn)
47            .await;
48        transformers::ensure_group_stream_success(res)
49    }
50
51    /// Read from [`Self::stream_key`] with group name and consumer name.
52    async fn read<C: ConnectionLike + Send>(
53        &self,
54        read_count: Option<usize>,
55        block_interval: Option<usize>,
56        conn: &mut C,
57    ) -> RedisResult<Vec<Message>> {
58        cmds::read::<Self>(self, read_count, block_interval)?
59            .query_async::<_, StreamReadReply>(conn)
60            .await
61            .map(|reply| transformers::stream_read_reply_to_messages(self, reply))?
62    }
63
64    /// Abstraction with default options and without a group.
65    async fn read_no_group<C: ConnectionLike + Send>(
66        id: impl AsRef<str> + Send,
67        conn: &mut C,
68    ) -> RedisResult<Vec<Message>> {
69        cmds::read_no_group::<Self>(id)?
70            .query_async::<_, StreamReadReply>(conn)
71            .await
72            .map(transformers::stream_read_no_group_reply_to_messages)?
73    }
74
75    /// Autoclaim an event and return a stream of messages found during the autoclaim.
76    async fn autoclaim<C: ConnectionLike + Send>(
77        group: impl AsRef<str> + Send,
78        consumer: impl AsRef<str> + Send,
79        min_idle_time: usize,
80        last_autocalim_id: impl AsRef<str> + Send,
81        read_count: Option<usize>,
82        conn: &mut C,
83    ) -> RedisResult<(String, Vec<Message>)> {
84        let group = group.as_ref();
85        cmds::autoclaim::<Self>(
86            group,
87            consumer,
88            min_idle_time,
89            last_autocalim_id,
90            read_count,
91        )?
92        .query_async(conn)
93        .await
94        .map(|(new_id, reply)| {
95            transformers::autoclaim_range_to_id_and_messages(group, new_id, reply)
96        })?
97    }
98
99    /// Acknowledge a given list of ids for group
100    async fn ack<C: ConnectionLike + Send, I: ToRedisArgs + Sync>(
101        group: impl ToRedisArgs + Send,
102        ids: &[I],
103        conn: &mut C,
104    ) -> RedisResult<()> {
105        let cmd = cmds::ack::<Self>(group, ids)?;
106
107        cmd.query_async(conn).await
108    }
109
110    /// Return the length of the stream
111    async fn len<C: ConnectionLike + Send>(conn: &mut C) -> RedisResult<usize> {
112        cmds::len::<Self>()?.query_async(conn).await
113    }
114
115    /// Trim a stream to a MAXLEN count.
116    async fn trim<C: ConnectionLike + Send>(maxlen: StreamMaxlen, conn: &mut C) -> RedisResult<()> {
117        cmds::trim::<Self>(maxlen)?.query_async(conn).await
118    }
119
120    /// Returns a range of messages.
121    ///
122    /// Set `start` to `-` to begin at the first message.
123    /// Set `end` to `+` to end the most recent message.
124    ///
125    /// You can pass message `id` to both `start` and `end`.
126    ///
127    async fn range_count<
128        C: ConnectionLike + Send,
129        S: ToRedisArgs + Send,
130        E: ToRedisArgs + Send,
131        N: ToRedisArgs + Send,
132    >(
133        start: S,
134        end: E,
135        count: N,
136        conn: &mut C,
137    ) -> RedisResult<Vec<Message>> {
138        cmds::range_count::<Self, _, _, _>(start, end, count)?
139            .query_async(conn)
140            .await
141            .map(transformers::stream_range_to_messages)?
142    }
143
144    /// A method for paginating the stream
145    async fn range<C: ConnectionLike + Send, S: ToRedisArgs + Send, E: ToRedisArgs + Send>(
146        start: S,
147        end: E,
148        conn: &mut C,
149    ) -> RedisResult<Vec<Message>> {
150        cmds::range::<Self, _, _>(start, end)?
151            .query_async(conn)
152            .await
153            .map(transformers::stream_range_to_messages)?
154    }
155
156    /// A helper method for automatically returning all messages in a stream by `key`.
157    /// **Use with caution!**
158    async fn range_all<C: ConnectionLike + Send>(conn: &mut C) -> RedisResult<Vec<Message>> {
159        cmds::range_all::<Self>()?
160            .query_async(conn)
161            .await
162            .map(transformers::stream_range_to_messages)?
163    }
164}