swiftide_integrations/redis/
mod.rs

1//! This module provides the integration with Redis for caching nodes in the Swiftide system.
2//!
3//! The primary component of this module is the `Redis`, which is re-exported for use
4//! in other parts of the system. The `Redis` struct is responsible for managing and
5//! caching nodes during the indexing process, leveraging Redis for efficient storage and retrieval.
6//!
7//! # Overview
8//!
9//! Redis implements the following `Swiftide` traits:
10//! - `Node<T>Cache`
11//! - `Persist`
12//! - `MessageHistory`
13//!
14//! Additionally it provides various helper and utility functions for managing the Redis connection
15//! and key management. The connection is managed using a connection manager. When
16//! cloned, the connection manager is shared across all instances.
17
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result};
21use derive_builder::Builder;
22use serde::Serialize;
23use tokio::sync::RwLock;
24
25use swiftide_core::indexing::{Chunk, Node};
26
27mod message_history;
28mod node_cache;
29mod persist;
30
31/// `Redis` provides a caching mechanism for nodes using Redis.
32/// It helps in optimizing the indexing process by skipping nodes that have already been processed.
33///
34/// # Fields
35///
36/// * `client` - The Redis client used to interact with the Redis server.
37/// * `connection_manager` - Manages the Redis connections asynchronously.
38/// * `key_prefix` - A prefix used for keys stored in Redis to avoid collisions.
39#[allow(clippy::type_complexity)]
40#[derive(Builder, Clone)]
41#[builder(pattern = "owned", setter(strip_option))]
42pub struct Redis<T: Chunk = String> {
43    #[builder(setter(into))]
44    client: Arc<redis::Client>,
45    #[builder(default, setter(skip))]
46    connection_manager: Arc<RwLock<Option<redis::aio::ConnectionManager>>>,
47    #[builder(default, setter(into))]
48    cache_key_prefix: Arc<String>,
49    #[builder(default = "10")]
50    /// The batch size used for persisting nodes. Defaults to a safe 10.
51    batch_size: usize,
52    #[builder(default)]
53    /// Customize the key used for persisting nodes
54    persist_key_fn: Option<fn(&Node<T>) -> Result<String>>,
55    #[builder(default)]
56    /// Customize the value used for persisting nodes
57    persist_value_fn: Option<fn(&Node<T>) -> Result<String>>,
58    #[builder(default = "message_history".to_string().into(), setter(into))]
59    message_history_key: Arc<String>,
60}
61
62impl Redis<String> {
63    /// Creates a new `Redis` instance from a given Redis URL and key prefix.
64    ///
65    /// # Parameters
66    ///
67    /// * `url` - The URL of the Redis server.
68    /// * `prefix` - The prefix to be used for keys stored in Redis.
69    ///
70    /// # Returns
71    ///
72    /// A `Result` containing the `Redis` instance or an error if the client could not be created.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the Redis client cannot be opened.
77    pub fn try_from_url(url: impl AsRef<str>, prefix: impl AsRef<str>) -> Result<Redis<String>> {
78        let client = redis::Client::open(url.as_ref()).context("Failed to open redis client")?;
79        Ok(Redis::<String> {
80            client: client.into(),
81            connection_manager: Arc::new(RwLock::new(None)),
82            cache_key_prefix: prefix.as_ref().to_string().into(),
83            batch_size: 10,
84            persist_key_fn: None,
85            persist_value_fn: None,
86            message_history_key: format!("{}:message_history", prefix.as_ref()).into(),
87        })
88    }
89}
90
91impl<T: Chunk> Redis<T> {
92    /// # Errors
93    ///
94    /// Returns an error if the Redis client cannot be opened
95    pub fn try_build_from_url(url: impl AsRef<str>) -> Result<RedisBuilder<T>> {
96        Ok(RedisBuilder::default()
97            .client(redis::Client::open(url.as_ref()).context("Failed to open redis client")?))
98    }
99
100    /// Builds a new `Redis` instance from the builder.
101    pub fn builder() -> RedisBuilder<T> {
102        RedisBuilder::default()
103    }
104
105    /// Set the key to be used for the message history
106    pub fn with_message_history_key(&mut self, prefix: impl Into<String>) -> &mut Self {
107        self.message_history_key = Arc::new(prefix.into());
108        self
109    }
110
111    /// Lazily connects to the Redis server and returns the connection manager.
112    ///
113    /// # Returns
114    ///
115    /// An `Option` containing the `ConnectionManager` if the connection is successful, or `None` if
116    /// it fails.
117    ///
118    /// # Errors
119    ///
120    /// Logs an error and returns `None` if the connection manager cannot be obtained.
121    async fn lazy_connect(&self) -> Option<redis::aio::ConnectionManager> {
122        if self.connection_manager.read().await.is_none() {
123            let result = self.client.get_connection_manager().await;
124            if let Err(e) = result {
125                tracing::error!("Failed to get connection manager: {}", e);
126                return None;
127            }
128            let mut cm = self.connection_manager.write().await;
129            *cm = result.ok();
130        }
131
132        self.connection_manager.read().await.clone()
133    }
134
135    /// Generates a Redis key for a given node using the key prefix and the node's hash.
136    ///
137    /// # Parameters
138    ///
139    /// * `node` - The node for which the key is to be generated.
140    ///
141    /// # Returns
142    ///
143    /// A `String` representing the Redis key for the node.
144    fn cache_key_for_node(&self, node: &Node<T>) -> String {
145        format!("{}:{}", self.cache_key_prefix, node.id())
146    }
147
148    /// Generates a key for a given node to be persisted in Redis.
149    fn persist_key_for_node(&self, node: &Node<T>) -> Result<String> {
150        if let Some(key_fn) = self.persist_key_fn {
151            key_fn(node)
152        } else {
153            let hash = node.id();
154            Ok(format!("{}:{}", node.path.to_string_lossy(), hash))
155        }
156    }
157
158    /// Resets the cache by deleting all keys with the specified prefix.
159    /// This function is intended for testing purposes and is inefficient for production use.
160    ///
161    /// # Errors
162    ///
163    /// Panics if the keys cannot be retrieved or deleted.
164    #[allow(dead_code)]
165    async fn reset_cache(&self) {
166        if let Some(mut cm) = self.lazy_connect().await {
167            let keys: Vec<String> = redis::cmd("KEYS")
168                .arg(format!("{}:*", self.cache_key_prefix))
169                .query_async(&mut cm)
170                .await
171                .expect("Could not get keys");
172
173            for key in &keys {
174                let _: usize = redis::cmd("DEL")
175                    .arg(key)
176                    .query_async(&mut cm)
177                    .await
178                    .expect("Failed to reset cache");
179            }
180        }
181    }
182
183    /// Gets a node persisted in Redis using the GET command
184    /// Takes a node and returns a Result<Option<String>>
185    #[allow(dead_code)]
186    async fn get_node(&self, node: &Node<T>) -> Result<Option<String>> {
187        if let Some(mut cm) = self.lazy_connect().await {
188            let key = self.persist_key_for_node(node)?;
189            let result: Option<String> = redis::cmd("GET")
190                .arg(key)
191                .query_async(&mut cm)
192                .await
193                .context("Error getting from redis")?;
194            Ok(result)
195        } else {
196            anyhow::bail!("Failed to connect to Redis")
197        }
198    }
199}
200
201impl<T: Chunk + Serialize> Redis<T> {
202    /// Generates a value for a given node to be persisted in Redis.
203    /// By default, the node is serialized as JSON.
204    /// If a custom function is provided, it is used to generate the value.
205    /// Otherwise, the node is serialized as JSON.
206    fn persist_value_for_node(&self, node: &Node<T>) -> Result<String> {
207        if let Some(value_fn) = self.persist_value_fn {
208            value_fn(node)
209        } else {
210            Ok(serde_json::to_string(node)?)
211        }
212    }
213}
214
215// Redis CM does not implement debug
216#[allow(clippy::missing_fields_in_debug)]
217impl<T: Chunk> std::fmt::Debug for Redis<T> {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        f.debug_struct("Redis")
220            .field("client", &self.client)
221            .finish()
222    }
223}