multi_tier_cache/
redis_streams.rs1use crate::error::CacheResult;
2use crate::traits::{StreamEntry, StreamingBackend};
3use futures_util::future::BoxFuture;
4use redis::AsyncCommands;
5use redis::aio::ConnectionManager;
6use tracing::debug;
7
/// One stream entry as decoded from a Redis reply: the entry ID followed by
/// its `(field, value)` pairs.
type RawStreamEntry = (String, Vec<(String, String)>);

/// Decoded `XREAD` / `XREADGROUP` reply: one `(stream key, entries)` pair per
/// stream that was read.
type XReadResult = Vec<(String, Vec<RawStreamEntry>)>;
13
14#[derive(Clone)]
16pub struct RedisStreams {
17 conn_manager: ConnectionManager,
18}
19
20impl RedisStreams {
21 pub async fn new(redis_url: &str) -> CacheResult<Self> {
27 let client = redis::Client::open(redis_url).map_err(|e| {
28 crate::error::CacheError::ConfigError(format!(
29 "Failed to create Redis client for streams: {e}"
30 ))
31 })?;
32 let conn_manager = ConnectionManager::new(client).await?;
33
34 debug!("Redis Streams initialized at {}", redis_url);
35
36 Ok(Self { conn_manager })
37 }
38}
39
40impl StreamingBackend for RedisStreams {
42 fn stream_add<'a>(
43 &'a self,
44 stream_key: &'a str,
45 fields: Vec<(String, String)>,
46 maxlen: Option<usize>,
47 ) -> BoxFuture<'a, CacheResult<String>> {
48 Box::pin(async move {
49 let mut conn = self.conn_manager.clone();
50 let mut cmd = redis::cmd("XADD");
51 cmd.arg(stream_key);
52
53 if let Some(max) = maxlen {
54 cmd.arg("MAXLEN").arg("~").arg(max);
55 }
56
57 cmd.arg("*");
58
59 for (field, value) in fields {
60 cmd.arg(field).arg(value);
61 }
62
63 cmd.query_async(&mut conn).await.map_err(|e| {
64 crate::error::CacheError::BackendError(format!(
65 "Failed to add to Redis stream: {e}"
66 ))
67 })
68 })
69 }
70
71 fn stream_read_latest<'a>(
72 &'a self,
73 stream_key: &'a str,
74 count: usize,
75 ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
76 Box::pin(async move {
77 let mut conn = self.conn_manager.clone();
78 let raw_entries: Vec<RawStreamEntry> = redis::cmd("XREVRANGE")
80 .arg(stream_key)
81 .arg("+")
82 .arg("-")
83 .arg("COUNT")
84 .arg(count)
85 .query_async(&mut conn)
86 .await
87 .map_err(|e| {
88 crate::error::CacheError::BackendError(format!(
89 "Failed to read latest from Redis stream using XREVRANGE: {e}"
90 ))
91 })?;
92
93 debug!(
94 "[Stream] Read {} latest entries from '{}'",
95 raw_entries.len(),
96 stream_key
97 );
98 Ok(raw_entries)
99 })
100 }
101
102 fn stream_read<'a>(
103 &'a self,
104 stream_key: &'a str,
105 last_id: &'a str,
106 count: usize,
107 block_ms: Option<usize>,
108 ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
109 Box::pin(async move {
110 let mut conn = self.conn_manager.clone();
111 let mut options = redis::streams::StreamReadOptions::default().count(count);
112 if let Some(ms) = block_ms {
113 options = options.block(ms);
114 }
115
116 let result: XReadResult = conn
118 .xread_options(&[stream_key], &[last_id], &options)
119 .await
120 .map_err(|e| {
121 crate::error::CacheError::BackendError(format!(
122 "Failed to read from Redis stream using XREAD: {e}"
123 ))
124 })?;
125
126 let mut all_entries = Vec::new();
127 for (_stream, entries) in result {
128 for (id, fields) in entries {
129 all_entries.push((id, fields));
130 }
131 }
132
133 debug!(
134 "[Stream] XREAD retrieved {} entries from '{}'",
135 all_entries.len(),
136 stream_key
137 );
138 Ok(all_entries)
139 })
140 }
141
142 fn stream_create_group<'a>(
143 &'a self,
144 stream_key: &'a str,
145 group_name: &'a str,
146 id: &'a str,
147 ) -> BoxFuture<'a, CacheResult<()>> {
148 Box::pin(async move {
149 let mut conn = self.conn_manager.clone();
150 let _: () = redis::cmd("XGROUP")
151 .arg("CREATE")
152 .arg(stream_key)
153 .arg(group_name)
154 .arg(id)
155 .arg("MKSTREAM")
156 .query_async(&mut conn)
157 .await
158 .map_err(|e| {
159 crate::error::CacheError::BackendError(format!(
160 "Failed to create Redis stream consumer group: {e}"
161 ))
162 })?;
163 Ok(())
164 })
165 }
166
167 fn stream_read_group<'a>(
168 &'a self,
169 stream_key: &'a str,
170 group_name: &'a str,
171 consumer_name: &'a str,
172 count: usize,
173 block_ms: Option<usize>,
174 ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
175 Box::pin(async move {
176 let mut conn = self.conn_manager.clone();
177 let mut options = redis::streams::StreamReadOptions::default()
178 .group(group_name, consumer_name)
179 .count(count);
180
181 if let Some(ms) = block_ms {
182 options = options.block(ms);
183 }
184
185 let result: XReadResult = conn
187 .xread_options(&[stream_key], &[">"], &options)
188 .await
189 .map_err(|e| {
190 crate::error::CacheError::BackendError(format!(
191 "Failed to read from Redis stream group: {e}"
192 ))
193 })?;
194
195 let mut all_entries = Vec::new();
196 for (_stream, entries) in result {
197 for (id, fields) in entries {
198 all_entries.push((id, fields));
199 }
200 }
201 Ok(all_entries)
202 })
203 }
204
205 fn stream_ack<'a>(
206 &'a self,
207 stream_key: &'a str,
208 group_name: &'a str,
209 ids: &'a [String],
210 ) -> BoxFuture<'a, CacheResult<()>> {
211 Box::pin(async move {
212 let mut conn = self.conn_manager.clone();
213 let _: usize = conn.xack(stream_key, group_name, ids).await.map_err(|e| {
214 crate::error::CacheError::BackendError(format!("Failed to ACK stream entries: {e}"))
215 })?;
216 Ok(())
217 })
218 }
219}