1use super::Storage;
2use crate::Result;
3use crate::types::{Event, Room, RoomId, UserId, DeviceId, Session, Device, EventId};
4use async_trait::async_trait;
5use redis::{aio::ConnectionManager, AsyncCommands, Client};
6
7pub struct RedisStorage {
9 client: ConnectionManager,
10}
11
12impl RedisStorage {
13 pub async fn new(redis_url: &str) -> Result<Self> {
15 let client = Client::open(redis_url)?;
16 let connection = ConnectionManager::new(client).await?;
17 Ok(Self { client: connection })
18 }
19
20 pub fn with_connection(client: ConnectionManager) -> Self {
22 Self { client }
23 }
24
25 fn event_key(event_id: &EventId) -> String {
27 format!("matrix:event:{}", event_id.as_str())
28 }
29
30 fn room_events_key(room_id: &RoomId) -> String {
31 format!("matrix:room:{}:events", room_id.as_str())
32 }
33
34 fn room_key(room_id: &RoomId) -> String {
35 format!("matrix:room:{}", room_id.as_str())
36 }
37
38 fn user_rooms_key(user_id: &UserId) -> String {
39 format!("matrix:user:{}:rooms", user_id.as_str())
40 }
41
42 fn session_key(access_token: &str) -> String {
43 format!("matrix:session:{}", access_token)
44 }
45
46 fn device_key(user_id: &UserId, device_id: &DeviceId) -> String {
47 format!("matrix:device:{}:{}", user_id.as_str(), device_id.as_str())
48 }
49
50 fn user_devices_key(user_id: &UserId) -> String {
51 format!("matrix:user:{}:devices", user_id.as_str())
52 }
53
54 fn encryption_key_key(user_id: &UserId, device_id: &DeviceId) -> String {
55 format!("matrix:encryption:{}:{}", user_id.as_str(), device_id.as_str())
56 }
57
58 fn read_marker_key(user_id: &UserId, room_id: &RoomId) -> String {
59 format!("matrix:read:{}:{}", user_id.as_str(), room_id.as_str())
60 }
61
62 fn unread_count_key(user_id: &UserId, room_id: &RoomId) -> String {
63 format!("matrix:unread:{}:{}", user_id.as_str(), room_id.as_str())
64 }
65}
66
67#[async_trait]
68impl Storage for RedisStorage {
69 async fn store_event(&self, event: &Event) -> Result<()> {
70 let mut conn = self.client.clone();
71
72 let event_json = serde_json::to_string(event)?;
74
75 let event_key = Self::event_key(&event.event_id);
77 conn.set::<_, _, ()>(&event_key, &event_json).await?;
78
79 let room_events_key = Self::room_events_key(&event.room_id);
81 let timestamp = event.timestamp.timestamp_millis() as f64;
82 conn.zadd::<_, _, _, ()>(&room_events_key, event.event_id.as_str(), timestamp).await?;
83
84 Ok(())
85 }
86
87 async fn get_room_events(&self, room_id: &RoomId, limit: usize) -> Result<Vec<Event>> {
88 let mut conn = self.client.clone();
89
90 let room_events_key = Self::room_events_key(room_id);
91
92 let event_ids: Vec<String> = conn.zrevrange(&room_events_key, 0, (limit - 1) as isize).await?;
94
95 let mut events = Vec::new();
96
97 for event_id_str in event_ids {
98 let event_id = EventId::new(event_id_str);
99 let event_key = Self::event_key(&event_id);
100
101 if let Some(event_json) = conn.get::<_, Option<String>>(&event_key).await? {
102 let event: Event = serde_json::from_str(&event_json)?;
103 events.push(event);
104 }
105 }
106
107 events.reverse();
109
110 Ok(events)
111 }
112
113 async fn store_room(&self, room: &Room) -> Result<()> {
114 let mut conn = self.client.clone();
115
116 let room_json = serde_json::to_string(room)?;
117 let room_key = Self::room_key(&room.room_id);
118
119 conn.set::<_, _, ()>(&room_key, &room_json).await?;
121
122 for member in &room.members {
124 let user_rooms_key = Self::user_rooms_key(&member.user_id);
125 conn.sadd::<_, _, ()>(&user_rooms_key, room.room_id.as_str()).await?;
126 }
127
128 Ok(())
129 }
130
131 async fn get_room(&self, room_id: &RoomId) -> Result<Option<Room>> {
132 let mut conn = self.client.clone();
133 let room_key = Self::room_key(room_id);
134
135 let room_json: Option<String> = conn.get(&room_key).await?;
136
137 match room_json {
138 Some(json) => {
139 let room: Room = serde_json::from_str(&json)?;
140 Ok(Some(room))
141 }
142 None => Ok(None),
143 }
144 }
145
146 async fn get_user_rooms(&self, user_id: &UserId) -> Result<Vec<Room>> {
147 let mut conn = self.client.clone();
148 let user_rooms_key = Self::user_rooms_key(user_id);
149
150 let room_ids: Vec<String> = conn.smembers(&user_rooms_key).await?;
152
153 let mut rooms = Vec::new();
154
155 for room_id_str in room_ids {
156 let room_id = RoomId::new(room_id_str);
157 if let Some(room) = self.get_room(&room_id).await? {
158 rooms.push(room);
159 }
160 }
161
162 Ok(rooms)
163 }
164
165 async fn store_session(&self, session: &Session) -> Result<()> {
166 let mut conn = self.client.clone();
167
168 let session_json = serde_json::to_string(session)?;
169 let session_key = Self::session_key(&session.access_token);
170
171 if let Some(expires_at) = session.expires_at {
173 let ttl = (expires_at - session.created_at).num_seconds() as u64;
174 conn.set_ex::<_, _, ()>(&session_key, &session_json, ttl).await?;
175 } else {
176 conn.set::<_, _, ()>(&session_key, &session_json).await?;
177 }
178
179 Ok(())
180 }
181
182 async fn get_session(&self, access_token: &str) -> Result<Option<Session>> {
183 let mut conn = self.client.clone();
184 let session_key = Self::session_key(access_token);
185
186 let session_json: Option<String> = conn.get(&session_key).await?;
187
188 match session_json {
189 Some(json) => {
190 let session: Session = serde_json::from_str(&json)?;
191 Ok(Some(session))
192 }
193 None => Ok(None),
194 }
195 }
196
197 async fn delete_session(&self, access_token: &str) -> Result<()> {
198 let mut conn = self.client.clone();
199 let session_key = Self::session_key(access_token);
200
201 conn.del::<_, ()>(&session_key).await?;
202
203 Ok(())
204 }
205
206 async fn store_device(&self, device: &Device) -> Result<()> {
207 let mut conn = self.client.clone();
208
209 let device_json = serde_json::to_string(device)?;
210 let device_key = Self::device_key(&device.user_id, &device.device_id);
211
212 conn.set::<_, _, ()>(&device_key, &device_json).await?;
214
215 let user_devices_key = Self::user_devices_key(&device.user_id);
217 conn.sadd::<_, _, ()>(&user_devices_key, device.device_id.as_str()).await?;
218
219 Ok(())
220 }
221
222 async fn get_device(&self, user_id: &UserId, device_id: &DeviceId) -> Result<Option<Device>> {
223 let mut conn = self.client.clone();
224 let device_key = Self::device_key(user_id, device_id);
225
226 let device_json: Option<String> = conn.get(&device_key).await?;
227
228 match device_json {
229 Some(json) => {
230 let device: Device = serde_json::from_str(&json)?;
231 Ok(Some(device))
232 }
233 None => Ok(None),
234 }
235 }
236
237 async fn get_user_devices(&self, user_id: &UserId) -> Result<Vec<Device>> {
238 let mut conn = self.client.clone();
239 let user_devices_key = Self::user_devices_key(user_id);
240
241 let device_ids: Vec<String> = conn.smembers(&user_devices_key).await?;
242
243 let mut devices = Vec::new();
244
245 for device_id_str in device_ids {
246 let device_id = DeviceId::new(device_id_str);
247 if let Some(device) = self.get_device(user_id, &device_id).await? {
248 devices.push(device);
249 }
250 }
251
252 Ok(devices)
253 }
254
255 async fn delete_device(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> {
256 let mut conn = self.client.clone();
257
258 let device_key = Self::device_key(user_id, device_id);
259 let user_devices_key = Self::user_devices_key(user_id);
260
261 conn.del::<_, ()>(&device_key).await?;
263
264 conn.srem::<_, _, ()>(&user_devices_key, device_id.as_str()).await?;
266
267 Ok(())
268 }
269
270 async fn store_encryption_key(&self, user_id: &UserId, device_id: &DeviceId, key: &str) -> Result<()> {
271 let mut conn = self.client.clone();
272 let key_key = Self::encryption_key_key(user_id, device_id);
273
274 conn.set::<_, _, ()>(&key_key, key).await?;
275
276 Ok(())
277 }
278
279 async fn get_encryption_key(&self, user_id: &UserId, device_id: &DeviceId) -> Result<Option<String>> {
280 let mut conn = self.client.clone();
281 let key_key = Self::encryption_key_key(user_id, device_id);
282
283 let key: Option<String> = conn.get(&key_key).await?;
284
285 Ok(key)
286 }
287
288 async fn mark_read(&self, user_id: &UserId, room_id: &RoomId, event_id: &str) -> Result<()> {
289 let mut conn = self.client.clone();
290
291 let read_marker_key = Self::read_marker_key(user_id, room_id);
292 let unread_count_key = Self::unread_count_key(user_id, room_id);
293
294 conn.set::<_, _, ()>(&read_marker_key, event_id).await?;
296
297 conn.set::<_, _, ()>(&unread_count_key, 0).await?;
299
300 Ok(())
301 }
302
303 async fn get_unread_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<usize> {
304 let mut conn = self.client.clone();
305 let unread_count_key = Self::unread_count_key(user_id, room_id);
306
307 let count: Option<usize> = conn.get(&unread_count_key).await?;
308
309 Ok(count.unwrap_or(0))
310 }
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// A string that is not a Redis URL must make `new` fail instead of
    /// producing a storage handle.
    #[tokio::test]
    async fn test_storage_creation() {
        assert!(RedisStorage::new("definitely not a redis url").await.is_err());
    }

    /// Event payloads and the per-room event index use their documented key
    /// layouts.
    #[test]
    fn test_event_storage() {
        let event_id = EventId::new("$event:example.org".to_string());
        let room_id = RoomId::new("!room:example.org".to_string());

        assert_eq!(
            RedisStorage::event_key(&event_id),
            format!("matrix:event:{}", event_id.as_str())
        );
        assert_eq!(
            RedisStorage::room_events_key(&room_id),
            format!("matrix:room:{}:events", room_id.as_str())
        );
    }

    /// The room payload key follows its layout and does not collide with the
    /// key of the same room's event index.
    #[test]
    fn test_room_management() {
        let room_id = RoomId::new("!room:example.org".to_string());
        let room_key = RedisStorage::room_key(&room_id);

        assert_eq!(room_key, format!("matrix:room:{}", room_id.as_str()));
        assert_ne!(room_key, RedisStorage::room_events_key(&room_id));
    }

    /// Session keys are derived directly from the access token.
    #[test]
    fn test_session_management() {
        assert_eq!(
            RedisStorage::session_key("token-123"),
            "matrix:session:token-123"
        );
    }

    // The remaining tests have no real assertions yet. They are ignored so
    // they do not report as passing coverage.
    // TODO: implement once a `UserId` fixture and a Redis test instance are
    // available.

    #[tokio::test]
    #[ignore = "placeholder: not implemented yet"]
    async fn test_device_management() {}

    #[tokio::test]
    #[ignore = "placeholder: not implemented yet"]
    async fn test_encryption_keys() {}

    #[tokio::test]
    #[ignore = "placeholder: not implemented yet"]
    async fn test_read_markers() {}
}