Skip to main content

hydracache_db/
query.rs

1use std::error::Error;
2use std::fmt;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::time::Duration;
6
7use hydracache::{CacheKeyBuilder, CacheOptions, HydraCache, PostcardCodec, TagSet};
8use hydracache_core::CacheCodec;
9use serde::{de::DeserializeOwned, Serialize};
10
11use crate::{DbCacheError, Result};
12
13/// A database-oriented view over a [`HydraCache`] instance.
14///
15/// `DbCache` groups query result keys under a namespace while keeping all
16/// cache storage, single-flight, tags, TTL, and stats in the shared local cache.
17///
18/// # Example
19///
20/// ```rust
21/// use std::time::Duration;
22///
23/// use hydracache::HydraCache;
24/// use hydracache_db::DbCache;
25/// use serde::{Deserialize, Serialize};
26///
27/// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28/// struct User {
29///     id: i64,
30///     name: String,
31/// }
32///
33/// # #[tokio::main]
34/// # async fn main() -> hydracache_db::Result<()> {
35/// let local = HydraCache::local().build();
36/// let queries = DbCache::new(local, "db");
37///
38/// let user = queries
39///     .cached::<User>()
40///     // Physical cache key: "db:user:42".
41///     .key("user:42")
42///     // Later, invalidate_tag("user:42") removes this result.
43///     .tag("user:42")
44///     .ttl(Duration::from_secs(60))
45///     .fetch_with(|| async {
46///         // Replace this block with code from sqlx, diesel, sea-orm, or any
47///         // other database client. It is called only when the cache does not
48///         // already contain "db:user:42" or when the cached value has expired.
49///         Ok::<_, std::io::Error>(User {
50///             id: 42,
51///             name: "Ada".to_owned(),
52///         })
53///     })
54///     .await?;
55///
56/// assert_eq!(user.id, 42);
57/// # Ok(())
58/// # }
59/// ```
60pub struct DbCache<C = PostcardCodec>
61where
62    C: CacheCodec,
63{
64    cache: HydraCache<C>,
65    namespace: String,
66}
67
68impl<C> Clone for DbCache<C>
69where
70    C: CacheCodec,
71{
72    fn clone(&self) -> Self {
73        Self {
74            cache: self.cache.clone(),
75            namespace: self.namespace.clone(),
76        }
77    }
78}
79
80impl<C> fmt::Debug for DbCache<C>
81where
82    C: CacheCodec,
83{
84    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
85        formatter
86            .debug_struct("DbCache")
87            .field("namespace", &self.namespace)
88            .finish_non_exhaustive()
89    }
90}
91
92impl<C> DbCache<C>
93where
94    C: CacheCodec,
95{
96    /// Create a database query cache adapter over an existing local cache.
97    pub fn new(cache: HydraCache<C>, namespace: impl Into<String>) -> Self {
98        Self {
99            cache,
100            namespace: namespace.into(),
101        }
102    }
103
104    /// Return the namespace used for physical cache keys.
105    pub fn namespace(&self) -> &str {
106        &self.namespace
107    }
108
109    /// Return the underlying local cache.
110    pub fn cache(&self) -> &HydraCache<C> {
111        &self.cache
112    }
113
114    /// Start describing a cacheable database-loaded value.
115    ///
116    /// This is the preferred entry point when the query is already visible
117    /// inside the `fetch_with` loader through a database client, ORM, or
118    /// repository method.
119    pub fn cached<T>(&self) -> DbQuery<T, C> {
120        DbQuery {
121            cache: self.cache.clone(),
122            namespace: self.namespace.clone(),
123            name: None,
124            key: None,
125            tags: TagSet::new(),
126            ttl: None,
127            value: PhantomData,
128        }
129    }
130
131    /// Start describing a cacheable database-loaded value with a diagnostic name.
132    pub fn named<T>(&self, name: impl Into<String>) -> DbQuery<T, C> {
133        DbQuery {
134            cache: self.cache.clone(),
135            namespace: self.namespace.clone(),
136            name: Some(name.into()),
137            key: None,
138            tags: TagSet::new(),
139            ttl: None,
140            value: PhantomData,
141        }
142    }
143
144    /// Start describing a cacheable SQL query result.
145    ///
146    /// Prefer [`DbCache::cached`] or [`DbCache::named`] when writing new code.
147    /// This method remains useful if you want the SQL text itself to be the
148    /// diagnostic label for errors and logs.
149    pub fn query_as<T>(&self, sql: impl Into<String>) -> DbQuery<T, C> {
150        self.named(sql)
151    }
152}
153
154/// A cacheable database query descriptor.
155///
156/// The descriptor is deliberately explicit: callers choose the key, tags, and
157/// TTL that match their freshness model. An operation name is optional and used
158/// only for diagnostics. `fetch_with` executes the supplied loader only on a
159/// cache miss.
160pub struct DbQuery<T, C = PostcardCodec>
161where
162    C: CacheCodec,
163{
164    cache: HydraCache<C>,
165    namespace: String,
166    name: Option<String>,
167    key: Option<String>,
168    tags: TagSet,
169    ttl: Option<Duration>,
170    value: PhantomData<fn() -> T>,
171}
172
173impl<T, C> Clone for DbQuery<T, C>
174where
175    C: CacheCodec,
176{
177    fn clone(&self) -> Self {
178        Self {
179            cache: self.cache.clone(),
180            namespace: self.namespace.clone(),
181            name: self.name.clone(),
182            key: self.key.clone(),
183            tags: self.tags.clone(),
184            ttl: self.ttl,
185            value: PhantomData,
186        }
187    }
188}
189
190impl<T, C> fmt::Debug for DbQuery<T, C>
191where
192    C: CacheCodec,
193{
194    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
195        formatter
196            .debug_struct("DbQuery")
197            .field("namespace", &self.namespace)
198            .field("name", &self.name)
199            .field("key", &self.key)
200            .field("tags", &self.tags)
201            .field("ttl", &self.ttl)
202            .finish_non_exhaustive()
203    }
204}
205
206impl<T, C> DbQuery<T, C>
207where
208    C: CacheCodec,
209{
210    /// Return the optional diagnostic operation name.
211    pub fn name(&self) -> Option<&str> {
212        self.name.as_deref()
213    }
214
215    /// Set or replace the diagnostic operation name.
216    pub fn with_name(mut self, name: impl Into<String>) -> Self {
217        self.name = Some(name.into());
218        self
219    }
220
221    /// Return the namespace used for physical cache keys.
222    pub fn namespace(&self) -> &str {
223        &self.namespace
224    }
225
226    /// Return the logical key, if one has been configured.
227    pub fn key_value(&self) -> Option<&str> {
228        self.key.as_deref()
229    }
230
231    /// Return the physical cache key, including the adapter namespace.
232    pub fn physical_key(&self) -> Option<String> {
233        self.key
234            .as_deref()
235            .map(|key| physical_key(&self.namespace, key))
236    }
237
238    /// Return the configured tags.
239    pub fn tags_value(&self) -> &[String] {
240        self.tags.as_slice()
241    }
242
243    /// Return the configured per-entry TTL.
244    pub fn ttl_value(&self) -> Option<Duration> {
245        self.ttl
246    }
247
248    /// Set the logical cache key for this query result.
249    pub fn key(mut self, key: impl Into<String>) -> Self {
250        self.key = Some(key.into());
251        self
252    }
253
254    /// Set the logical cache key from a segmented key builder.
255    pub fn key_builder(self, key: CacheKeyBuilder) -> Self {
256        self.key(key.build_string())
257    }
258
259    /// Add one invalidation tag.
260    pub fn tag(mut self, tag: impl Into<String>) -> Self {
261        self.tags = self.tags.tag(tag);
262        self
263    }
264
265    /// Add several invalidation tags.
266    pub fn tags<I, S>(mut self, tags: I) -> Self
267    where
268        I: IntoIterator<Item = S>,
269        S: Into<String>,
270    {
271        self.tags = self.tags.tags(tags);
272        self
273    }
274
275    /// Replace invalidation tags from a reusable [`TagSet`].
276    pub fn tag_set(mut self, tags: TagSet) -> Self {
277        self.tags = tags;
278        self
279    }
280
281    /// Set a per-entry TTL for this query result.
282    pub fn ttl(mut self, ttl: Duration) -> Self {
283        self.ttl = Some(ttl);
284        self
285    }
286
287    /// Fetch a cached value or run the supplied database loader on miss.
288    ///
289    /// The loader is intentionally caller-supplied so the database library
290    /// remains responsible for pools, transactions, compile-time checked
291    /// queries, and row mapping. HydraCache owns only the cache boundary.
292    pub async fn fetch_with<E, F, Fut>(self, loader: F) -> Result<T>
293    where
294        T: Serialize + DeserializeOwned + Send + 'static,
295        E: Error + Send + Sync + 'static,
296        F: FnOnce() -> Fut + Send + 'static,
297        Fut: Future<Output = std::result::Result<T, E>> + Send + 'static,
298    {
299        self.fetch_value_with(loader).await
300    }
301
302    /// Fetch a cached value with an output type chosen by an adapter.
303    ///
304    /// Most application code should use [`DbQuery::fetch_with`]. This method is
305    /// intended for adapter crates that keep the descriptor type focused on a
306    /// database row while caching shapes such as `Option<T>` or `Vec<T>`.
307    pub async fn fetch_value_with<U, E, F, Fut>(self, loader: F) -> Result<U>
308    where
309        U: Serialize + DeserializeOwned + Send + 'static,
310        E: Error + Send + Sync + 'static,
311        F: FnOnce() -> Fut + Send + 'static,
312        Fut: Future<Output = std::result::Result<U, E>> + Send + 'static,
313    {
314        let Some(key) = self.physical_key() else {
315            return Err(DbCacheError::MissingKey {
316                operation: self.operation_label(),
317            });
318        };
319
320        self.cache
321            .get_or_load(&key, self.options(), loader)
322            .await
323            .map_err(DbCacheError::from)
324    }
325
326    fn options(&self) -> CacheOptions {
327        let mut options = CacheOptions::new().tag_set(self.tags.clone());
328        if let Some(ttl) = self.ttl {
329            options = options.ttl(ttl);
330        }
331        options
332    }
333
334    fn operation_label(&self) -> String {
335        match &self.name {
336            Some(name) => name.clone(),
337            None if self.namespace.is_empty() => "unnamed".to_owned(),
338            None => format!("{}:unnamed", self.namespace),
339        }
340    }
341}
342
343fn physical_key(namespace: &str, key: &str) -> String {
344    if namespace.is_empty() {
345        key.to_owned()
346    } else {
347        format!("{namespace}:{key}")
348    }
349}