hydracache_db/query.rs
1use std::error::Error;
2use std::fmt;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::time::Duration;
6
7use hydracache::{CacheKeyBuilder, CacheOptions, HydraCache, PostcardCodec, TagSet};
8use hydracache_core::CacheCodec;
9use serde::{de::DeserializeOwned, Serialize};
10
11use crate::{CacheEntity, DbCacheError, Result};
12
13/// A database-oriented view over a [`HydraCache`] instance.
14///
15/// `DbCache` groups query result keys under a namespace while keeping all
16/// cache storage, single-flight, tags, TTL, and stats in the shared local cache.
17///
18/// # Example
19///
20/// ```rust
21/// use std::time::Duration;
22///
23/// use hydracache::HydraCache;
24/// use hydracache_db::DbCache;
25/// use serde::{Deserialize, Serialize};
26///
27/// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28/// struct User {
29/// id: i64,
30/// name: String,
31/// }
32///
33/// # #[tokio::main]
34/// # async fn main() -> hydracache_db::Result<()> {
35/// let local = HydraCache::local().build();
36/// let queries = DbCache::new(local, "db");
37///
38/// let user = queries
39/// .entity::<User>("user", 42)
40/// // Later, invalidate_tag("user:42") removes this result.
41/// .collection_tag("users")
42/// .ttl(Duration::from_secs(60))
43/// .fetch_with(|| async {
44/// // Replace this block with code from sqlx, diesel, sea-orm, or any
45/// // other database client. It is called only when the cache does not
46/// // already contain "db:user:42" or when the cached value has expired.
47/// Ok::<_, std::io::Error>(User {
48/// id: 42,
49/// name: "Ada".to_owned(),
50/// })
51/// })
52/// .await?;
53///
54/// assert_eq!(user.id, 42);
55/// # Ok(())
56/// # }
57/// ```
58pub struct DbCache<C = PostcardCodec>
59where
60 C: CacheCodec,
61{
62 cache: HydraCache<C>,
63 namespace: String,
64}
65
66impl<C> Clone for DbCache<C>
67where
68 C: CacheCodec,
69{
70 fn clone(&self) -> Self {
71 Self {
72 cache: self.cache.clone(),
73 namespace: self.namespace.clone(),
74 }
75 }
76}
77
78impl<C> fmt::Debug for DbCache<C>
79where
80 C: CacheCodec,
81{
82 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
83 formatter
84 .debug_struct("DbCache")
85 .field("namespace", &self.namespace)
86 .finish_non_exhaustive()
87 }
88}
89
90impl<C> DbCache<C>
91where
92 C: CacheCodec,
93{
94 /// Create a database query cache adapter over an existing local cache.
95 pub fn new(cache: HydraCache<C>, namespace: impl Into<String>) -> Self {
96 Self {
97 cache,
98 namespace: namespace.into(),
99 }
100 }
101
102 /// Return the namespace used for physical cache keys.
103 pub fn namespace(&self) -> &str {
104 &self.namespace
105 }
106
107 /// Return the underlying local cache.
108 pub fn cache(&self) -> &HydraCache<C> {
109 &self.cache
110 }
111
112 /// Start describing a cacheable database-loaded value.
113 ///
114 /// This is the preferred entry point when the query is already visible
115 /// inside the `fetch_with` loader through a database client, ORM, or
116 /// repository method.
117 pub fn cached<T>(&self) -> DbQuery<T, C> {
118 DbQuery {
119 cache: self.cache.clone(),
120 namespace: self.namespace.clone(),
121 name: None,
122 key: None,
123 tags: TagSet::new(),
124 ttl: None,
125 value: PhantomData,
126 }
127 }
128
129 /// Start describing an entity-shaped cached value.
130 ///
131 /// This is a convenience layer over [`DbCache::cached`] that sets both the
132 /// logical key and the entity invalidation tag from escaped key segments.
133 /// For example, `entity::<User>("user", 42)` creates key `user:42` and tag
134 /// `user:42`; with namespace `db`, the physical cache key is `db:user:42`.
135 ///
136 /// # Example
137 ///
138 /// ```rust
139 /// use hydracache::HydraCache;
140 /// use hydracache_db::DbCache;
141 /// use serde::{Deserialize, Serialize};
142 ///
143 /// #[derive(Debug, Clone, Serialize, Deserialize)]
144 /// struct User {
145 /// id: i64,
146 /// }
147 ///
148 /// let queries = DbCache::new(HydraCache::local().build(), "db");
149 /// let query = queries.entity::<User>("user", 42);
150 ///
151 /// assert_eq!(query.key_value(), Some("user:42"));
152 /// assert_eq!(query.tags_value(), &["user:42".to_owned()]);
153 /// assert_eq!(query.physical_key(), Some("db:user:42".to_owned()));
154 /// ```
155 pub fn entity<T>(&self, kind: impl ToString, id: impl ToString) -> DbQuery<T, C> {
156 self.cached::<T>().for_entity(kind, id)
157 }
158
159 /// Start describing an entity-shaped cached value from [`CacheEntity`]
160 /// metadata.
161 ///
162 /// This helper removes repeated entity and collection literals from call
163 /// sites. It sets the logical key, entity tag, and optional collection tag
164 /// defined by `T`.
165 ///
166 /// # Example
167 ///
168 /// ```rust
169 /// use hydracache::HydraCache;
170 /// use hydracache_db::{CacheEntity, DbCache};
171 /// use serde::{Deserialize, Serialize};
172 ///
173 /// #[derive(Debug, Clone, Serialize, Deserialize)]
174 /// struct User {
175 /// id: i64,
176 /// }
177 ///
178 /// impl CacheEntity for User {
179 /// type Id = i64;
180 ///
181 /// const ENTITY: &'static str = "user";
182 /// const COLLECTION: Option<&'static str> = Some("users");
183 /// }
184 ///
185 /// let queries = DbCache::new(HydraCache::local().build(), "db");
186 /// let query = queries.for_entity::<User>(42);
187 ///
188 /// assert_eq!(query.key_value(), Some("user:42"));
189 /// assert_eq!(
190 /// query.tags_value(),
191 /// &["user:42".to_owned(), "users".to_owned()]
192 /// );
193 /// ```
194 pub fn for_entity<T>(&self, id: T::Id) -> DbQuery<T, C>
195 where
196 T: CacheEntity,
197 {
198 self.cached::<T>().for_cache_entity(id)
199 }
200
201 /// Start describing a collection-shaped cached value.
202 ///
203 /// This sets both the logical key and the collection invalidation tag to
204 /// the escaped collection name. For example, `collection::<User>("users")`
205 /// creates key `users` and tag `users`.
206 ///
207 /// # Example
208 ///
209 /// ```rust
210 /// use hydracache::HydraCache;
211 /// use hydracache_db::DbCache;
212 /// use serde::{Deserialize, Serialize};
213 ///
214 /// #[derive(Debug, Clone, Serialize, Deserialize)]
215 /// struct User {
216 /// id: i64,
217 /// }
218 ///
219 /// let queries = DbCache::new(HydraCache::local().build(), "db");
220 /// let query = queries.collection::<User>("users:active");
221 ///
222 /// assert_eq!(query.key_value(), Some("users%3Aactive"));
223 /// assert_eq!(query.tags_value(), &["users%3Aactive".to_owned()]);
224 /// assert_eq!(query.physical_key(), Some("db:users%3Aactive".to_owned()));
225 /// ```
226 pub fn collection<T>(&self, name: impl ToString) -> DbQuery<T, C> {
227 let tag = collection_tag(name);
228 self.cached::<T>().key(tag.clone()).tag(tag)
229 }
230
231 /// Start describing a cacheable database-loaded value with a diagnostic name.
232 pub fn named<T>(&self, name: impl Into<String>) -> DbQuery<T, C> {
233 DbQuery {
234 cache: self.cache.clone(),
235 namespace: self.namespace.clone(),
236 name: Some(name.into()),
237 key: None,
238 tags: TagSet::new(),
239 ttl: None,
240 value: PhantomData,
241 }
242 }
243
244 /// Start describing a cacheable SQL query result.
245 ///
246 /// Prefer [`DbCache::cached`] or [`DbCache::named`] when writing new code.
247 /// This method remains useful if you want the SQL text itself to be the
248 /// diagnostic label for errors and logs.
249 pub fn query_as<T>(&self, sql: impl Into<String>) -> DbQuery<T, C> {
250 self.named(sql)
251 }
252}
253
254/// A cacheable database query descriptor.
255///
256/// The descriptor is deliberately explicit: callers choose the key, tags, and
257/// TTL that match their freshness model. An operation name is optional and used
258/// only for diagnostics. `fetch_with` executes the supplied loader only on a
259/// cache miss.
260pub struct DbQuery<T, C = PostcardCodec>
261where
262 C: CacheCodec,
263{
264 cache: HydraCache<C>,
265 namespace: String,
266 name: Option<String>,
267 key: Option<String>,
268 tags: TagSet,
269 ttl: Option<Duration>,
270 value: PhantomData<fn() -> T>,
271}
272
273impl<T, C> Clone for DbQuery<T, C>
274where
275 C: CacheCodec,
276{
277 fn clone(&self) -> Self {
278 Self {
279 cache: self.cache.clone(),
280 namespace: self.namespace.clone(),
281 name: self.name.clone(),
282 key: self.key.clone(),
283 tags: self.tags.clone(),
284 ttl: self.ttl,
285 value: PhantomData,
286 }
287 }
288}
289
290impl<T, C> fmt::Debug for DbQuery<T, C>
291where
292 C: CacheCodec,
293{
294 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
295 formatter
296 .debug_struct("DbQuery")
297 .field("namespace", &self.namespace)
298 .field("name", &self.name)
299 .field("key", &self.key)
300 .field("tags", &self.tags)
301 .field("ttl", &self.ttl)
302 .finish_non_exhaustive()
303 }
304}
305
306impl<T, C> DbQuery<T, C>
307where
308 C: CacheCodec,
309{
310 /// Return the optional diagnostic operation name.
311 pub fn name(&self) -> Option<&str> {
312 self.name.as_deref()
313 }
314
315 /// Set or replace the diagnostic operation name.
316 pub fn with_name(mut self, name: impl Into<String>) -> Self {
317 self.name = Some(name.into());
318 self
319 }
320
321 /// Return the namespace used for physical cache keys.
322 pub fn namespace(&self) -> &str {
323 &self.namespace
324 }
325
326 /// Return the logical key, if one has been configured.
327 pub fn key_value(&self) -> Option<&str> {
328 self.key.as_deref()
329 }
330
331 /// Return the physical cache key, including the adapter namespace.
332 pub fn physical_key(&self) -> Option<String> {
333 let key = self.key.as_deref()?;
334 Some(physical_key(&self.namespace, key))
335 }
336
337 /// Return the configured tags.
338 pub fn tags_value(&self) -> &[String] {
339 self.tags.as_slice()
340 }
341
342 /// Return the configured per-entry TTL.
343 pub fn ttl_value(&self) -> Option<Duration> {
344 self.ttl
345 }
346
347 /// Set the logical cache key for this query result.
348 pub fn key(mut self, key: impl Into<String>) -> Self {
349 self.key = Some(key.into());
350 self
351 }
352
353 /// Set the logical cache key from a segmented key builder.
354 pub fn key_builder(self, key: CacheKeyBuilder) -> Self {
355 self.key(key.build_string())
356 }
357
358 /// Set the logical key and add an entity invalidation tag.
359 ///
360 /// `for_entity("user", 42)` sets the key to `user:42` and adds the tag
361 /// `user:42`. Both segments are escaped with [`CacheKeyBuilder`], so `:` and
362 /// `%` inside one segment cannot accidentally create extra key segments.
363 ///
364 /// # Example
365 ///
366 /// ```rust
367 /// use hydracache::HydraCache;
368 /// use hydracache_db::DbCache;
369 /// use serde::{Deserialize, Serialize};
370 ///
371 /// #[derive(Debug, Clone, Serialize, Deserialize)]
372 /// struct User {
373 /// id: i64,
374 /// }
375 ///
376 /// let queries = DbCache::new(HydraCache::local().build(), "db");
377 /// let query = queries
378 /// .cached::<User>()
379 /// .tag("users")
380 /// .for_entity("user", 42);
381 ///
382 /// assert_eq!(query.key_value(), Some("user:42"));
383 /// assert_eq!(
384 /// query.tags_value(),
385 /// &["users".to_owned(), "user:42".to_owned()]
386 /// );
387 /// ```
388 pub fn for_entity(mut self, kind: impl ToString, id: impl ToString) -> Self {
389 let key = entity_key(kind, id);
390 self.key = Some(key.clone());
391 self.tags = self.tags.tag(key);
392 self
393 }
394
395 /// Set the logical key and tags from [`CacheEntity`] metadata.
396 ///
397 /// This is the metadata-driven equivalent of [`DbQuery::for_entity`]. It
398 /// preserves any existing tags, then adds the entity tag and optional
399 /// collection tag defined by `T`.
400 ///
401 /// # Example
402 ///
403 /// ```rust
404 /// use hydracache::HydraCache;
405 /// use hydracache_db::{CacheEntity, DbCache};
406 /// use serde::{Deserialize, Serialize};
407 ///
408 /// #[derive(Debug, Clone, Serialize, Deserialize)]
409 /// struct User {
410 /// id: i64,
411 /// }
412 ///
413 /// impl CacheEntity for User {
414 /// type Id = i64;
415 ///
416 /// const ENTITY: &'static str = "user";
417 /// const COLLECTION: Option<&'static str> = Some("users");
418 /// }
419 ///
420 /// let queries = DbCache::new(HydraCache::local().build(), "db");
421 /// let query = queries
422 /// .cached::<User>()
423 /// .tag("tenant:7")
424 /// .for_cache_entity(42);
425 ///
426 /// assert_eq!(query.key_value(), Some("user:42"));
427 /// assert_eq!(
428 /// query.tags_value(),
429 /// &[
430 /// "tenant:7".to_owned(),
431 /// "user:42".to_owned(),
432 /// "users".to_owned()
433 /// ]
434 /// );
435 /// ```
436 pub fn for_cache_entity(mut self, id: T::Id) -> Self
437 where
438 T: CacheEntity,
439 {
440 let key = T::cache_key_for(&id);
441 self.key = Some(key);
442 self.tags = self.tags.tag(T::entity_tag_for(&id));
443 self.tags = append_optional_tag(self.tags, T::collection_tag());
444 self
445 }
446
447 /// Add one invalidation tag.
448 pub fn tag(mut self, tag: impl Into<String>) -> Self {
449 self.tags = self.tags.tag(tag);
450 self
451 }
452
453 /// Add a collection invalidation tag from one escaped key segment.
454 ///
455 /// Use this with [`DbCache::entity`] or [`DbQuery::for_entity`] when one
456 /// entity result also belongs to a broader list or query group.
457 ///
458 /// # Example
459 ///
460 /// ```rust
461 /// use hydracache::HydraCache;
462 /// use hydracache_db::DbCache;
463 /// use serde::{Deserialize, Serialize};
464 ///
465 /// #[derive(Debug, Clone, Serialize, Deserialize)]
466 /// struct User {
467 /// id: i64,
468 /// }
469 ///
470 /// let queries = DbCache::new(HydraCache::local().build(), "db");
471 /// let query = queries
472 /// .entity::<User>("user", 42)
473 /// .collection_tag("users:active");
474 ///
475 /// assert_eq!(
476 /// query.tags_value(),
477 /// &["user:42".to_owned(), "users%3Aactive".to_owned()]
478 /// );
479 /// ```
480 pub fn collection_tag(mut self, name: impl ToString) -> Self {
481 self.tags = self.tags.tag(collection_tag(name));
482 self
483 }
484
485 /// Add several invalidation tags.
486 pub fn tags<I, S>(mut self, tags: I) -> Self
487 where
488 I: IntoIterator<Item = S>,
489 S: Into<String>,
490 {
491 self.tags = self.tags.tags(tags);
492 self
493 }
494
495 /// Replace invalidation tags from a reusable [`TagSet`].
496 pub fn tag_set(mut self, tags: TagSet) -> Self {
497 self.tags = tags;
498 self
499 }
500
501 /// Set a per-entry TTL for this query result.
502 pub fn ttl(mut self, ttl: Duration) -> Self {
503 self.ttl = Some(ttl);
504 self
505 }
506
507 /// Fetch a cached value or run the supplied database loader on miss.
508 ///
509 /// The loader is intentionally caller-supplied so the database library
510 /// remains responsible for pools, transactions, compile-time checked
511 /// queries, and row mapping. HydraCache owns only the cache boundary.
512 pub async fn fetch_with<E, F, Fut>(self, loader: F) -> Result<T>
513 where
514 T: Serialize + DeserializeOwned + Send + 'static,
515 E: Error + Send + Sync + 'static,
516 F: FnOnce() -> Fut + Send + 'static,
517 Fut: Future<Output = std::result::Result<T, E>> + Send + 'static,
518 {
519 self.fetch_value_with(loader).await
520 }
521
522 /// Fetch a cached value with an output type chosen by an adapter.
523 ///
524 /// Most application code should use [`DbQuery::fetch_with`]. This method is
525 /// intended for adapter crates that keep the descriptor type focused on a
526 /// database row while caching shapes such as `Option<T>` or `Vec<T>`.
527 pub async fn fetch_value_with<U, E, F, Fut>(self, loader: F) -> Result<U>
528 where
529 U: Serialize + DeserializeOwned + Send + 'static,
530 E: Error + Send + Sync + 'static,
531 F: FnOnce() -> Fut + Send + 'static,
532 Fut: Future<Output = std::result::Result<U, E>> + Send + 'static,
533 {
534 let key = self.required_physical_key()?;
535
536 self.cache
537 .get_or_load(&key, self.options(), loader)
538 .await
539 .map_err(DbCacheError::from)
540 }
541
542 fn options(&self) -> CacheOptions {
543 let mut options = CacheOptions::new().tag_set(self.tags.clone());
544 if let Some(ttl) = self.ttl {
545 options = options.ttl(ttl);
546 }
547 options
548 }
549
550 fn required_physical_key(&self) -> Result<String> {
551 self.physical_key().ok_or_else(|| DbCacheError::MissingKey {
552 operation: self.operation_label(),
553 })
554 }
555
556 fn operation_label(&self) -> String {
557 self.name
558 .clone()
559 .unwrap_or_else(|| default_operation_label(&self.namespace))
560 }
561}
562
563fn entity_key(kind: impl ToString, id: impl ToString) -> String {
564 CacheKeyBuilder::new().entity(kind, id).build_string()
565}
566
567fn collection_tag(name: impl ToString) -> String {
568 CacheKeyBuilder::from_segment(name).build_string()
569}
570
571fn append_optional_tag(tags: TagSet, tag: Option<String>) -> TagSet {
572 match tag {
573 Some(tag) => tags.tag(tag),
574 None => tags,
575 }
576}
577
/// Join a namespace and a logical key into the physical cache key.
///
/// An empty namespace contributes nothing, so the key is returned as is;
/// otherwise the result is `namespace:key`.
fn physical_key(namespace: &str, key: &str) -> String {
    match namespace {
        "" => key.to_owned(),
        prefix => [prefix, key].join(":"),
    }
}
585
/// Diagnostic label for a query that was never given a name.
///
/// Yields `unnamed` for an empty namespace and `namespace:unnamed` otherwise.
fn default_operation_label(namespace: &str) -> String {
    let mut label = String::from(namespace);
    if !label.is_empty() {
        label.push(':');
    }
    label.push_str("unnamed");
    label
}