1use std::error::Error;
2use std::fmt;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::time::Duration;
6
7use hydracache::{CacheKeyBuilder, CacheOptions, HydraCache, PostcardCodec, TagSet};
8use hydracache_core::CacheCodec;
9use serde::{de::DeserializeOwned, Serialize};
10
11use crate::{DbCacheError, Result};
12
13pub struct DbCache<C = PostcardCodec>
61where
62 C: CacheCodec,
63{
64 cache: HydraCache<C>,
65 namespace: String,
66}
67
68impl<C> Clone for DbCache<C>
69where
70 C: CacheCodec,
71{
72 fn clone(&self) -> Self {
73 Self {
74 cache: self.cache.clone(),
75 namespace: self.namespace.clone(),
76 }
77 }
78}
79
80impl<C> fmt::Debug for DbCache<C>
81where
82 C: CacheCodec,
83{
84 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
85 formatter
86 .debug_struct("DbCache")
87 .field("namespace", &self.namespace)
88 .finish_non_exhaustive()
89 }
90}
91
92impl<C> DbCache<C>
93where
94 C: CacheCodec,
95{
96 pub fn new(cache: HydraCache<C>, namespace: impl Into<String>) -> Self {
98 Self {
99 cache,
100 namespace: namespace.into(),
101 }
102 }
103
104 pub fn namespace(&self) -> &str {
106 &self.namespace
107 }
108
109 pub fn cache(&self) -> &HydraCache<C> {
111 &self.cache
112 }
113
114 pub fn cached<T>(&self) -> DbQuery<T, C> {
120 DbQuery {
121 cache: self.cache.clone(),
122 namespace: self.namespace.clone(),
123 name: None,
124 key: None,
125 tags: TagSet::new(),
126 ttl: None,
127 value: PhantomData,
128 }
129 }
130
131 pub fn named<T>(&self, name: impl Into<String>) -> DbQuery<T, C> {
133 DbQuery {
134 cache: self.cache.clone(),
135 namespace: self.namespace.clone(),
136 name: Some(name.into()),
137 key: None,
138 tags: TagSet::new(),
139 ttl: None,
140 value: PhantomData,
141 }
142 }
143
144 pub fn query_as<T>(&self, sql: impl Into<String>) -> DbQuery<T, C> {
150 self.named(sql)
151 }
152}
153
154pub struct DbQuery<T, C = PostcardCodec>
161where
162 C: CacheCodec,
163{
164 cache: HydraCache<C>,
165 namespace: String,
166 name: Option<String>,
167 key: Option<String>,
168 tags: TagSet,
169 ttl: Option<Duration>,
170 value: PhantomData<fn() -> T>,
171}
172
173impl<T, C> Clone for DbQuery<T, C>
174where
175 C: CacheCodec,
176{
177 fn clone(&self) -> Self {
178 Self {
179 cache: self.cache.clone(),
180 namespace: self.namespace.clone(),
181 name: self.name.clone(),
182 key: self.key.clone(),
183 tags: self.tags.clone(),
184 ttl: self.ttl,
185 value: PhantomData,
186 }
187 }
188}
189
190impl<T, C> fmt::Debug for DbQuery<T, C>
191where
192 C: CacheCodec,
193{
194 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
195 formatter
196 .debug_struct("DbQuery")
197 .field("namespace", &self.namespace)
198 .field("name", &self.name)
199 .field("key", &self.key)
200 .field("tags", &self.tags)
201 .field("ttl", &self.ttl)
202 .finish_non_exhaustive()
203 }
204}
205
206impl<T, C> DbQuery<T, C>
207where
208 C: CacheCodec,
209{
210 pub fn name(&self) -> Option<&str> {
212 self.name.as_deref()
213 }
214
215 pub fn with_name(mut self, name: impl Into<String>) -> Self {
217 self.name = Some(name.into());
218 self
219 }
220
221 pub fn namespace(&self) -> &str {
223 &self.namespace
224 }
225
226 pub fn key_value(&self) -> Option<&str> {
228 self.key.as_deref()
229 }
230
231 pub fn physical_key(&self) -> Option<String> {
233 self.key
234 .as_deref()
235 .map(|key| physical_key(&self.namespace, key))
236 }
237
238 pub fn tags_value(&self) -> &[String] {
240 self.tags.as_slice()
241 }
242
243 pub fn ttl_value(&self) -> Option<Duration> {
245 self.ttl
246 }
247
248 pub fn key(mut self, key: impl Into<String>) -> Self {
250 self.key = Some(key.into());
251 self
252 }
253
254 pub fn key_builder(self, key: CacheKeyBuilder) -> Self {
256 self.key(key.build_string())
257 }
258
259 pub fn tag(mut self, tag: impl Into<String>) -> Self {
261 self.tags = self.tags.tag(tag);
262 self
263 }
264
265 pub fn tags<I, S>(mut self, tags: I) -> Self
267 where
268 I: IntoIterator<Item = S>,
269 S: Into<String>,
270 {
271 self.tags = self.tags.tags(tags);
272 self
273 }
274
275 pub fn tag_set(mut self, tags: TagSet) -> Self {
277 self.tags = tags;
278 self
279 }
280
281 pub fn ttl(mut self, ttl: Duration) -> Self {
283 self.ttl = Some(ttl);
284 self
285 }
286
287 pub async fn fetch_with<E, F, Fut>(self, loader: F) -> Result<T>
293 where
294 T: Serialize + DeserializeOwned + Send + 'static,
295 E: Error + Send + Sync + 'static,
296 F: FnOnce() -> Fut + Send + 'static,
297 Fut: Future<Output = std::result::Result<T, E>> + Send + 'static,
298 {
299 self.fetch_value_with(loader).await
300 }
301
302 pub async fn fetch_value_with<U, E, F, Fut>(self, loader: F) -> Result<U>
308 where
309 U: Serialize + DeserializeOwned + Send + 'static,
310 E: Error + Send + Sync + 'static,
311 F: FnOnce() -> Fut + Send + 'static,
312 Fut: Future<Output = std::result::Result<U, E>> + Send + 'static,
313 {
314 let Some(key) = self.physical_key() else {
315 return Err(DbCacheError::MissingKey {
316 operation: self.operation_label(),
317 });
318 };
319
320 self.cache
321 .get_or_load(&key, self.options(), loader)
322 .await
323 .map_err(DbCacheError::from)
324 }
325
326 fn options(&self) -> CacheOptions {
327 let mut options = CacheOptions::new().tag_set(self.tags.clone());
328 if let Some(ttl) = self.ttl {
329 options = options.ttl(ttl);
330 }
331 options
332 }
333
334 fn operation_label(&self) -> String {
335 match &self.name {
336 Some(name) => name.clone(),
337 None if self.namespace.is_empty() => "unnamed".to_owned(),
338 None => format!("{}:unnamed", self.namespace),
339 }
340 }
341}
342
/// Joins a namespace and a logical key into the key sent to the cache.
///
/// An empty `namespace` yields `key` unchanged; otherwise the result is
/// `namespace`, a colon, then `key`.
fn physical_key(namespace: &str, key: &str) -> String {
    match namespace {
        "" => key.to_owned(),
        prefix => format!("{prefix}:{key}"),
    }
}