multi_tier_cache/
redis_streams.rs1use crate::traits::{StreamEntry, StreamingBackend};
2use anyhow::{Context, Result};
3use futures_util::future::BoxFuture;
4use redis::AsyncCommands;
5use redis::aio::ConnectionManager;
6use tracing::debug;
7
8#[derive(Clone)]
10pub struct RedisStreams {
11 conn_manager: ConnectionManager,
12}
13
14impl RedisStreams {
15 pub async fn new(redis_url: &str) -> Result<Self> {
21 let client = redis::Client::open(redis_url)?;
22 let conn_manager = ConnectionManager::new(client).await?;
23
24 debug!("Redis Streams initialized at {}", redis_url);
25
26 Ok(Self { conn_manager })
27 }
28}
29
30impl StreamingBackend for RedisStreams {
32 fn stream_add<'a>(
33 &'a self,
34 stream_key: &'a str,
35 fields: Vec<(String, String)>,
36 maxlen: Option<usize>,
37 ) -> BoxFuture<'a, Result<String>> {
38 Box::pin(async move {
39 let mut conn = self.conn_manager.clone();
40 let mut cmd = redis::cmd("XADD");
41 cmd.arg(stream_key);
42
43 if let Some(max) = maxlen {
44 cmd.arg("MAXLEN").arg("~").arg(max);
45 }
46
47 cmd.arg("*");
48
49 for (field, value) in fields {
50 cmd.arg(field).arg(value);
51 }
52
53 cmd.query_async(&mut conn)
54 .await
55 .context("Failed to add to Redis stream")
56 })
57 }
58
59 fn stream_read_latest<'a>(
60 &'a self,
61 stream_key: &'a str,
62 count: usize,
63 ) -> BoxFuture<'a, Result<Vec<StreamEntry>>> {
64 Box::pin(async move {
65 let mut conn = self.conn_manager.clone();
66 let raw_result: redis::Value = redis::cmd("XREVRANGE")
70 .arg(stream_key)
71 .arg("+") .arg("-") .arg("COUNT")
74 .arg(count)
75 .query_async(&mut conn)
76 .await
77 .context("Failed to read latest from Redis stream using XREVRANGE")?;
78
79 let mut entries = Vec::new();
80 if let redis::Value::Array(redis_entries) = raw_result {
81 for entry_val in redis_entries {
82 if let redis::Value::Array(entry_parts) = entry_val
83 && entry_parts.len() >= 2
84 && let Some(redis::Value::BulkString(id_bytes)) = entry_parts.first()
85 {
86 let id = String::from_utf8_lossy(id_bytes).to_string();
87 if let Some(redis::Value::Array(field_values)) = entry_parts.get(1) {
88 let mut fields = Vec::new();
89 for chunk in field_values.chunks(2) {
90 if chunk.len() == 2
91 && let (
92 Some(redis::Value::BulkString(f_bytes)),
93 Some(redis::Value::BulkString(v_bytes)),
94 ) = (chunk.first(), chunk.get(1))
95 {
96 fields.push((
97 String::from_utf8_lossy(f_bytes).to_string(),
98 String::from_utf8_lossy(v_bytes).to_string(),
99 ));
100 }
101 }
102 entries.push((id, fields));
103 }
104 }
105 }
106 }
107 debug!(
108 "[Stream] Read {} latest entries from '{}'",
109 entries.len(),
110 stream_key
111 );
112 Ok(entries)
113 })
114 }
115
116 fn stream_read<'a>(
117 &'a self,
118 stream_key: &'a str,
119 last_id: &'a str,
120 count: usize,
121 block_ms: Option<usize>,
122 ) -> BoxFuture<'a, Result<Vec<StreamEntry>>> {
123 Box::pin(async move {
124 let mut conn = self.conn_manager.clone();
125 let mut options = redis::streams::StreamReadOptions::default().count(count);
126 if let Some(ms) = block_ms {
127 options = options.block(ms);
128 }
129
130 let raw_result: redis::Value = conn
131 .xread_options(&[stream_key], &[last_id], &options)
132 .await
133 .context("Failed to read from Redis stream using XREAD")?;
134
135 let mut all_entries = Vec::new();
136
137 if let redis::Value::Array(streams) = raw_result {
138 for stream in streams {
139 if let redis::Value::Array(stream_parts) = stream
140 && stream_parts.len() >= 2
141 && let Some(redis::Value::Array(entries)) = stream_parts.get(1)
142 {
143 for entry in entries {
144 if let redis::Value::Array(entry_parts) = entry
145 && entry_parts.len() >= 2
146 && let Some(redis::Value::BulkString(id_bytes)) =
147 entry_parts.first()
148 {
149 let id = String::from_utf8_lossy(id_bytes).to_string();
150 if let Some(redis::Value::Array(field_values)) = entry_parts.get(1)
151 {
152 let mut fields = Vec::new();
153 for chunk in field_values.chunks(2) {
154 if chunk.len() == 2
155 && let (
156 Some(redis::Value::BulkString(f_bytes)),
157 Some(redis::Value::BulkString(v_bytes)),
158 ) = (chunk.first(), chunk.get(1))
159 {
160 fields.push((
161 String::from_utf8_lossy(f_bytes).to_string(),
162 String::from_utf8_lossy(v_bytes).to_string(),
163 ));
164 }
165 }
166 all_entries.push((id, fields));
167 }
168 }
169 }
170 }
171 }
172 }
173
174 debug!(
175 "[Stream] XREAD retrieved {} entries from '{}'",
176 all_entries.len(),
177 stream_key
178 );
179 Ok(all_entries)
180 })
181 }
182}