Skip to main content

multi_tier_cache/
redis_streams.rs

1use crate::traits::{StreamEntry, StreamingBackend};
2use anyhow::{Context, Result};
3use futures_util::future::BoxFuture;
4use redis::AsyncCommands;
5use redis::aio::ConnectionManager;
6use tracing::debug;
7
8/// Redis Streams client for event-driven architectures
9#[derive(Clone)]
10pub struct RedisStreams {
11    conn_manager: ConnectionManager,
12}
13
14impl RedisStreams {
15    /// Create a new `RedisStreams` instance
16    ///
17    /// # Errors
18    ///
19    /// Returns an error if the Redis connection fails.
20    pub async fn new(redis_url: &str) -> Result<Self> {
21        let client = redis::Client::open(redis_url)?;
22        let conn_manager = ConnectionManager::new(client).await?;
23
24        debug!("Redis Streams initialized at {}", redis_url);
25
26        Ok(Self { conn_manager })
27    }
28}
29
30/// Implement `StreamingBackend` trait for `RedisStreams`
31impl StreamingBackend for RedisStreams {
32    fn stream_add<'a>(
33        &'a self,
34        stream_key: &'a str,
35        fields: Vec<(String, String)>,
36        maxlen: Option<usize>,
37    ) -> BoxFuture<'a, Result<String>> {
38        Box::pin(async move {
39            let mut conn = self.conn_manager.clone();
40            let mut cmd = redis::cmd("XADD");
41            cmd.arg(stream_key);
42
43            if let Some(max) = maxlen {
44                cmd.arg("MAXLEN").arg("~").arg(max);
45            }
46
47            cmd.arg("*");
48
49            for (field, value) in fields {
50                cmd.arg(field).arg(value);
51            }
52
53            cmd.query_async(&mut conn)
54                .await
55                .context("Failed to add to Redis stream")
56        })
57    }
58
59    fn stream_read_latest<'a>(
60        &'a self,
61        stream_key: &'a str,
62        count: usize,
63    ) -> BoxFuture<'a, Result<Vec<StreamEntry>>> {
64        Box::pin(async move {
65            let mut conn = self.conn_manager.clone();
66            // XREVRANGE is more appropriate for "latest" N entries.
67            // XREAD with "0" reads from the beginning, not necessarily the latest N.
68            // Let's use XREVRANGE for this method.
69            let raw_result: redis::Value = redis::cmd("XREVRANGE")
70                .arg(stream_key)
71                .arg("+") // Start from the latest ID
72                .arg("-") // End at the earliest ID
73                .arg("COUNT")
74                .arg(count)
75                .query_async(&mut conn)
76                .await
77                .context("Failed to read latest from Redis stream using XREVRANGE")?;
78
79            let mut entries = Vec::new();
80            if let redis::Value::Array(redis_entries) = raw_result {
81                for entry_val in redis_entries {
82                    if let redis::Value::Array(entry_parts) = entry_val
83                        && entry_parts.len() >= 2
84                        && let Some(redis::Value::BulkString(id_bytes)) = entry_parts.first()
85                    {
86                        let id = String::from_utf8_lossy(id_bytes).to_string();
87                        if let Some(redis::Value::Array(field_values)) = entry_parts.get(1) {
88                            let mut fields = Vec::new();
89                            for chunk in field_values.chunks(2) {
90                                if chunk.len() == 2
91                                    && let (
92                                        Some(redis::Value::BulkString(f_bytes)),
93                                        Some(redis::Value::BulkString(v_bytes)),
94                                    ) = (chunk.first(), chunk.get(1))
95                                {
96                                    fields.push((
97                                        String::from_utf8_lossy(f_bytes).to_string(),
98                                        String::from_utf8_lossy(v_bytes).to_string(),
99                                    ));
100                                }
101                            }
102                            entries.push((id, fields));
103                        }
104                    }
105                }
106            }
107            debug!(
108                "[Stream] Read {} latest entries from '{}'",
109                entries.len(),
110                stream_key
111            );
112            Ok(entries)
113        })
114    }
115
116    fn stream_read<'a>(
117        &'a self,
118        stream_key: &'a str,
119        last_id: &'a str,
120        count: usize,
121        block_ms: Option<usize>,
122    ) -> BoxFuture<'a, Result<Vec<StreamEntry>>> {
123        Box::pin(async move {
124            let mut conn = self.conn_manager.clone();
125            let mut options = redis::streams::StreamReadOptions::default().count(count);
126            if let Some(ms) = block_ms {
127                options = options.block(ms);
128            }
129
130            let raw_result: redis::Value = conn
131                .xread_options(&[stream_key], &[last_id], &options)
132                .await
133                .context("Failed to read from Redis stream using XREAD")?;
134
135            let mut all_entries = Vec::new();
136
137            if let redis::Value::Array(streams) = raw_result {
138                for stream in streams {
139                    if let redis::Value::Array(stream_parts) = stream
140                        && stream_parts.len() >= 2
141                        && let Some(redis::Value::Array(entries)) = stream_parts.get(1)
142                    {
143                        for entry in entries {
144                            if let redis::Value::Array(entry_parts) = entry
145                                && entry_parts.len() >= 2
146                                && let Some(redis::Value::BulkString(id_bytes)) =
147                                    entry_parts.first()
148                            {
149                                let id = String::from_utf8_lossy(id_bytes).to_string();
150                                if let Some(redis::Value::Array(field_values)) = entry_parts.get(1)
151                                {
152                                    let mut fields = Vec::new();
153                                    for chunk in field_values.chunks(2) {
154                                        if chunk.len() == 2
155                                            && let (
156                                                Some(redis::Value::BulkString(f_bytes)),
157                                                Some(redis::Value::BulkString(v_bytes)),
158                                            ) = (chunk.first(), chunk.get(1))
159                                        {
160                                            fields.push((
161                                                String::from_utf8_lossy(f_bytes).to_string(),
162                                                String::from_utf8_lossy(v_bytes).to_string(),
163                                            ));
164                                        }
165                                    }
166                                    all_entries.push((id, fields));
167                                }
168                            }
169                        }
170                    }
171                }
172            }
173
174            debug!(
175                "[Stream] XREAD retrieved {} entries from '{}'",
176                all_entries.len(),
177                stream_key
178            );
179            Ok(all_entries)
180        })
181    }
182}