swiftide_integrations/redis/mod.rs
//! This module provides the integration with Redis for caching nodes in the Swiftide system.
//!
//! The primary component of this module is the `Redis` struct, which is re-exported for use
//! in other parts of the system. It is responsible for managing and caching nodes during the
//! indexing process, leveraging Redis for efficient storage and retrieval.
//!
//! # Overview
//!
//! `Redis` implements the following `Swiftide` traits:
//! - `NodeCache`
//! - `Persist`
//! - `MessageHistory`
//!
//! Additionally, it provides various helper and utility functions for managing the Redis
//! connection and its keys. The connection is managed using a connection manager. When a
//! `Redis` instance is cloned, the connection manager is shared across all clones.
17
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result};
21use derive_builder::Builder;
22use serde::Serialize;
23use tokio::sync::RwLock;
24
25use swiftide_core::indexing::{Chunk, Node};
26
27mod message_history;
28mod node_cache;
29mod persist;
30
31/// `Redis` provides a caching mechanism for nodes using Redis.
32/// It helps in optimizing the indexing process by skipping nodes that have already been processed.
33///
34/// # Fields
35///
36/// * `client` - The Redis client used to interact with the Redis server.
37/// * `connection_manager` - Manages the Redis connections asynchronously.
38/// * `key_prefix` - A prefix used for keys stored in Redis to avoid collisions.
39#[allow(clippy::type_complexity)]
40#[derive(Builder, Clone)]
41#[builder(pattern = "owned", setter(strip_option))]
42pub struct Redis<T: Chunk = String> {
43 #[builder(setter(into))]
44 client: Arc<redis::Client>,
45 #[builder(default, setter(skip))]
46 connection_manager: Arc<RwLock<Option<redis::aio::ConnectionManager>>>,
47 #[builder(default, setter(into))]
48 cache_key_prefix: Arc<String>,
49 #[builder(default = "10")]
50 /// The batch size used for persisting nodes. Defaults to a safe 10.
51 batch_size: usize,
52 #[builder(default)]
53 /// Customize the key used for persisting nodes
54 persist_key_fn: Option<fn(&Node<T>) -> Result<String>>,
55 #[builder(default)]
56 /// Customize the value used for persisting nodes
57 persist_value_fn: Option<fn(&Node<T>) -> Result<String>>,
58 #[builder(default = "message_history".to_string().into(), setter(into))]
59 message_history_key: Arc<String>,
60}
61
62impl Redis<String> {
63 /// Creates a new `Redis` instance from a given Redis URL and key prefix.
64 ///
65 /// # Parameters
66 ///
67 /// * `url` - The URL of the Redis server.
68 /// * `prefix` - The prefix to be used for keys stored in Redis.
69 ///
70 /// # Returns
71 ///
72 /// A `Result` containing the `Redis` instance or an error if the client could not be created.
73 ///
74 /// # Errors
75 ///
76 /// Returns an error if the Redis client cannot be opened.
77 pub fn try_from_url(url: impl AsRef<str>, prefix: impl AsRef<str>) -> Result<Redis<String>> {
78 let client = redis::Client::open(url.as_ref()).context("Failed to open redis client")?;
79 Ok(Redis::<String> {
80 client: client.into(),
81 connection_manager: Arc::new(RwLock::new(None)),
82 cache_key_prefix: prefix.as_ref().to_string().into(),
83 batch_size: 10,
84 persist_key_fn: None,
85 persist_value_fn: None,
86 message_history_key: format!("{}:message_history", prefix.as_ref()).into(),
87 })
88 }
89}
90
91impl<T: Chunk> Redis<T> {
92 /// # Errors
93 ///
94 /// Returns an error if the Redis client cannot be opened
95 pub fn try_build_from_url(url: impl AsRef<str>) -> Result<RedisBuilder<T>> {
96 Ok(RedisBuilder::default()
97 .client(redis::Client::open(url.as_ref()).context("Failed to open redis client")?))
98 }
99
100 /// Builds a new `Redis` instance from the builder.
101 pub fn builder() -> RedisBuilder<T> {
102 RedisBuilder::default()
103 }
104
105 /// Set the key to be used for the message history
106 pub fn with_message_history_key(&mut self, prefix: impl Into<String>) -> &mut Self {
107 self.message_history_key = Arc::new(prefix.into());
108 self
109 }
110
111 /// Lazily connects to the Redis server and returns the connection manager.
112 ///
113 /// # Returns
114 ///
115 /// An `Option` containing the `ConnectionManager` if the connection is successful, or `None` if
116 /// it fails.
117 ///
118 /// # Errors
119 ///
120 /// Logs an error and returns `None` if the connection manager cannot be obtained.
121 async fn lazy_connect(&self) -> Option<redis::aio::ConnectionManager> {
122 if self.connection_manager.read().await.is_none() {
123 let result = self.client.get_connection_manager().await;
124 if let Err(e) = result {
125 tracing::error!("Failed to get connection manager: {}", e);
126 return None;
127 }
128 let mut cm = self.connection_manager.write().await;
129 *cm = result.ok();
130 }
131
132 self.connection_manager.read().await.clone()
133 }
134
135 /// Generates a Redis key for a given node using the key prefix and the node's hash.
136 ///
137 /// # Parameters
138 ///
139 /// * `node` - The node for which the key is to be generated.
140 ///
141 /// # Returns
142 ///
143 /// A `String` representing the Redis key for the node.
144 fn cache_key_for_node(&self, node: &Node<T>) -> String {
145 format!("{}:{}", self.cache_key_prefix, node.id())
146 }
147
148 /// Generates a key for a given node to be persisted in Redis.
149 fn persist_key_for_node(&self, node: &Node<T>) -> Result<String> {
150 if let Some(key_fn) = self.persist_key_fn {
151 key_fn(node)
152 } else {
153 let hash = node.id();
154 Ok(format!("{}:{}", node.path.to_string_lossy(), hash))
155 }
156 }
157
158 /// Resets the cache by deleting all keys with the specified prefix.
159 /// This function is intended for testing purposes and is inefficient for production use.
160 ///
161 /// # Errors
162 ///
163 /// Panics if the keys cannot be retrieved or deleted.
164 #[allow(dead_code)]
165 async fn reset_cache(&self) {
166 if let Some(mut cm) = self.lazy_connect().await {
167 let keys: Vec<String> = redis::cmd("KEYS")
168 .arg(format!("{}:*", self.cache_key_prefix))
169 .query_async(&mut cm)
170 .await
171 .expect("Could not get keys");
172
173 for key in &keys {
174 let _: usize = redis::cmd("DEL")
175 .arg(key)
176 .query_async(&mut cm)
177 .await
178 .expect("Failed to reset cache");
179 }
180 }
181 }
182
183 /// Gets a node persisted in Redis using the GET command
184 /// Takes a node and returns a Result<Option<String>>
185 #[allow(dead_code)]
186 async fn get_node(&self, node: &Node<T>) -> Result<Option<String>> {
187 if let Some(mut cm) = self.lazy_connect().await {
188 let key = self.persist_key_for_node(node)?;
189 let result: Option<String> = redis::cmd("GET")
190 .arg(key)
191 .query_async(&mut cm)
192 .await
193 .context("Error getting from redis")?;
194 Ok(result)
195 } else {
196 anyhow::bail!("Failed to connect to Redis")
197 }
198 }
199}
200
201impl<T: Chunk + Serialize> Redis<T> {
202 /// Generates a value for a given node to be persisted in Redis.
203 /// By default, the node is serialized as JSON.
204 /// If a custom function is provided, it is used to generate the value.
205 /// Otherwise, the node is serialized as JSON.
206 fn persist_value_for_node(&self, node: &Node<T>) -> Result<String> {
207 if let Some(value_fn) = self.persist_value_fn {
208 value_fn(node)
209 } else {
210 Ok(serde_json::to_string(node)?)
211 }
212 }
213}
214
215// Redis CM does not implement debug
216#[allow(clippy::missing_fields_in_debug)]
217impl<T: Chunk> std::fmt::Debug for Redis<T> {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 f.debug_struct("Redis")
220 .field("client", &self.client)
221 .finish()
222 }
223}