redis_om/stream_model/
async.rs1use redis::aio::ConnectionLike;
2use redis::streams::StreamMaxlen;
3use redis::{FromRedisValue, RedisResult, ToRedisArgs};
4
5use super::cmds;
6use super::message::Message;
7use super::reply::StreamReadReply;
8use super::transformers;
9
10impl Message {
11 pub async fn ack<Data: StreamModel, C: ConnectionLike + Send>(
12 &self,
13 conn: &mut C,
14 ) -> RedisResult<()> {
15 Data::ack(&self.group, &[&self.id], conn).await
16 }
17}
18
19#[async_trait::async_trait]
21pub trait StreamModel: Sized + Send {
22 type Data: ToRedisArgs + FromRedisValue + Sync;
24
25 fn stream_key() -> &'static str;
27
28 fn group_name(&self) -> &str;
30
31 fn consumer_name(&self) -> &str;
33
34 async fn publish<C: ConnectionLike + Send>(
36 data: &Self::Data,
37 conn: &mut C,
38 ) -> RedisResult<String> {
39 cmds::publish::<Self, _>(data)?.query_async(conn).await
40 }
41
42 async fn ensure_group_stream<C: ConnectionLike + Send>(&self, conn: &mut C) -> RedisResult<()> {
45 let res = cmds::ensure_group_stream::<Self>(self)?
46 .query_async(conn)
47 .await;
48 transformers::ensure_group_stream_success(res)
49 }
50
51 async fn read<C: ConnectionLike + Send>(
53 &self,
54 read_count: Option<usize>,
55 block_interval: Option<usize>,
56 conn: &mut C,
57 ) -> RedisResult<Vec<Message>> {
58 cmds::read::<Self>(self, read_count, block_interval)?
59 .query_async::<_, StreamReadReply>(conn)
60 .await
61 .map(|reply| transformers::stream_read_reply_to_messages(self, reply))?
62 }
63
64 async fn read_no_group<C: ConnectionLike + Send>(
66 id: impl AsRef<str> + Send,
67 conn: &mut C,
68 ) -> RedisResult<Vec<Message>> {
69 cmds::read_no_group::<Self>(id)?
70 .query_async::<_, StreamReadReply>(conn)
71 .await
72 .map(transformers::stream_read_no_group_reply_to_messages)?
73 }
74
75 async fn autoclaim<C: ConnectionLike + Send>(
77 group: impl AsRef<str> + Send,
78 consumer: impl AsRef<str> + Send,
79 min_idle_time: usize,
80 last_autocalim_id: impl AsRef<str> + Send,
81 read_count: Option<usize>,
82 conn: &mut C,
83 ) -> RedisResult<(String, Vec<Message>)> {
84 let group = group.as_ref();
85 cmds::autoclaim::<Self>(
86 group,
87 consumer,
88 min_idle_time,
89 last_autocalim_id,
90 read_count,
91 )?
92 .query_async(conn)
93 .await
94 .map(|(new_id, reply)| {
95 transformers::autoclaim_range_to_id_and_messages(group, new_id, reply)
96 })?
97 }
98
99 async fn ack<C: ConnectionLike + Send, I: ToRedisArgs + Sync>(
101 group: impl ToRedisArgs + Send,
102 ids: &[I],
103 conn: &mut C,
104 ) -> RedisResult<()> {
105 let cmd = cmds::ack::<Self>(group, ids)?;
106
107 cmd.query_async(conn).await
108 }
109
110 async fn len<C: ConnectionLike + Send>(conn: &mut C) -> RedisResult<usize> {
112 cmds::len::<Self>()?.query_async(conn).await
113 }
114
115 async fn trim<C: ConnectionLike + Send>(maxlen: StreamMaxlen, conn: &mut C) -> RedisResult<()> {
117 cmds::trim::<Self>(maxlen)?.query_async(conn).await
118 }
119
120 async fn range_count<
128 C: ConnectionLike + Send,
129 S: ToRedisArgs + Send,
130 E: ToRedisArgs + Send,
131 N: ToRedisArgs + Send,
132 >(
133 start: S,
134 end: E,
135 count: N,
136 conn: &mut C,
137 ) -> RedisResult<Vec<Message>> {
138 cmds::range_count::<Self, _, _, _>(start, end, count)?
139 .query_async(conn)
140 .await
141 .map(transformers::stream_range_to_messages)?
142 }
143
144 async fn range<C: ConnectionLike + Send, S: ToRedisArgs + Send, E: ToRedisArgs + Send>(
146 start: S,
147 end: E,
148 conn: &mut C,
149 ) -> RedisResult<Vec<Message>> {
150 cmds::range::<Self, _, _>(start, end)?
151 .query_async(conn)
152 .await
153 .map(transformers::stream_range_to_messages)?
154 }
155
156 async fn range_all<C: ConnectionLike + Send>(conn: &mut C) -> RedisResult<Vec<Message>> {
159 cmds::range_all::<Self>()?
160 .query_async(conn)
161 .await
162 .map(transformers::stream_range_to_messages)?
163 }
164}