swiftide_integrations/redis/mod.rs
1//! This module provides the integration with Redis for caching nodes in the Swiftide system.
2//!
3//! The primary component of this module is the `Redis`, which is re-exported for use
4//! in other parts of the system. The `Redis` struct is responsible for managing and
5//! caching nodes during the indexing process, leveraging Redis for efficient storage and retrieval.
6//!
7//! # Overview
8//!
9//! Redis implements the following `Swiftide` traits:
10//! - `NodeCache`
11//! - `Persist`
12//! - `MessageHistory`
13//!
14//! Additionally it provides various helper and utility functions for managing the Redis connection
15//! and key management. The connection is managed using a connection manager. When
16//! cloned, the connection manager is shared across all instances.
17
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result};
21use derive_builder::Builder;
22use tokio::sync::RwLock;
23
24use swiftide_core::indexing::Node;
25
26mod message_history;
27mod node_cache;
28mod persist;
29
30/// `Redis` provides a caching mechanism for nodes using Redis.
31/// It helps in optimizing the indexing process by skipping nodes that have already been processed.
32///
33/// # Fields
34///
35/// * `client` - The Redis client used to interact with the Redis server.
36/// * `connection_manager` - Manages the Redis connections asynchronously.
37/// * `key_prefix` - A prefix used for keys stored in Redis to avoid collisions.
38#[derive(Builder, Clone)]
39#[builder(pattern = "owned", setter(strip_option))]
40pub struct Redis {
41 #[builder(setter(into))]
42 client: Arc<redis::Client>,
43 #[builder(default, setter(skip))]
44 connection_manager: Arc<RwLock<Option<redis::aio::ConnectionManager>>>,
45 #[builder(default, setter(into))]
46 cache_key_prefix: Arc<String>,
47 #[builder(default = "10")]
48 /// The batch size used for persisting nodes. Defaults to a safe 10.
49 batch_size: usize,
50 #[builder(default)]
51 /// Customize the key used for persisting nodes
52 persist_key_fn: Option<fn(&Node) -> Result<String>>,
53 #[builder(default)]
54 /// Customize the value used for persisting nodes
55 persist_value_fn: Option<fn(&Node) -> Result<String>>,
56 #[builder(default = "message_history".to_string().into(), setter(into))]
57 message_history_key: Arc<String>,
58}
59
60impl Redis {
61 /// Creates a new `Redis` instance from a given Redis URL and key prefix.
62 ///
63 /// # Parameters
64 ///
65 /// * `url` - The URL of the Redis server.
66 /// * `prefix` - The prefix to be used for keys stored in Redis.
67 ///
68 /// # Returns
69 ///
70 /// A `Result` containing the `Redis` instance or an error if the client could not be created.
71 ///
72 /// # Errors
73 ///
74 /// Returns an error if the Redis client cannot be opened.
75 pub fn try_from_url(url: impl AsRef<str>, prefix: impl AsRef<str>) -> Result<Self> {
76 let client = redis::Client::open(url.as_ref()).context("Failed to open redis client")?;
77 Ok(Self {
78 client: client.into(),
79 connection_manager: Arc::new(RwLock::new(None)),
80 cache_key_prefix: prefix.as_ref().to_string().into(),
81 batch_size: 10,
82 persist_key_fn: None,
83 persist_value_fn: None,
84 message_history_key: format!("{}:message_history", prefix.as_ref()).into(),
85 })
86 }
87
88 /// # Errors
89 ///
90 /// Returns an error if the Redis client cannot be opened
91 pub fn try_build_from_url(url: impl AsRef<str>) -> Result<RedisBuilder> {
92 Ok(RedisBuilder::default()
93 .client(redis::Client::open(url.as_ref()).context("Failed to open redis client")?))
94 }
95
96 /// Builds a new `Redis` instance from the builder.
97 pub fn builder() -> RedisBuilder {
98 RedisBuilder::default()
99 }
100
101 /// Set the key to be used for the message history
102 pub fn with_message_history_key(&mut self, prefix: impl Into<String>) -> &mut Self {
103 self.message_history_key = Arc::new(prefix.into());
104 self
105 }
106
107 /// Lazily connects to the Redis server and returns the connection manager.
108 ///
109 /// # Returns
110 ///
111 /// An `Option` containing the `ConnectionManager` if the connection is successful, or `None` if
112 /// it fails.
113 ///
114 /// # Errors
115 ///
116 /// Logs an error and returns `None` if the connection manager cannot be obtained.
117 async fn lazy_connect(&self) -> Option<redis::aio::ConnectionManager> {
118 if self.connection_manager.read().await.is_none() {
119 let result = self.client.get_connection_manager().await;
120 if let Err(e) = result {
121 tracing::error!("Failed to get connection manager: {}", e);
122 return None;
123 }
124 let mut cm = self.connection_manager.write().await;
125 *cm = result.ok();
126 }
127
128 self.connection_manager.read().await.clone()
129 }
130
131 /// Generates a Redis key for a given node using the key prefix and the node's hash.
132 ///
133 /// # Parameters
134 ///
135 /// * `node` - The node for which the key is to be generated.
136 ///
137 /// # Returns
138 ///
139 /// A `String` representing the Redis key for the node.
140 fn cache_key_for_node(&self, node: &Node) -> String {
141 let cache_id = node.parent_id().unwrap_or_else(|| node.id());
142 format!("{}:{}", self.cache_key_prefix, cache_id)
143 }
144
145 /// Generates a key for a given node to be persisted in Redis.
146 fn persist_key_for_node(&self, node: &Node) -> Result<String> {
147 if let Some(key_fn) = self.persist_key_fn {
148 key_fn(node)
149 } else {
150 let hash = node.id();
151 Ok(format!("{}:{}", node.path.to_string_lossy(), hash))
152 }
153 }
154
155 /// Generates a value for a given node to be persisted in Redis.
156 /// By default, the node is serialized as JSON.
157 /// If a custom function is provided, it is used to generate the value.
158 /// Otherwise, the node is serialized as JSON.
159 fn persist_value_for_node(&self, node: &Node) -> Result<String> {
160 if let Some(value_fn) = self.persist_value_fn {
161 value_fn(node)
162 } else {
163 Ok(serde_json::to_string(node)?)
164 }
165 }
166
167 /// Resets the cache by deleting all keys with the specified prefix.
168 /// This function is intended for testing purposes and is inefficient for production use.
169 ///
170 /// # Errors
171 ///
172 /// Panics if the keys cannot be retrieved or deleted.
173 #[allow(dead_code)]
174 async fn reset_cache(&self) {
175 if let Some(mut cm) = self.lazy_connect().await {
176 let keys: Vec<String> = redis::cmd("KEYS")
177 .arg(format!("{}:*", self.cache_key_prefix))
178 .query_async(&mut cm)
179 .await
180 .expect("Could not get keys");
181
182 for key in &keys {
183 let _: usize = redis::cmd("DEL")
184 .arg(key)
185 .query_async(&mut cm)
186 .await
187 .expect("Failed to reset cache");
188 }
189 }
190 }
191
192 /// Gets a node persisted in Redis using the GET command
193 /// Takes a node and returns a Result<Option<String>>
194 #[allow(dead_code)]
195 async fn get_node(&self, node: &Node) -> Result<Option<String>> {
196 if let Some(mut cm) = self.lazy_connect().await {
197 let key = self.persist_key_for_node(node)?;
198 let result: Option<String> = redis::cmd("GET")
199 .arg(key)
200 .query_async(&mut cm)
201 .await
202 .context("Error getting from redis")?;
203 Ok(result)
204 } else {
205 anyhow::bail!("Failed to connect to Redis")
206 }
207 }
208}
209
210// Redis CM does not implement debug
211#[allow(clippy::missing_fields_in_debug)]
212impl std::fmt::Debug for Redis {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 f.debug_struct("Redis")
215 .field("client", &self.client)
216 .finish()
217 }
218}