Skip to main content

multi_tier_cache/
redis_streams.rs

1use crate::error::CacheResult;
2use crate::traits::{StreamEntry, StreamingBackend};
3use futures_util::future::BoxFuture;
4use redis::AsyncCommands;
5use redis::aio::ConnectionManager;
6use tracing::debug;
7
/// Field/value pairs carried by one stream entry, in the order Redis returned them.
type RawStreamFields = Vec<(String, String)>;

/// One undecoded stream entry: the entry ID paired with its field/value list.
type RawStreamEntry = (String, RawStreamFields);

/// Decoded XREAD reply: one `(stream name, entries)` pair per stream in the reply.
type XReadResult = Vec<(String, Vec<RawStreamEntry>)>;
13
/// Redis Streams client for event-driven architectures
///
/// Holds a single `ConnectionManager`. Every operation in this file clones that
/// manager to obtain its own mutable handle, which is why the methods only need
/// `&self`.
#[derive(Clone)]
pub struct RedisStreams {
    /// Connection handle cloned by each stream operation before it issues a command.
    conn_manager: ConnectionManager,
}
19
20impl RedisStreams {
21    /// Create a new `RedisStreams` instance
22    ///
23    /// # Errors
24    ///
25    /// Returns an error if the Redis connection fails.
26    pub async fn new(redis_url: &str) -> CacheResult<Self> {
27        let client = redis::Client::open(redis_url).map_err(|e| {
28            crate::error::CacheError::ConfigError(format!(
29                "Failed to create Redis client for streams: {e}"
30            ))
31        })?;
32        let conn_manager = ConnectionManager::new(client).await?;
33
34        debug!("Redis Streams initialized at {}", redis_url);
35
36        Ok(Self { conn_manager })
37    }
38}
39
40/// Implement `StreamingBackend` trait for `RedisStreams`
41impl StreamingBackend for RedisStreams {
42    fn stream_add<'a>(
43        &'a self,
44        stream_key: &'a str,
45        fields: Vec<(String, String)>,
46        maxlen: Option<usize>,
47    ) -> BoxFuture<'a, CacheResult<String>> {
48        Box::pin(async move {
49            let mut conn = self.conn_manager.clone();
50            let mut cmd = redis::cmd("XADD");
51            cmd.arg(stream_key);
52
53            if let Some(max) = maxlen {
54                cmd.arg("MAXLEN").arg("~").arg(max);
55            }
56
57            cmd.arg("*");
58
59            for (field, value) in fields {
60                cmd.arg(field).arg(value);
61            }
62
63            cmd.query_async(&mut conn).await.map_err(|e| {
64                crate::error::CacheError::BackendError(format!(
65                    "Failed to add to Redis stream: {e}"
66                ))
67            })
68        })
69    }
70
71    fn stream_read_latest<'a>(
72        &'a self,
73        stream_key: &'a str,
74        count: usize,
75    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
76        Box::pin(async move {
77            let mut conn = self.conn_manager.clone();
78            // XREVRANGE returns Vec<(String, Vec<(String, String)>)> (id, fields)
79            let raw_entries: Vec<RawStreamEntry> = redis::cmd("XREVRANGE")
80                .arg(stream_key)
81                .arg("+")
82                .arg("-")
83                .arg("COUNT")
84                .arg(count)
85                .query_async(&mut conn)
86                .await
87                .map_err(|e| {
88                    crate::error::CacheError::BackendError(format!(
89                        "Failed to read latest from Redis stream using XREVRANGE: {e}"
90                    ))
91                })?;
92
93            debug!(
94                "[Stream] Read {} latest entries from '{}'",
95                raw_entries.len(),
96                stream_key
97            );
98            Ok(raw_entries)
99        })
100    }
101
102    fn stream_read<'a>(
103        &'a self,
104        stream_key: &'a str,
105        last_id: &'a str,
106        count: usize,
107        block_ms: Option<usize>,
108    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
109        Box::pin(async move {
110            let mut conn = self.conn_manager.clone();
111            let mut options = redis::streams::StreamReadOptions::default().count(count);
112            if let Some(ms) = block_ms {
113                options = options.block(ms);
114            }
115
116            // XREAD returns Vec<(String, Vec<(String, Vec<(String, String)>)>)> -> (stream_name, entries)
117            let result: XReadResult = conn
118                .xread_options(&[stream_key], &[last_id], &options)
119                .await
120                .map_err(|e| {
121                    crate::error::CacheError::BackendError(format!(
122                        "Failed to read from Redis stream using XREAD: {e}"
123                    ))
124                })?;
125
126            let mut all_entries = Vec::new();
127            for (_stream, entries) in result {
128                for (id, fields) in entries {
129                    all_entries.push((id, fields));
130                }
131            }
132
133            debug!(
134                "[Stream] XREAD retrieved {} entries from '{}'",
135                all_entries.len(),
136                stream_key
137            );
138            Ok(all_entries)
139        })
140    }
141
142    fn stream_create_group<'a>(
143        &'a self,
144        stream_key: &'a str,
145        group_name: &'a str,
146        id: &'a str,
147    ) -> BoxFuture<'a, CacheResult<()>> {
148        Box::pin(async move {
149            let mut conn = self.conn_manager.clone();
150            let _: () = redis::cmd("XGROUP")
151                .arg("CREATE")
152                .arg(stream_key)
153                .arg(group_name)
154                .arg(id)
155                .arg("MKSTREAM")
156                .query_async(&mut conn)
157                .await
158                .map_err(|e| {
159                    crate::error::CacheError::BackendError(format!(
160                        "Failed to create Redis stream consumer group: {e}"
161                    ))
162                })?;
163            Ok(())
164        })
165    }
166
167    fn stream_read_group<'a>(
168        &'a self,
169        stream_key: &'a str,
170        group_name: &'a str,
171        consumer_name: &'a str,
172        count: usize,
173        block_ms: Option<usize>,
174    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
175        Box::pin(async move {
176            let mut conn = self.conn_manager.clone();
177            let mut options = redis::streams::StreamReadOptions::default()
178                .group(group_name, consumer_name)
179                .count(count);
180
181            if let Some(ms) = block_ms {
182                options = options.block(ms);
183            }
184
185            // XREAD GROUP returns same structure as XREAD
186            let result: XReadResult = conn
187                .xread_options(&[stream_key], &[">"], &options)
188                .await
189                .map_err(|e| {
190                    crate::error::CacheError::BackendError(format!(
191                        "Failed to read from Redis stream group: {e}"
192                    ))
193                })?;
194
195            let mut all_entries = Vec::new();
196            for (_stream, entries) in result {
197                for (id, fields) in entries {
198                    all_entries.push((id, fields));
199                }
200            }
201            Ok(all_entries)
202        })
203    }
204
205    fn stream_ack<'a>(
206        &'a self,
207        stream_key: &'a str,
208        group_name: &'a str,
209        ids: &'a [String],
210    ) -> BoxFuture<'a, CacheResult<()>> {
211        Box::pin(async move {
212            let mut conn = self.conn_manager.clone();
213            let _: usize = conn.xack(stream_key, group_name, ids).await.map_err(|e| {
214                crate::error::CacheError::BackendError(format!("Failed to ACK stream entries: {e}"))
215            })?;
216            Ok(())
217        })
218    }
219}