matrix_lite_rs/storage/
redis_storage.rs

1use super::Storage;
2use crate::Result;
3use crate::types::{Event, Room, RoomId, UserId, DeviceId, Session, Device, EventId};
4use async_trait::async_trait;
5use redis::{aio::ConnectionManager, AsyncCommands, Client};
6
7/// Redis storage implementation for Matrix
8pub struct RedisStorage {
9    client: ConnectionManager,
10}
11
12impl RedisStorage {
13    /// Create a new Redis storage instance
14    pub async fn new(redis_url: &str) -> Result<Self> {
15        let client = Client::open(redis_url)?;
16        let connection = ConnectionManager::new(client).await?;
17        Ok(Self { client: connection })
18    }
19
20    /// Create storage with an existing connection
21    pub fn with_connection(client: ConnectionManager) -> Self {
22        Self { client }
23    }
24
25    // Key generation helpers
26    fn event_key(event_id: &EventId) -> String {
27        format!("matrix:event:{}", event_id.as_str())
28    }
29
30    fn room_events_key(room_id: &RoomId) -> String {
31        format!("matrix:room:{}:events", room_id.as_str())
32    }
33
34    fn room_key(room_id: &RoomId) -> String {
35        format!("matrix:room:{}", room_id.as_str())
36    }
37
38    fn user_rooms_key(user_id: &UserId) -> String {
39        format!("matrix:user:{}:rooms", user_id.as_str())
40    }
41
42    fn session_key(access_token: &str) -> String {
43        format!("matrix:session:{}", access_token)
44    }
45
46    fn device_key(user_id: &UserId, device_id: &DeviceId) -> String {
47        format!("matrix:device:{}:{}", user_id.as_str(), device_id.as_str())
48    }
49
50    fn user_devices_key(user_id: &UserId) -> String {
51        format!("matrix:user:{}:devices", user_id.as_str())
52    }
53
54    fn encryption_key_key(user_id: &UserId, device_id: &DeviceId) -> String {
55        format!("matrix:encryption:{}:{}", user_id.as_str(), device_id.as_str())
56    }
57
58    fn read_marker_key(user_id: &UserId, room_id: &RoomId) -> String {
59        format!("matrix:read:{}:{}", user_id.as_str(), room_id.as_str())
60    }
61
62    fn unread_count_key(user_id: &UserId, room_id: &RoomId) -> String {
63        format!("matrix:unread:{}:{}", user_id.as_str(), room_id.as_str())
64    }
65}
66
67#[async_trait]
68impl Storage for RedisStorage {
69    async fn store_event(&self, event: &Event) -> Result<()> {
70        let mut conn = self.client.clone();
71
72        // Serialize event to JSON
73        let event_json = serde_json::to_string(event)?;
74
75        // Store event data
76        let event_key = Self::event_key(&event.event_id);
77        conn.set::<_, _, ()>(&event_key, &event_json).await?;
78
79        // Add to room's event list (sorted by timestamp)
80        let room_events_key = Self::room_events_key(&event.room_id);
81        let timestamp = event.timestamp.timestamp_millis() as f64;
82        conn.zadd::<_, _, _, ()>(&room_events_key, event.event_id.as_str(), timestamp).await?;
83
84        Ok(())
85    }
86
87    async fn get_room_events(&self, room_id: &RoomId, limit: usize) -> Result<Vec<Event>> {
88        let mut conn = self.client.clone();
89
90        let room_events_key = Self::room_events_key(room_id);
91
92        // Get the last N event IDs (newest first)
93        let event_ids: Vec<String> = conn.zrevrange(&room_events_key, 0, (limit - 1) as isize).await?;
94
95        let mut events = Vec::new();
96
97        for event_id_str in event_ids {
98            let event_id = EventId::new(event_id_str);
99            let event_key = Self::event_key(&event_id);
100
101            if let Some(event_json) = conn.get::<_, Option<String>>(&event_key).await? {
102                let event: Event = serde_json::from_str(&event_json)?;
103                events.push(event);
104            }
105        }
106
107        // Reverse to get chronological order (oldest first)
108        events.reverse();
109
110        Ok(events)
111    }
112
113    async fn store_room(&self, room: &Room) -> Result<()> {
114        let mut conn = self.client.clone();
115
116        let room_json = serde_json::to_string(room)?;
117        let room_key = Self::room_key(&room.room_id);
118
119        // Store room data
120        conn.set::<_, _, ()>(&room_key, &room_json).await?;
121
122        // Add to each member's room list
123        for member in &room.members {
124            let user_rooms_key = Self::user_rooms_key(&member.user_id);
125            conn.sadd::<_, _, ()>(&user_rooms_key, room.room_id.as_str()).await?;
126        }
127
128        Ok(())
129    }
130
131    async fn get_room(&self, room_id: &RoomId) -> Result<Option<Room>> {
132        let mut conn = self.client.clone();
133        let room_key = Self::room_key(room_id);
134
135        let room_json: Option<String> = conn.get(&room_key).await?;
136
137        match room_json {
138            Some(json) => {
139                let room: Room = serde_json::from_str(&json)?;
140                Ok(Some(room))
141            }
142            None => Ok(None),
143        }
144    }
145
146    async fn get_user_rooms(&self, user_id: &UserId) -> Result<Vec<Room>> {
147        let mut conn = self.client.clone();
148        let user_rooms_key = Self::user_rooms_key(user_id);
149
150        // Get all room IDs for this user
151        let room_ids: Vec<String> = conn.smembers(&user_rooms_key).await?;
152
153        let mut rooms = Vec::new();
154
155        for room_id_str in room_ids {
156            let room_id = RoomId::new(room_id_str);
157            if let Some(room) = self.get_room(&room_id).await? {
158                rooms.push(room);
159            }
160        }
161
162        Ok(rooms)
163    }
164
165    async fn store_session(&self, session: &Session) -> Result<()> {
166        let mut conn = self.client.clone();
167
168        let session_json = serde_json::to_string(session)?;
169        let session_key = Self::session_key(&session.access_token);
170
171        // Store session with optional expiry
172        if let Some(expires_at) = session.expires_at {
173            let ttl = (expires_at - session.created_at).num_seconds() as u64;
174            conn.set_ex::<_, _, ()>(&session_key, &session_json, ttl).await?;
175        } else {
176            conn.set::<_, _, ()>(&session_key, &session_json).await?;
177        }
178
179        Ok(())
180    }
181
182    async fn get_session(&self, access_token: &str) -> Result<Option<Session>> {
183        let mut conn = self.client.clone();
184        let session_key = Self::session_key(access_token);
185
186        let session_json: Option<String> = conn.get(&session_key).await?;
187
188        match session_json {
189            Some(json) => {
190                let session: Session = serde_json::from_str(&json)?;
191                Ok(Some(session))
192            }
193            None => Ok(None),
194        }
195    }
196
197    async fn delete_session(&self, access_token: &str) -> Result<()> {
198        let mut conn = self.client.clone();
199        let session_key = Self::session_key(access_token);
200
201        conn.del::<_, ()>(&session_key).await?;
202
203        Ok(())
204    }
205
206    async fn store_device(&self, device: &Device) -> Result<()> {
207        let mut conn = self.client.clone();
208
209        let device_json = serde_json::to_string(device)?;
210        let device_key = Self::device_key(&device.user_id, &device.device_id);
211
212        // Store device
213        conn.set::<_, _, ()>(&device_key, &device_json).await?;
214
215        // Add to user's device list
216        let user_devices_key = Self::user_devices_key(&device.user_id);
217        conn.sadd::<_, _, ()>(&user_devices_key, device.device_id.as_str()).await?;
218
219        Ok(())
220    }
221
222    async fn get_device(&self, user_id: &UserId, device_id: &DeviceId) -> Result<Option<Device>> {
223        let mut conn = self.client.clone();
224        let device_key = Self::device_key(user_id, device_id);
225
226        let device_json: Option<String> = conn.get(&device_key).await?;
227
228        match device_json {
229            Some(json) => {
230                let device: Device = serde_json::from_str(&json)?;
231                Ok(Some(device))
232            }
233            None => Ok(None),
234        }
235    }
236
237    async fn get_user_devices(&self, user_id: &UserId) -> Result<Vec<Device>> {
238        let mut conn = self.client.clone();
239        let user_devices_key = Self::user_devices_key(user_id);
240
241        let device_ids: Vec<String> = conn.smembers(&user_devices_key).await?;
242
243        let mut devices = Vec::new();
244
245        for device_id_str in device_ids {
246            let device_id = DeviceId::new(device_id_str);
247            if let Some(device) = self.get_device(user_id, &device_id).await? {
248                devices.push(device);
249            }
250        }
251
252        Ok(devices)
253    }
254
255    async fn delete_device(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> {
256        let mut conn = self.client.clone();
257
258        let device_key = Self::device_key(user_id, device_id);
259        let user_devices_key = Self::user_devices_key(user_id);
260
261        // Delete device data
262        conn.del::<_, ()>(&device_key).await?;
263
264        // Remove from user's device list
265        conn.srem::<_, _, ()>(&user_devices_key, device_id.as_str()).await?;
266
267        Ok(())
268    }
269
270    async fn store_encryption_key(&self, user_id: &UserId, device_id: &DeviceId, key: &str) -> Result<()> {
271        let mut conn = self.client.clone();
272        let key_key = Self::encryption_key_key(user_id, device_id);
273
274        conn.set::<_, _, ()>(&key_key, key).await?;
275
276        Ok(())
277    }
278
279    async fn get_encryption_key(&self, user_id: &UserId, device_id: &DeviceId) -> Result<Option<String>> {
280        let mut conn = self.client.clone();
281        let key_key = Self::encryption_key_key(user_id, device_id);
282
283        let key: Option<String> = conn.get(&key_key).await?;
284
285        Ok(key)
286    }
287
288    async fn mark_read(&self, user_id: &UserId, room_id: &RoomId, event_id: &str) -> Result<()> {
289        let mut conn = self.client.clone();
290
291        let read_marker_key = Self::read_marker_key(user_id, room_id);
292        let unread_count_key = Self::unread_count_key(user_id, room_id);
293
294        // Store read marker
295        conn.set::<_, _, ()>(&read_marker_key, event_id).await?;
296
297        // Reset unread count
298        conn.set::<_, _, ()>(&unread_count_key, 0).await?;
299
300        Ok(())
301    }
302
303    async fn get_unread_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<usize> {
304        let mut conn = self.client.clone();
305        let unread_count_key = Self::unread_count_key(user_id, room_id);
306
307        let count: Option<usize> = conn.get(&unread_count_key).await?;
308
309        Ok(count.unwrap_or(0))
310    }
311}
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316    use chrono::Utc;
317
318    // Tests will be implemented with comprehensive coverage
319    #[tokio::test]
320    async fn test_storage_creation() {
321        // This is a placeholder - real tests will need Redis instance
322        assert!(true);
323    }
324
325    #[tokio::test]
326    async fn test_event_storage() {
327        // Will test event storage and retrieval
328        assert!(true);
329    }
330
331    #[tokio::test]
332    async fn test_room_management() {
333        // Will test room creation, updates, and membership
334        assert!(true);
335    }
336
337    #[tokio::test]
338    async fn test_session_management() {
339        // Will test session creation, validation, and expiry
340        assert!(true);
341    }
342
343    #[tokio::test]
344    async fn test_device_management() {
345        // Will test device registration and removal
346        assert!(true);
347    }
348
349    #[tokio::test]
350    async fn test_encryption_keys() {
351        // Will test key storage and retrieval
352        assert!(true);
353    }
354
355    #[tokio::test]
356    async fn test_read_markers() {
357        // Will test read markers and unread counts
358        assert!(true);
359    }
360}